Showing posts with label Java concurrency. Show all posts

Friday, August 16, 2024

Java Semaphore With Examples

Semaphore is one of the synchronization aids in the Java concurrency utilities. It was added in Java 5 along with CountDownLatch, CyclicBarrier and Exchanger; Phaser followed in Java 7.

The Semaphore class in the java.util.concurrent package is a counting semaphore: conceptually, it maintains a set of permits. The Semaphore class has two main methods that work with permits-

  • acquire()- Acquires a permit from this semaphore, blocking until one is available, or the thread is interrupted. It has another overloaded version acquire(int permits).
  • release()- Releases a permit, returning it to the semaphore. It has another overloaded method release(int permits).
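As a minimal sketch of how acquire() and release() bound concurrency, the listing below lets several threads compete for a limited number of permits and records how many were inside the guarded section at once. The class name SemaphoreSketch, the method maxConcurrent and the 10 ms sleep are illustrative assumptions, not part of the Semaphore API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only Semaphore itself comes from the JDK.
class SemaphoreSketch {
  // Starts 'workers' threads against a semaphore holding 'permits' permits and
  // returns the highest number of threads observed inside the guarded section.
  static int maxConcurrent(int permits, int workers) {
    Semaphore semaphore = new Semaphore(permits);
    AtomicInteger inside = new AtomicInteger();
    AtomicInteger max = new AtomicInteger();
    Thread[] threads = new Thread[workers];
    for (int i = 0; i < workers; i++) {
      threads[i] = new Thread(() -> {
        try {
          semaphore.acquire(); // blocks until a permit is available
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        try {
          int now = inside.incrementAndGet();
          max.accumulateAndGet(now, Math::max);
          Thread.sleep(10); // simulated work while holding the permit
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          inside.decrementAndGet();
          semaphore.release(); // always hand the permit back
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return max.get();
  }

  public static void main(String[] args) {
    System.out.println("Max threads inside at once: " + maxConcurrent(2, 6));
  }
}
```

With 2 permits the reported maximum should never exceed 2, however many worker threads are started.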

Thursday, August 15, 2024

Java Exchanger With Examples

Exchanger in Java is one of the synchronization classes in the java.util.concurrent package, alongside CyclicBarrier, CountDownLatch, Semaphore and Phaser.

How does Exchanger in Java work

Exchanger makes it easy for two threads to exchange data between themselves. Exchanger provides a synchronization point at which two threads can pair and swap elements. Exchanger waits until two separate threads call its exchange() method. When two threads have called the exchange() method, Exchanger will swap the objects presented by the threads.
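The pairing described above can be sketched as follows. This is only an illustration: the class name ExchangerSketch and the helper swap are made up for the example, and the two "objects presented" are plain strings.

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo class showing two threads meeting at exchange().
class ExchangerSketch {
  // Returns {what the calling thread received, what the worker received}.
  static String[] swap(String fromMain, String fromWorker) {
    Exchanger<String> exchanger = new Exchanger<>();
    String[] received = new String[2];
    Thread worker = new Thread(() -> {
      try {
        // Blocks until the other thread also calls exchange()
        received[1] = exchanger.exchange(fromWorker);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.start();
    try {
      received[0] = exchanger.exchange(fromMain);
      worker.join(); // makes the worker's write to received[1] visible
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return received;
  }

  public static void main(String[] args) {
    String[] got = swap("from-main", "from-worker");
    System.out.println("Main received: " + got[0]);
    System.out.println("Worker received: " + got[1]);
  }
}
```

Each thread hands over its own object and gets back the one offered by its partner.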

Thursday, June 6, 2024

Java CountDownLatch With Examples

There are scenarios in an application when you want one or more threads to wait until one or more events being performed in other threads complete. CountDownLatch in Java concurrent API helps in handling such scenarios.

Note that CountDownLatch was introduced in Java 5 along with other concurrent classes like CyclicBarrier, ConcurrentHashMap, CopyOnWriteArrayList and BlockingQueue in the java.util.concurrent package.


How CountDownLatch is used

CountDownLatch in Java, as the name suggests, can be visualized as a latch that is released only after the given number of events occur. When an instance of CountDownLatch is created it is initialized with a count. This count denotes the number of times an event must occur before waiting threads can pass through the latch.

Note that a CountDownLatch initialized to N can be used in either of two ways-

  • To make one thread wait until N threads have completed some action, or
  • To make one thread wait until some action has been completed N times (possibly by a single thread).

Each time one of these events occurs, the count is decremented using the countDown() method of the CountDownLatch class. Waiting threads are released when the count reaches zero.

Thread(s) that are waiting for the latch to release are blocked using await() method.

Java CountDownLatch Constructor

CountDownLatch(int count)

Constructs a CountDownLatch initialized with the given count. Here count specifies the number of events that must happen in order for the latch to open.

await() and countDown() methods in CountDownLatch class

await() and countDown() are the two main methods in the CountDownLatch class that control the working of the latch.

await() method- A thread that waits for the latch to open calls the await() method. The await() method has two forms.

1. public void await() throws InterruptedException

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted. If the current count is zero then this method returns immediately.

If the current count is greater than zero then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happens:

  • The count reaches zero due to invocations of the countDown() method
  • Some other thread interrupts the current thread.

2. public boolean await(long timeout, TimeUnit unit) throws InterruptedException

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted or the specified waiting time elapses. The unit of the waiting time is given by a constant of the TimeUnit enumeration.

If the current count is zero then this method returns immediately with the value true. If the current count is greater than zero then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

  • The count reaches zero due to invocations of the countDown() method.
  • Some other thread interrupts the current thread.
  • The specified waiting time elapses.
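To make the timed form concrete, here is a small sketch showing what await(long, TimeUnit) returns when the count does and does not reach zero in time. The class name TimedAwaitSketch, the helper awaitWithTimeout and the 100 ms timeout are assumptions chosen for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the timed await() form.
class TimedAwaitSketch {
  // Creates a latch with the given count, counts it down 'countDowns' times,
  // then reports what a 100 ms timed await() returns.
  static boolean awaitWithTimeout(int count, int countDowns) {
    CountDownLatch latch = new CountDownLatch(count);
    for (int i = 0; i < countDowns; i++) {
      latch.countDown();
    }
    try {
      // true if the count reached zero, false if the waiting time elapsed first
      return latch.await(100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(awaitWithTimeout(2, 2)); // prints true
    System.out.println(awaitWithTimeout(2, 1)); // prints false
  }
}
```

Checking the boolean result is the only way to tell a timeout apart from a latch that actually opened.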

countDown() method- Threads which are executing the events signal the completion of an event by calling the countDown() method.

public void countDown()

Decrements the count of the latch, releasing all waiting threads if the count reaches zero.

CountDownLatch Java Example program

That's a lot of theory so let's see an example to make it clearer and see how await(), countDown() and the constructor that sets the count are actually used.

Let's take a scenario where your application needs to read 3 files and parse the lines read, and only after all three files are read and parsed should the application move ahead to do some processing with the parsed objects.
So here we'll have three separate threads reading three separate files, and the main thread waits until all three threads finish and call countDown().

import java.util.concurrent.CountDownLatch;

public class CountdownlatchDemo {
  public static void main(String[] args) {
    CountDownLatch cdl = new CountDownLatch(3);
    // Initializing three threads to read 3 different files.
    Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
    Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
    Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));
    t1.start();
    t2.start();
    t3.start();
    try {
      // main thread waiting till all the files are read
      cdl.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("Files are read ... Start further processing");
  }
}

class FileReader implements Runnable {
  private String threadName;
  private String fileName;
  private CountDownLatch cdl;
  FileReader(String threadName, String fileName, CountDownLatch cdl){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cdl = cdl;        
  }
  @Override
  public void run() {
    System.out.println("Reading file " + fileName + " thread " + threadName);
    // do countdown here
    cdl.countDown();
  } 
}

Output

Reading file file-1 thread thread-1
Reading file file-3 thread thread-3
Reading file file-2 thread thread-2
Files are read ... Start further processing

Here it can be seen that inside the main() method, the CountDownLatch instance cdl is created with an initial count of 3. Then three instances of FileReader are created and run in three new threads. Then the main thread calls await() on cdl, which causes the main thread to wait until the cdl count has been decremented three times. Notice that the cdl instance is passed as a parameter to the FileReader constructor; each FileReader uses it to call countDown() and decrement the count. Once the count reaches zero, the latch opens, allowing the main thread to resume.

If you comment out the call to await(), the main thread may resume before all 3 files are read. In scenarios like this, where a thread should resume only after certain events occur, CountDownLatch is a powerful synchronization aid that allows one or more threads to wait for events to finish in other threads.

The above example may give the impression that you must spawn as many threads as the count given to the CountDownLatch. That is not the case. As mentioned, the count is the number of events, so you can very well have a single thread that decrements the count in a loop.

Let's change the example used above to have a single thread that counts down in a for loop.

import java.util.concurrent.CountDownLatch;

public class CountdownlatchDemo {
  public static void main(String[] args) {
    CountDownLatch cdl = new CountDownLatch(3);
    // Initializing threads to read 3 different files.
    Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
    /*Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
    Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));*/
    t1.start();
    /*t2.start();
    t3.start();*/
    try {
      // main thread waiting till all the files are read
      cdl.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("Files are read ... Start further processing");
  }
}

class FileReader implements Runnable {
  private String threadName;
  private String fileName;
  private CountDownLatch cdl;
  FileReader(String threadName, String fileName, CountDownLatch cdl){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cdl = cdl;        
  }
  @Override
  public void run() {
    for(int i = 0; i < 3; i++){
      System.out.println("Reading file " + fileName + " thread " + threadName);
      // do countdown here
      cdl.countDown();
    }
  }
}

Output

Reading file file-1 thread thread-1
Reading file file-1 thread thread-1
Reading file file-1 thread thread-1
Files are read ... Start further processing

Here you can see that only a single thread is used and countdown is done on the number of events. So it is true both ways. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

Usage of CountDownLatch in Java

As you have seen in the example, you can use CountDownLatch when the work can be split so that more than one thread processes a part of it, but further processing can start only when all of those threads have finished. Once all the threads have finished, the main thread comes out of await() (as the latch is released) and starts further processing.

You can also use CountDownLatch to test concurrency by giving a certain count in the CountDownLatch constructor and starting that many threads. There may also be more than one waiting thread, so you can test how waiting threads behave once the count reaches zero (all of them are released at once).

If your application has external dependencies and should start processing only once all of them are up and running, that scenario can also be handled with CountDownLatch.
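The case of several waiting threads being released together can be sketched with a latch of count 1 used as a start gate, plus a second latch that lets the main thread wait for the workers. The names StartGateSketch, runAll, startGate and done are hypothetical and exist only for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: many threads wait on one latch, main waits on another.
class StartGateSketch {
  // Starts 'workers' threads that all block on the same latch, opens the latch
  // once, and returns how many workers got past it.
  static int runAll(int workers) {
    CountDownLatch startGate = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(workers);
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < workers; i++) {
      new Thread(() -> {
        try {
          startGate.await(); // every worker waits here
          completed.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          done.countDown(); // one event per worker
        }
      }).start();
    }
    startGate.countDown(); // count reaches zero, all waiting workers are released
    try {
      done.await(); // main waits for all workers to finish
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println("Workers released: " + runAll(4));
  }
}
```

A single countDown() on the start gate is enough to release every thread blocked in await().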

CountDownLatch in Java can not be reused

One point to remember is CountDownLatch cannot be reused. Once the countdown reaches zero any further call to await() method won't block any thread. It won't throw any exception either.

Let's see an example. We'll use the same example as above and spawn 3 more threads once the first set of three threads is done.

import java.util.concurrent.CountDownLatch;

public class CountdownlatchDemo {
  public static void main(String[] args) {
    CountDownLatch cdl = new CountDownLatch(3);
    // Initializing three threads to read 3 different files.
    Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
    Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
    Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));
    t1.start();
    t2.start();
    t3.start();
    try {
      // main thread waiting till all the files are read
      cdl.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("Files are read ... Start further processing");
    Thread t4 = new Thread(new FileReader("thread-4", "file-4", cdl));
    Thread t5 = new Thread(new FileReader("thread-5", "file-5", cdl));
    Thread t6 = new Thread(new FileReader("thread-6", "file-6", cdl));
    t4.start();
    t5.start();
    t6.start();
    try {
      // main thread waiting till all the files are read
      cdl.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("Files are read again ... Start further processing");
  }
}

class FileReader implements Runnable {
  private String threadName;
  private String fileName;
  private CountDownLatch cdl;
  FileReader(String threadName, String fileName, CountDownLatch cdl){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cdl = cdl;        
  }
  @Override
  public void run() {
    System.out.println("Reading file " + fileName + " thread " + threadName);
    // do countdown here
    cdl.countDown();
  }
}

Output

Reading file file-2 thread thread-2
Reading file file-3 thread thread-3
Reading file file-1 thread thread-1
Files are read ... Start further processing
Files are read again ... Start further processing
Reading file file-4 thread thread-4
Reading file file-6 thread thread-6
Reading file file-5 thread thread-5

Here note that await() is called again after starting thread-4, thread-5 and thread-6, but it doesn't block the main thread as it did for the first three threads. "Files are read again ... Start further processing" is printed even before the next three threads have done their work. Another concurrency utility, CyclicBarrier, can be reused; in fact that is one of the differences between CountDownLatch and CyclicBarrier.

Points to note

  • A CountDownLatch initialized to N, using its constructor, can be used to make one (or more) thread wait until N threads have completed some action, or some action has been completed N times.
  • countDown() method is used to decrement the count. Once the count reaches zero the latch is released.
  • await() method is used to block the thread(s) waiting for the latch to release.
  • CountDownLatch cannot be reused. Once the countdown reaches zero any further call to await() method won't block any thread.

That's all for this topic Java CountDownLatch With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Wednesday, June 5, 2024

Java CyclicBarrier With Examples

There are scenarios in concurrent programming where you want a set of threads to wait for each other at a common point until every thread in the set has reached it. The concurrency utilities provide the synchronization aid CyclicBarrier in Java to handle such scenarios, where a set of threads must wait for each other to reach a common barrier point.

The barrier is called cyclic because it can be re-used after the waiting threads are released.

Note that CyclicBarrier was introduced in Java 5 along with other concurrent classes like CountDownLatch, ConcurrentHashMap, CopyOnWriteArrayList and BlockingQueue in the java.util.concurrent package.


CyclicBarrier class constructors

CyclicBarrier class in Java has the following two constructors-

CyclicBarrier(int parties)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and does not perform a predefined action when the barrier is tripped.

CyclicBarrier(int parties, Runnable barrierAction)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.

Here the parties parameter is the number of threads that must invoke await() before the barrier is tripped.

barrierAction is a Runnable that is executed when the barrier is tripped. It runs in the last thread to arrive at the barrier, not in a new thread.

How CyclicBarrier is used

The first step is to create a CyclicBarrier object using either of the two constructors, specifying the number of threads that should wait for each other. Each thread calls the await() method on the CyclicBarrier object when it reaches the barrier (the common point). This suspends the thread until all the threads have called await() on the same CyclicBarrier object. Once all the specified threads have called await(), the barrier trips and all threads can resume operation.

If the current thread is the last thread to arrive, and a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue.

await() method in CyclicBarrier

await() method has the following two forms-

  1. public int await() throws InterruptedException, BrokenBarrierException
  2. public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

The second form waits until all parties have invoked await() on this barrier, or the specified waiting time elapses.

If the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens:

  • The last thread arrives; or
  • The specified timeout elapses; (In case of second form) or
  • Some other thread interrupts the current thread; or
  • Some other thread interrupts one of the other waiting threads; or
  • Some other thread times out while waiting for barrier; or
  • Some other thread invokes reset() on this barrier.

The await() method returns an int, which is the arrival index of the current thread: index getParties() - 1 indicates the first to arrive and zero indicates the last to arrive.
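The arrival index can be observed with a short sketch like the one below. The class name ArrivalIndexSketch and the helper arrivalIndexes are hypothetical; the indexes are sorted before being returned because the order in which threads arrive is not deterministic.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class collecting the value returned by await() in each thread.
class ArrivalIndexSketch {
  // Starts 'parties' threads, lets each call await(), and returns the
  // arrival indexes they received, sorted in ascending order.
  static int[] arrivalIndexes(int parties) {
    CyclicBarrier barrier = new CyclicBarrier(parties);
    int[] indexes = new int[parties];
    Thread[] threads = new Thread[parties];
    for (int i = 0; i < parties; i++) {
      final int slot = i;
      threads[i] = new Thread(() -> {
        try {
          indexes[slot] = barrier.await(); // arrival index of this thread
        } catch (InterruptedException | BrokenBarrierException e) {
          indexes[slot] = -1; // marks a failed wait in this sketch
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    Arrays.sort(indexes);
    return indexes;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(arrivalIndexes(3))); // prints [0, 1, 2]
  }
}
```

Every index from 0 to parties - 1 is handed out exactly once per trip of the barrier; which thread gets which index depends on scheduling.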

CyclicBarrier Java example

Now is the time to see an example of CyclicBarrier in Java. Let's take a scenario where your application needs to read 3 files using 3 threads and parse the lines read, and only after all three files are read and parsed should the application run a follow-up action for further processing. In this scenario we can use CyclicBarrier and provide a Runnable barrier action that runs once all the threads reach the barrier.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
  public static void main(String[] args) {
    CyclicBarrier cb = new CyclicBarrier(3, new AfterAction());
    // Initializing three threads to read 3 different files.
    Thread t1 = new Thread(new TxtReader("thread-1", "file-1", cb));
    Thread t2 = new Thread(new TxtReader("thread-2", "file-2", cb));
    Thread t3 = new Thread(new TxtReader("thread-3", "file-3", cb));
    t1.start();
    t2.start();
    t3.start();
    
    System.out.println("Done ");
  }
}

class TxtReader implements Runnable {
  private String threadName;
  private String fileName;
  private CyclicBarrier cb;
  TxtReader(String threadName, String fileName, CyclicBarrier cb){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cb = cb;        
  }
  @Override
  public void run() {
    System.out.println("Reading file " + fileName + " thread " + threadName);    
    try{
      // calling await so the current thread suspends
      cb.await();           
    } catch (InterruptedException e) {
      System.out.println(e);
    } catch (BrokenBarrierException e) {
      System.out.println(e);
    }
  }
}

class AfterAction implements Runnable {
  @Override
  public void run() {
    System.out.println("In after action class, start further processing as all files are read");
  }
}

Output

Done 
Reading file file-2 thread thread-2
Reading file file-1 thread thread-1
Reading file file-3 thread thread-3
In after action class, start further processing as all files are read

In the code CyclicBarrier instance is created with 3 parties so the barrier will trip when 3 threads are waiting upon it.

One thing to note here is that the main thread doesn't block, as can be seen from "Done" being printed before any of the reader threads print. Also, it can be seen that AfterAction is executed once all three threads have called the await() method and the barrier is tripped.

Now if you want to block the main thread then you have to call await() on the main thread too. Let's take another CyclicBarrier example where two services are started using two separate threads and the main thread should continue processing only after both services have run. The barrier is therefore created with 3 parties: the two service threads and the main thread.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CBExample {
  public static void main(String[] args) {
    CyclicBarrier cb = new CyclicBarrier(3);
    // Creating two threads with CyclicBarrier obj as param
    Thread t1 = new Thread(new FirstService(cb));
    Thread t2 = new Thread(new SecondService(cb));
    System.out.println("starting threads ");
    t1.start();
    t2.start();
        
    try {
      // Calling await for main thread
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    // once await is called for all the three threads, execution starts again
    System.out.println("In main thread, processing starts again ..... ");
  }
}

class FirstService implements Runnable {
  CyclicBarrier cb;
  FirstService(CyclicBarrier cb){
    this.cb = cb;
  }
  @Override
  public void run() {
    System.out.println("In First service, thread " + Thread.currentThread().getName());
    try {
      // Calling await for Thread-0
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }      
  }   
}

class SecondService implements Runnable {
  CyclicBarrier cb;
  SecondService(CyclicBarrier cb){
    this.cb = cb;
  }
  @Override
  public void run() {
    System.out.println("In Second service, thread " + Thread.currentThread().getName());
    try {
      // Calling await for Thread-1
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }    
  }    
}

Output

starting threads 
In First service, thread Thread-0
In Second service, thread Thread-1
In main thread, processing starts again .....

Here it can be seen that main thread starts only after both the services are executed.

CyclicBarrier can be reused

Unlike CountDownLatch, CyclicBarrier in Java can be reused after the waiting threads are released.

Let's reuse the same example as above where three threads were used to read 3 files. Now three more threads are added to read 3 more files and the same CyclicBarrier object is used with initial count as 3.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
  public static void main(String[] args) {
    CyclicBarrier cb = new CyclicBarrier(3, new AfterAction());
    // Initializing three threads to read 3 different files.
    Thread t1 = new Thread(new TxtReader("thread-1", "file-1", cb));
    Thread t2 = new Thread(new TxtReader("thread-2", "file-2", cb));
    Thread t3 = new Thread(new TxtReader("thread-3", "file-3", cb));
    
    t1.start();
    t2.start();
    t3.start();
        
    System.out.println("Start another set of threads ");
    
    Thread t4 = new Thread(new TxtReader("thread-4", "file-4", cb));
    Thread t5 = new Thread(new TxtReader("thread-5", "file-5", cb));
    Thread t6 = new Thread(new TxtReader("thread-6", "file-6", cb));
    t4.start();
    t5.start();
    t6.start();           
  }
}

class TxtReader implements Runnable {
  private String threadName;
  private String fileName;
  private CyclicBarrier cb;
  TxtReader(String threadName, String fileName, CyclicBarrier cb){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cb = cb;        
  }
  @Override
  public void run() {
    System.out.println("Reading file " + fileName + " thread " + threadName);    
    try{
      // calling await so the current thread suspends
      cb.await();
        
    } catch (InterruptedException e) {
      System.out.println(e);
    } catch (BrokenBarrierException e) {
      System.out.println(e);
    }
  }
}

class AfterAction implements Runnable {
  @Override
  public void run() {
    System.out.println("In after action class, start further processing as all files are read");
  }
}

Output

Start another set of threads 
Reading file file-1 thread thread-1
Reading file file-2 thread thread-2
Reading file file-3 thread thread-3
In after action class, start further processing as all files are read
Reading file file-4 thread thread-4
Reading file file-5 thread thread-5
Reading file file-6 thread thread-6
In after action class, start further processing as all files are read

Here it can be seen that the barrier action (AfterAction) is run twice because the CyclicBarrier is reused. Note that the thread order may be different when you run the code.

Points to note

  • A CyclicBarrier initialized to N, using its constructor, can be used to make N threads wait using await(), and the barrier will be tripped once all the N threads have called the await() method.
  • A barrierAction can also be provided while creating CyclicBarrier object. This barrierAction will be executed once the barrier is tripped. This barrier action is useful for updating shared-state before any of the parties continue.
  • If the current thread is not the last to arrive, it is paused after calling await() and lies dormant until the last thread arrives, the current thread or some other waiting thread is interrupted, the specified timeout elapses (for the timed await()), or some thread calls the reset() method.
  • reset() method resets the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a BrokenBarrierException.
  • CyclicBarrier in Java uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).
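The breakage model and reset() can be seen in a single-threaded sketch: one party waits with a timeout on a two-party barrier, nobody else arrives, and the barrier is left in the broken state until it is reset. The class name BrokenBarrierSketch, the helper timeoutThenReset and the 50 ms timeout are assumptions for the illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a timed-out await() breaks the barrier, reset() repairs it.
class BrokenBarrierSketch {
  // Returns {isBroken() after the timeout, isBroken() after reset()}.
  static boolean[] timeoutThenReset() {
    CyclicBarrier barrier = new CyclicBarrier(2);
    try {
      // Only one of the two parties arrives, so this wait times out
      barrier.await(50, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // expected in this sketch: the barrier is now broken
    } catch (BrokenBarrierException e) {
      // not expected here, since no other thread touches the barrier
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    boolean brokenAfterTimeout = barrier.isBroken();
    barrier.reset(); // back to the initial, unbroken state
    boolean brokenAfterReset = barrier.isBroken();
    return new boolean[] {brokenAfterTimeout, brokenAfterReset};
  }

  public static void main(String[] args) {
    boolean[] state = timeoutThenReset();
    System.out.println("Broken after timeout: " + state[0]); // prints true
    System.out.println("Broken after reset: " + state[1]);   // prints false
  }
}
```

A broken barrier keeps throwing BrokenBarrierException from await() until reset() is called, so recovery code usually needs its own coordination around the reset.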

That's all for this topic Java CyclicBarrier With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Sunday, June 2, 2024

Java Phaser With Examples

Phaser in Java is one of the synchronization aids provided in the concurrency utilities. Phaser is similar to other synchronization barriers like CountDownLatch and CyclicBarrier. What sets Phaser apart is that it is reusable (like CyclicBarrier) and more flexible in usage. In both CountDownLatch and CyclicBarrier the number of parties (threads) registered for waiting can't change, whereas in Phaser that number can vary. Also note that Phaser was introduced in Java 7.

Phaser in Java is best suited to synchronizing threads over one or more phases of activity. It can be used to synchronize a single phase, but in that case it acts much like a CyclicBarrier. It is a better fit where threads should wait for a phase to finish, advance to the next phase, wait again for that phase to finish, and so on.


Java Phaser constructors

Phaser class in Java has 4 constructors

  • Phaser()- Creates a new phaser with no initially registered parties, no parent, and initial phase number 0.
  • Phaser(int parties)- Creates a new phaser with the given number of registered unarrived parties, no parent, and initial phase number 0.
  • Phaser(Phaser parent)- Creates a new phaser with the given parent with no initially registered parties.
  • Phaser(Phaser parent, int parties)- Creates a new phaser with the given parent and number of registered unarrived parties.

How Phaser in Java works

The first step is to create a new instance of Phaser.

The next step is to register one or more parties with the Phaser. That can be done using register(), bulkRegister(int) or by specifying the number of parties in the constructor.

Since Phaser is a synchronization barrier, each registered party has to signal that it has finished a phase. A party does that by calling arrive() or one of its variants; arriveAndAwaitAdvance() additionally blocks the caller until all other registered parties have arrived. When the number of arrivals equals the number of registered parties, the phase is considered complete and the phaser advances to the next phase (if there is one) or terminates.

Note that each generation of a phaser has an associated phase number. The phase number starts at zero, and advances when all parties arrive at the phaser, wrapping around to zero after reaching Integer.MAX_VALUE.
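A single registered party is enough to watch the phase number advance. The sketch below is deliberately single-threaded; the class name PhaseNumberSketch and the helper phases are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class: one registered party drives the phase number forward.
class PhaseNumberSketch {
  // Returns the phase number seen initially and after each of two arrivals.
  static int[] phases() {
    Phaser phaser = new Phaser(1); // one registered party: the calling thread
    int initial = phaser.getPhase();
    phaser.arrive(); // the only party arrives, so the phase advances
    int afterArrive = phaser.getPhase();
    phaser.arriveAndAwaitAdvance(); // returns at once, no other party to wait for
    int afterSecond = phaser.getPhase();
    return new int[] {initial, afterArrive, afterSecond};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(phases())); // prints [0, 1, 2]
  }
}
```

With more parties registered, the phase number would change only after every one of them had arrived.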

Methods in Java Phaser class

Some of the methods in Phaser class are as given below-

  • register()- Adds a new unarrived party to this phaser. It returns the arrival phase number to which this registration applied.
  • arrive()- Arrives at this phaser, without waiting for others to arrive. Note that arrive() method does not suspend execution of the calling thread. Returns the arrival phase number, or a negative value if terminated. Note that this method should not be called by an unregistered party.
  • arriveAndDeregister()- Arrives at this phaser and deregisters from it without waiting for others to arrive. Returns the arrival phase number, or a negative value if terminated.
  • arriveAndAwaitAdvance()- Arrives at this phaser and waits for the other parties to arrive. Returns the arrival phase number, or the (negative) current phase if terminated. If you want to wait for all the other registered parties to complete a given phase then use this method.
  • bulkRegister(int parties)- Used to register parties in bulk. The given number of new unarrived parties will be registered to this phaser.
  • onAdvance(int phase, int registeredParties)– If you want to perform some action before the phase is advanced you can override this method. Also used to control termination.
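As an illustration of overriding onAdvance() to control termination, the sketch below terminates the phaser after a fixed number of phases. The class name OnAdvanceSketch, the helper phasesRun and the maxPhases parameter are hypothetical names introduced for this example.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a Phaser subclass that stops after maxPhases phases.
class OnAdvanceSketch {
  // Runs a one-party phaser until it terminates and returns how many
  // times onAdvance() was invoked.
  static int phasesRun(int maxPhases) {
    AtomicInteger advances = new AtomicInteger();
    Phaser phaser = new Phaser(1) {
      @Override
      protected boolean onAdvance(int phase, int registeredParties) {
        advances.incrementAndGet(); // action performed before each advance
        // Returning true terminates the phaser
        return phase >= maxPhases - 1 || registeredParties == 0;
      }
    };
    while (!phaser.isTerminated()) {
      phaser.arriveAndAwaitAdvance();
    }
    return advances.get();
  }

  public static void main(String[] args) {
    System.out.println("Phases run: " + phasesRun(3)); // prints Phases run: 3
  }
}
```

The default onAdvance() returns true only when no parties remain registered, so overriding it is the usual way to cap the number of phases.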

Java Phaser Features

1. Phaser is more flexible- Unlike the case for other barriers, the number of parties registered to synchronize on a Phaser may vary over time. Tasks may be registered at any time (using methods register(), bulkRegister(int), or by specifying initial number of parties in the constructor). Tasks may also be optionally deregistered upon any arrival (using arriveAndDeregister()).

2. Phaser termination- A Phaser may enter a termination state, that may be checked using method isTerminated(). Upon termination, all synchronization methods immediately return without waiting for advance, as indicated by a negative return value. Similarly, attempts to register upon termination have no effect.

3. Phaser Tiering- Phasers in Java may be tiered (i.e., constructed in tree structures) to reduce contention. Phasers with large numbers of parties may experience heavy synchronization contention costs. Such phasers can instead be set up as groups of sub-phasers that share a common parent. This may greatly increase throughput even though it incurs greater per-operation overhead.
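The termination behaviour from point 2 can be sketched with a phaser whose only party deregisters. The class name TerminationSketch and its two helper methods are hypothetical.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class: deregistering the last party terminates the phaser.
class TerminationSketch {
  // Returns true if the phaser is terminated once its only party deregisters.
  static boolean terminatesWhenEmpty() {
    Phaser phaser = new Phaser(1);
    phaser.arriveAndDeregister(); // no registered parties remain
    return phaser.isTerminated();
  }

  // Returns the value of register() called on an already terminated phaser.
  static int registerAfterTermination() {
    Phaser phaser = new Phaser(1);
    phaser.arriveAndDeregister();
    return phaser.register(); // has no effect, returns a negative value
  }

  public static void main(String[] args) {
    System.out.println("Terminated: " + terminatesWhenEmpty()); // prints Terminated: true
    System.out.println("register() is negative: " + (registerAfterTermination() < 0));
  }
}
```

Code that registers parties dynamically can check for a negative return value to detect that the phaser has already terminated.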

Phaser Java example code

Let's try to make things clearer through an example with two phases. In the first phase three threads read 3 different files, parse them and store them in the DB; then in the second phase 2 threads are started to query the DB table for the inserted records. Let's assume that one of the fields in the DB table is age, and we want one thread to query the count of those having age greater than 40 and another thread to get the count of those having age less than or equal to 40.

import java.util.concurrent.Phaser;

public class PhaserDemo {

 public static void main(String[] args) {
  Phaser ph = new Phaser(1);
  int curPhase;
  curPhase = ph.getPhase();
  System.out.println("Phase in Main " + curPhase + " started");
  // Threads for first phase
  new FileReaderThread("thread-1", "file-1", ph);
  new FileReaderThread("thread-2", "file-2", ph);
  new FileReaderThread("thread-3", "file-3", ph);
  //For main thread
  ph.arriveAndAwaitAdvance();
  System.out.println("New phase " + ph.getPhase() + " started");
  // Threads for second phase
  new QueryThread("thread-1", 40, ph);
  new QueryThread("thread-2", 40, ph);
  curPhase = ph.getPhase();
  ph.arriveAndAwaitAdvance();
  System.out.println("Phase " + curPhase + " completed");
  // deregistering the main thread
  ph.arriveAndDeregister();
 }
}

class FileReaderThread implements Runnable {
 private String threadName;
 private String fileName;
 private Phaser ph;

 FileReaderThread(String threadName, String fileName, Phaser ph){
  this.threadName = threadName;
  this.fileName = fileName;
  this.ph = ph;
  ph.register();
  new Thread(this).start();
 }
 @Override
 public void run() {
  System.out.println("This is phase " + ph.getPhase());
  
  try {
   Thread.sleep(20);
   System.out.println("Reading file " + fileName + " thread " 
                           + threadName + " parsing and storing to DB ");
   // Using await and advance so that all thread wait here
   ph.arriveAndAwaitAdvance();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  ph.arriveAndDeregister();
 }
}

class QueryThread implements Runnable {
 private String threadName;
 private int param;
 private Phaser ph;
 
 QueryThread(String threadName, int param, Phaser ph){
  this.threadName = threadName;
  this.param = param;
  this.ph = ph;
  ph.register();
  new Thread(this).start();
 }
 
 @Override
 public void run() {
  
  System.out.println("This is phase " + ph.getPhase());
  System.out.println("Querying DB using param " + param 
                  + " Thread " + threadName);
  ph.arriveAndAwaitAdvance();
  System.out.println("Threads finished");
  ph.arriveAndDeregister();
 }
}

Output

Phase in Main 0 started
This is phase 0
This is phase 0
This is phase 0
Reading file file-1 thread thread-1 parsing and storing to DB 
Reading file file-2 thread thread-2 parsing and storing to DB 
Reading file file-3 thread thread-3 parsing and storing to DB 
New phase 1 started
This is phase 1
Querying DB using param 40 Thread thread-1
This is phase 1
Querying DB using param 40 Thread thread-2
Threads finished
Threads finished
Phase 1 completed

Here it can be seen that first a Phaser instance ph is created with initial party count as 1, which corresponds to the main thread.

The same ph object is then passed to the first set of 3 threads, which are used in the first phase, and is used for synchronization. As you can see in the run() method of the FileReaderThread class, arriveAndAwaitAdvance() is used so that each thread waits there for the other threads. Since 3 more threads are registered after the initial main thread, arriveAndAwaitAdvance() is used in the main method too, to make the main thread wait before advancing.

In the second phase another set of two threads is created, which use the same phaser object ph for synchronization.

The logic for reading the file, parsing it and storing it in the DB is not given here. The queries used in the second phase are also not given. The scenario is used only to explain Phaser, so that's where the focus is.

Phaser Monitoring

Phaser class in Java has several methods for monitoring. These methods can be called by any caller not only by registered parties.

  • getRegisteredParties()- Returns the number of parties registered at this phaser.
  • getArrivedParties()- Returns the number of registered parties that have arrived at the current phase of this phaser.
  • getUnarrivedParties()- Returns the number of registered parties that have not yet arrived at the current phase of this phaser.
  • getPhase()- Returns the current phase number.
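To see how these monitoring values change, here is a small single-threaded sketch. The class name PhaserMonitoringSketch and the helper snapshot() are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.Phaser;

// Hypothetical class name, used only for this illustration
class PhaserMonitoringSketch {

  // Formats the monitoring values of the given phaser
  static String snapshot(Phaser ph) {
    return "phase=" + ph.getPhase()
        + " registered=" + ph.getRegisteredParties()
        + " arrived=" + ph.getArrivedParties()
        + " unarrived=" + ph.getUnarrivedParties();
  }

  public static void main(String[] args) {
    Phaser ph = new Phaser(3);
    System.out.println(snapshot(ph));
    ph.arrive();   // arrive() does not block the caller
    System.out.println(snapshot(ph));
    ph.arrive();
    ph.arrive();   // the last arrival advances the phaser
    System.out.println(snapshot(ph));
  }
}
```

After the third arrival the phase number moves from 0 to 1 and the arrived count is reset, since all three parties are again unarrived for the new phase.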

Overriding onAdvance() method in Phaser

If you want to perform an action before advancing from one phase to another, it can be done by overriding the onAdvance() method of the Phaser class. This method is invoked when the Phaser advances from one phase to another.
If this method returns true, this phaser will be set to a final termination state upon advance, and subsequent calls to isTerminated() will return true.
If this method returns false, phaser will be kept alive.

onAdvance() method

protected boolean onAdvance(int phase, int registeredParties)
Here
  • phase- current phase number on entry to this method, before this phaser is advanced.
  • registeredParties- the current number of registered parties.

One use case for overriding the onAdvance() method is to ensure that your phaser executes a given number of phases and then stops.

So we'll create a class called PhaserAdvance that will extend Phaser and override the onAdvance() method to ensure that specified number of phases are executed.

Overriding onAdvance() method example

import java.util.concurrent.Phaser;

public class PhaserAdvance extends Phaser{
  PhaserAdvance(int parties){
    super(parties);
  }
    
  // Overriding the onAdvance method
  @Override
  protected boolean onAdvance(int phase, int registeredParties) {
    System.out.println("In onAdvance method, current phase which is completed is " 
      + phase);
    // Want to ensure that phaser runs for 2 phases i.e. phase 1 
    // or the no. of registered parties become zero
    if(phase == 1 || registeredParties == 0){
      System.out.println("phaser will be terminated ");
      return true;
    }else{
      System.out.println("phaser will continue ");
      return false;
    }     
  }
    
  public static void main(String... args) {
    // creating phaser instance
    PhaserAdvance ph = new PhaserAdvance(1);
    // creating three threads
    new TestThread("thread-1", ph);
    new TestThread("thread-2", ph);
    new TestThread("thread-3", ph);
    
    while(!ph.isTerminated()){
      ph.arriveAndAwaitAdvance();
    }
    System.out.println("In main method, phaser is terminated");
  }
}

class TestThread implements Runnable {
  private String threadName;
  private Phaser ph;

  TestThread(String threadName, Phaser ph){
    this.threadName = threadName;
    this.ph = ph;
    // register new unarrived party to this phaser
    ph.register();
    new Thread(this).start();
  }
  @Override
  public void run() {
    // be in the loop till the phaser is terminated
    while(!ph.isTerminated()){
      System.out.println("This is phase " + ph.getPhase() + 
        " And Thread - "+ threadName);
      // Using await and advance so that all thread wait here
      ph.arriveAndAwaitAdvance();
    }      
  }
}

Output

This is phase 0 And Thread - thread-1
This is phase 0 And Thread - thread-2
This is phase 0 And Thread - thread-3
In onAdvance method, current phase which is completed is 0
phaser will continue 
This is phase 1 And Thread - thread-3
This is phase 1 And Thread - thread-2
This is phase 1 And Thread - thread-1
In onAdvance method, current phase which is completed is 1
phaser will be terminated 
In main method, phaser is terminated

Here it can be seen that a new class PhaserAdvance is created, extending the Phaser class. PhaserAdvance overrides the onAdvance() method of the Phaser class. The overridden onAdvance() method ensures that 2 phases are executed, hence the if condition with phase == 1 (the phase count starts from 0).

That's all for this topic Java Phaser With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Difference Between CountDownLatch And CyclicBarrier in Java
  2. Java Exchanger With Examples
  3. ConcurrentHashMap in Java With Examples
  4. Java PriorityBlockingQueue With Examples
  5. Java Concurrency Interview Questions And Answers


Saturday, June 1, 2024

ConcurrentHashMap in Java With Examples

ConcurrentHashMap in Java was introduced in Java 5 as another thread-safe alternative to Hashtable (or to explicitly synchronizing the map using the synchronizedMap method). The ConcurrentHashMap class extends the AbstractMap class and implements the ConcurrentMap interface, through which it provides thread safety and atomicity guarantees.

Why another Map

The first thing that comes to mind is why another map is needed when we already have HashMap and Hashtable. If you need a Map-like structure in a multi-threaded environment with thread safety, you can use a Hashtable or a synchronized HashMap obtained through the Collections.synchronizedMap() method. Then what does ConcurrentHashMap in Java offer that is unique?

Problem with Hashtable or synchronized Map is that all of its methods are synchronized on a common lock thus only a single thread can access it at any given time, even for read operations, making these data structures slower. ConcurrentHashMap is designed to provide better performance while providing thread safety too.


How performance is improved in ConcurrentHashMap

ConcurrentHashMap in Java is also a hash-based map like HashMap; it differs in the locking strategy it uses. Unlike Hashtable (or a synchronized HashMap) it doesn't synchronize every method on a common lock. ConcurrentHashMap uses a separate lock for each bucket, thus locking only a portion of the map.

If you know the internal implementation of HashMap, you will know that by default there are 16 buckets. The same concept is used in ConcurrentHashMap: by default there are 16 buckets, each with its own lock, so the default concurrency level is 16.

Since there are 16 buckets with locks of their own, 16 threads can update the map concurrently at any given time, provided all these threads are operating on separate buckets. From Java 8 the locks are per bucket, so the number of independently lockable buckets grows as the table is resized, and the concurrencyLevel constructor argument is used only as a sizing hint. So you see how ConcurrentHashMap provides better performance by locking only a portion of the map rather than blocking the whole map, resulting in greater shared access.

To lock a bucket independently of the other buckets, the first node in the bucket is locked using the synchronized keyword. Note that before Java 8 the implementation of ConcurrentHashMap used a Segment array, with each segment having its own lock; this was changed in Java 8. Now the first node in the bucket itself is locked, using the node's own built-in synchronized monitor.

[Figure: ConcurrentHashMap internal implementation in Java]

Further performance improvement

Performance of Java ConcurrentHashMap is further improved by providing concurrent read access without any blocking. Retrieval operations (including get) generally do not block, so they may overlap with update operations (including put and remove). Retrievals reflect the results of the most recently completed update operations, which means a retrieval may not see a value whose update is still in progress (which is one drawback). Memory visibility for the read operations is ensured by volatile reads. You can see in the ConcurrentHashMap code that the val and next fields of Node are volatile.

Also, for aggregate operations such as putAll and clear, which work on the whole map, concurrent retrievals may reflect the insertion or removal of only some entries (another drawback of separate locking). This is because read operations do not block, while some of the writes (those on the same bucket) may still block.
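As a rough illustration of concurrent updates and non-blocking reads, the sketch below lets several threads write distinct keys into one map while also reading from it. The class name, the method fillConcurrently() and the key format are assumptions made for this example only.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class name, used only for this illustration
class CHMConcurrentWritersSketch {

  // Starts the given number of writer threads, each adding its own
  // distinct keys, and returns the final size of the shared map
  static int fillConcurrently(int threads, int keysPerThread) {
    ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      final int id = t;
      workers[t] = new Thread(() -> {
        for (int i = 0; i < keysPerThread; i++) {
          map.put("w" + id + "-k" + i, i);
          map.get("w0-k0");   // reads may overlap with the updates
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return map.size();
  }

  public static void main(String[] args) {
    // 4 writers with 1000 distinct keys each
    System.out.println("Entries stored: " + fillConcurrently(4, 1000));
  }
}
```

Because every thread uses its own keys, no update is lost and the final size is the number of threads multiplied by the keys per thread.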

Constructors in Java ConcurrentHashMap

There are five constructors in the ConcurrentHashMap class-

  • ConcurrentHashMap()- Creates a new, empty map with the default initial table size (16).
  • ConcurrentHashMap(int initialCapacity)- Creates a new, empty map with an initial table size accommodating the specified number of elements without the need to dynamically resize.
  • ConcurrentHashMap(int initialCapacity, float loadFactor)- Creates a new, empty map with an initial table size based on the given number of elements (initialCapacity) and initial table density (loadFactor).
  • ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)- Creates a new, empty map with an initial table size based on the given number of elements (initialCapacity), table density (loadFactor), and number of concurrently updating threads (concurrencyLevel).
  • ConcurrentHashMap(Map<? extends K,? extends V> m)- Creates a new map with the same mappings as the given map.
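The constructors listed above can be called as in the following sketch. The sizes (64), load factor (0.75f) and concurrency hint (8) are arbitrary values picked for the illustration, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class name, used only for this illustration
class CHMConstructorsSketch {

  // Copies the given map using the Map-accepting constructor
  static Map<String, Integer> concurrentCopyOf(Map<String, Integer> source) {
    return new ConcurrentHashMap<>(source);
  }

  public static void main(String[] args) {
    // Default initial table size
    Map<String, Integer> defaults = new ConcurrentHashMap<>();
    // Sized for about 64 elements
    Map<String, Integer> sized = new ConcurrentHashMap<>(64);
    // Sized for about 64 elements with the given load factor
    Map<String, Integer> sizedWithLoad = new ConcurrentHashMap<>(64, 0.75f);
    // Same, plus the estimated number of concurrently updating threads
    Map<String, Integer> withHint = new ConcurrentHashMap<>(64, 0.75f, 8);

    Map<String, Integer> source = new HashMap<>();
    source.put("Delhi", 24);
    source.put("Mumbai", 32);
    Map<String, Integer> copy = concurrentCopyOf(source);

    System.out.println("Empty maps: " + defaults.isEmpty() + " " + sized.isEmpty()
        + " " + sizedWithLoad.isEmpty() + " " + withHint.isEmpty());
    System.out.println("Copied entries: " + copy.size());
  }
}
```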

Java ConcurrentHashMap example

At this juncture let's see a simple example where a ConcurrentHashMap is created and (key, value) pairs are added to it. Then the entry-set view of the map is iterated to display the stored keys and values.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CHMDemo {
  public static void main(String[] args) {
    // Creating ConcurrentHashMap
    Map<String, String> cityTemperatureMap = new ConcurrentHashMap<String, String>();
    
    // Storing elements
    cityTemperatureMap.put("Delhi", "24");
    cityTemperatureMap.put("Mumbai", "32");
    //cityTemperatureMap.put(null, "26");
    cityTemperatureMap.put("Chennai", "35");
    cityTemperatureMap.put("Bangalore", "22" );
    
    for (Map.Entry<String, String> e : cityTemperatureMap.entrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
  }
}

Null is not allowed in ConcurrentHashMap

HashMap allows one null key, but ConcurrentHashMap doesn't allow null as a key (null values are not allowed either). In the previous example you can uncomment the line which has the null key. When you try to execute the program it throws NullPointerException.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CHMDemo {
  public static void main(String[] args) {
    // Creating ConcurrentHashMap
    Map<String, String> cityTemperatureMap = new ConcurrentHashMap<String, String>();
    
    // Storing elements
    cityTemperatureMap.put("Delhi", "24");
    cityTemperatureMap.put("Mumbai", "32");
    // Adding null key
    cityTemperatureMap.put(null, "26");
    cityTemperatureMap.put("Chennai", "35");
    cityTemperatureMap.put("Bangalore", "22" );
    
    for (Map.Entry<String, String> e : cityTemperatureMap.entrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
  }
}

Output

Exception in thread "main" java.lang.NullPointerException
 at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
 at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
 at org.netjs.prog.CHMDemo.main(CHMDemo.java:16)

Atomic operations in ConcurrentHashMap

ConcurrentHashMap in Java provides many atomic methods; let's see with an example how they help. Note that many new atomic methods were added in Java 8.

Suppose you have a word map that counts the frequency of every word, where the key is the word and the value is the count. In a multi-threaded environment, even if ConcurrentHashMap is used, there may be a problem, as described in the code snippet.

public class CHMAtomicDemo {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Integer> wordMap = new ConcurrentHashMap<>();
    // ... (the code that obtains each word is elided here)
    // Read the current count. Suppose this thread is paused right after 
    // this line and another thread runs the same code for the same word
    Integer prevValue = wordMap.get(word); 
    
    Integer newValue = (prevValue == null ? 1 : prevValue + 1);
    // Both threads may have read the same prevValue, so one 
    // increment can be lost once both have executed this put
    wordMap.put(word, newValue);  
  }
}

To avoid this kind of problem you can use an atomic method; one such method is compute, which can be used here.

wordMap.compute(word, (k,v)-> v == null ? 1 : v + 1);

If you look at the general structure of the compute method

compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
Here the BiFunction functional interface is used, which can be implemented as a lambda expression.

So here rather than having these lines-

Integer prevValue = wordMap.get(word); 
Integer newValue = (prevValue == null ? 1 : prevValue + 1);
wordMap.put(word, newValue);
you can have only this line
wordMap.compute(word, (k,v)-> v == null ? 1 : v + 1);

The entire method invocation is performed atomically. Some attempted update operations on this map by other threads may be blocked while computation is in progress.

There are several other atomic operations like computeIfAbsent, computeIfPresent, merge, putIfAbsent.
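Here is a minimal runnable sketch of the word count using merge, followed by calls to some of the other atomic methods. The class name CHMAtomicSketch, the method countWords() and the sample words are assumptions made for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class name, used only for this illustration
class CHMAtomicSketch {

  // Every thread counts all the given words into one shared map
  static ConcurrentHashMap<String, Integer> countWords(String[] words, int threads) {
    ConcurrentHashMap<String, Integer> wordMap = new ConcurrentHashMap<>();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        for (String word : words) {
          // Atomic: stores 1 if absent, otherwise adds 1 to the old count
          wordMap.merge(word, 1, Integer::sum);
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return wordMap;
  }

  public static void main(String[] args) {
    String[] words = {"java", "lock", "java"};
    ConcurrentHashMap<String, Integer> counts = countWords(words, 4);
    System.out.println("java=" + counts.get("java") + " lock=" + counts.get("lock"));

    counts.putIfAbsent("java", 0);                     // key present, nothing changes
    counts.computeIfAbsent("map", k -> 1);             // key absent, stored as 1
    counts.computeIfPresent("lock", (k, v) -> v + 1);  // key present, incremented
    System.out.println("java=" + counts.get("java") + " map=" + counts.get("map")
        + " lock=" + counts.get("lock"));
  }
}
```

With 4 threads each counting "java" twice and "lock" once, no increment is lost, so the counts are 8 and 4 before the extra calls in main.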

Fail-safe iterator in ConcurrentHashMap

The iterator returned by Java ConcurrentHashMap is fail-safe (weakly consistent), which means it will not throw ConcurrentModificationException. This can be seen in the example code, where a new (key, value) pair is added while the map is being iterated and still no ConcurrentModificationException is thrown.

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CHMDemo {
  public static void main(String[] args) {
    // Creating ConcurrentHashMap
    Map<String, String> cityTemperatureMap = new ConcurrentHashMap<String, String>();
    
    // Storing elements
    cityTemperatureMap.put("Delhi", "24");
    cityTemperatureMap.put("Mumbai", "32");
    cityTemperatureMap.put("Chennai", "35");
    cityTemperatureMap.put("Bangalore", "22" );
    
    Iterator<String> iterator = cityTemperatureMap.keySet().iterator();   
    while (iterator.hasNext()){
      System.out.println(cityTemperatureMap.get(iterator.next()));
      // adding new value, it won't throw error
      cityTemperatureMap.put("Kolkata", "34");        
    }
  }
}

Output

24
35
34
32
22

According to the JavaDocs- The view's iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.
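For contrast, the sketch below runs the same loop against a plain HashMap, whose fail-fast iterator does throw ConcurrentModificationException. The class name and the helper throwsOnModification() are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class name, used only for this illustration
class IteratorContrastSketch {

  // Fills the given (empty) map, adds a new key while iterating over it,
  // and reports whether the iteration failed
  static boolean throwsOnModification(Map<String, String> map) {
    map.put("Delhi", "24");
    map.put("Mumbai", "32");
    map.put("Chennai", "35");
    try {
      for (String key : map.keySet()) {
        // The first pass adds a new key, a structural modification
        map.put("Kolkata", "34");
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("HashMap throws: "
        + throwsOnModification(new HashMap<>()));
    System.out.println("ConcurrentHashMap throws: "
        + throwsOnModification(new ConcurrentHashMap<>()));
  }
}
```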

When is ConcurrentHashMap a better choice

ConcurrentHashMap is a better choice when there are more reads than writes. As mentioned above, retrieval operations are non-blocking, so many concurrent threads can read without any performance problem. If there are more writes, and many threads are updating the same bucket, then the threads will block, which will degrade performance.

Points to note

  • ConcurrentHashMap in Java is also a hash based map like HashMap, but ConcurrentHashMap is thread safe.
  • In ConcurrentHashMap thread safety is ensured by having separate locks for separate buckets, resulting in better performance.
  • In ConcurrentHashMap class, by default the table has 16 buckets and the default concurrency level is also 16.
  • Null keys and null values are not allowed in Java ConcurrentHashMap.
  • Iterator provided by the ConcurrentHashMap is fail-safe, which means it will not throw ConcurrentModificationException.
  • Retrieval operations (like get) don't block so may overlap with update operations (including put and remove).

That's all for this topic ConcurrentHashMap in Java With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Difference Between HashMap And ConcurrentHashMap in Java
  2. CopyOnWriteArrayList in Java With Examples
  3. How HashMap Works Internally in Java
  4. Java CountDownLatch With Examples
  5. Java Concurrency Interview Questions And Answers


Monday, May 20, 2024

Java ReentrantReadWriteLock With Examples

This post gives an introduction to the ReadWriteLock interface and its implementing class ReentrantReadWriteLock in Java, with usage examples.


ReadWriteLock in Java

Even in a multi-threaded application, multiple reads of a shared resource can occur simultaneously. It is only when multiple writes happen simultaneously, or reads and writes are intermixed, that there is a chance of writing or reading the wrong value.

ReadWriteLock in Java uses the same idea to boost performance by having a separate pair of locks. A ReadWriteLock maintains a pair of associated locks-

  • One for read-only operations; and
  • one for writing.

The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.
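This sharing rule can be checked with tryLock(). In the sketch below the main thread holds the read lock while a second thread probes both locks; the class name and the method probeWhileReadLocked() are made up for this illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical class name, used only for this illustration
class ReadWriteSharingSketch {

  // While the calling thread holds the read lock, another thread tries
  // the read lock and then the write lock.
  // Returns {readAcquired, writeAcquired} as seen by that other thread.
  static boolean[] probeWhileReadLocked() {
    ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    boolean[] result = new boolean[2];
    rwl.readLock().lock();
    try {
      Thread other = new Thread(() -> {
        result[0] = rwl.readLock().tryLock();
        if (result[0]) {
          rwl.readLock().unlock();
        }
        result[1] = rwl.writeLock().tryLock();
        if (result[1]) {
          rwl.writeLock().unlock();
        }
      });
      other.start();
      other.join();   // join also makes the other thread's writes visible
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      rwl.readLock().unlock();
    }
    return result;
  }

  public static void main(String[] args) {
    boolean[] result = probeWhileReadLocked();
    System.out.println("Second reader got the read lock: " + result[0]);
    System.out.println("Writer got the write lock: " + result[1]);
  }
}
```

The second reader succeeds because no writer holds the lock, while the write attempt fails for as long as any read lock is held.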

Having a pair of read-write lock allows for a greater level of concurrency in accessing shared data than that permitted by a mutual exclusion lock. It exploits the fact that while only a single thread at a time (a writer thread) can modify the shared data, in many cases any number of threads can concurrently read the data (hence reader threads).

A read-write lock will improve performance over the use of a mutual exclusion lock if reads are more frequent than writes and the read operations last longer than the writes. It also depends on the contention for the data - that is, the number of threads that will try to read or write the data at the same time.

For example, a collection that is initially populated with data and thereafter infrequently modified, while being frequently searched (such as a directory of some kind) is an ideal candidate for the use of a read-write lock. However, if updates become frequent then the data spends most of its time being exclusively locked and there is little, if any increase in concurrency.

ReentrantReadWriteLock class in Java

As already mentioned, ReentrantReadWriteLock is an implementation of the ReadWriteLock interface, which provides a pair of read-write locks. ReentrantReadWriteLock has similar semantics to ReentrantLock in Java.

ReentrantReadWriteLock class in Java does not impose a reader or writer preference ordering for lock access, which means there is no acquisition preference. There is, however, an optional fairness policy; whether a ReentrantReadWriteLock is fair or not is specified in its constructor.

ReentrantReadWriteLock in Java allows both readers and writers to reacquire read or write locks in the same fashion as ReentrantLock.

Java ReentrantReadWriteLock constructors

  • ReentrantReadWriteLock()- Creates a new ReentrantReadWriteLock with default (nonfair) ordering properties.
  • ReentrantReadWriteLock(boolean fair)- Creates a new ReentrantReadWriteLock with the given fairness policy.

Fair mode in ReentrantReadWriteLock

When constructed as fair, threads contend for entry using an approximately arrival-order policy. When the currently held lock is released, either the longest-waiting single writer thread will be assigned the write lock, or if there is a group of reader threads waiting longer than all waiting writer threads, that group will be assigned the read lock.
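The two constructors and the reentrant behaviour can be sketched as follows. The class name and the helper writeHoldsAfterLockingTwice() are hypothetical names for this example.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical class name, used only for this illustration
class FairnessAndReentrancySketch {

  // Acquires the write lock twice on the calling thread and
  // returns the hold count seen while both holds are active
  static int writeHoldsAfterLockingTwice(ReentrantReadWriteLock rwl) {
    rwl.writeLock().lock();
    rwl.writeLock().lock();       // reentrant acquisition by the owner
    try {
      return rwl.getWriteHoldCount();
    } finally {
      rwl.writeLock().unlock();
      rwl.writeLock().unlock();   // one unlock for every lock call
    }
  }

  public static void main(String[] args) {
    ReentrantReadWriteLock nonFair = new ReentrantReadWriteLock();
    ReentrantReadWriteLock fair = new ReentrantReadWriteLock(true);
    System.out.println("Default lock is fair: " + nonFair.isFair());
    System.out.println("Lock created with true is fair: " + fair.isFair());
    System.out.println("Write holds: " + writeHoldsAfterLockingTwice(fair));
    System.out.println("Still write locked: " + fair.isWriteLocked());
  }
}
```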

Lock downgrading in ReentrantReadWriteLock

ReentrantReadWriteLock also allows downgrading from the write lock to a read lock. You can first acquire a write lock, then the read lock and then release the write lock. So you are effectively left with a read lock. However, upgrading from a read lock to the write lock is not possible.

Example of lock downgrading

Suppose you have a scenario where you want to read from a cache, using a read lock, only if it is still valid. If the cache is dirty then you need to acquire the write lock and put data in the cache again.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantDowngrade {
  Object data;
  volatile boolean cacheValid;
  ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

  void processCacheData(){
    // first acquire a read lock
    rwl.readLock().lock();
    // check if cache is still valid
    if (!cacheValid) {
      // Must release read lock before acquiring 
      // write lock, as upgrading not possible
      rwl.readLock().unlock();
      rwl.writeLock().lock();
      try {
        // Recheck state because another thread might have
        // acquired write lock and changed state before we did.
        if (!cacheValid) {
          // get fresh data for the cache
          data = ...
          cacheValid = true;
        }
        // Downgrade by acquiring read lock before 
        // releasing write lock
        rwl.readLock().lock();
      } finally {
        // Unlock write, still hold read
        rwl.writeLock().unlock(); 
      }
    }
    try {
      // use cache data
      use(data);
    } finally {
      // Finally release the read lock
      rwl.readLock().unlock();
    }
  }
}
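The example above leaves out how the data is loaded and used. To look only at the lock transitions, here is a small runnable sketch with a hypothetical class name and method; it shows the downgrade succeeding and a later upgrade attempt failing.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical class name, used only for this illustration
class DowngradeSketch {

  // Returns {downgraded, upgraded}
  static boolean[] downgradeThenTryUpgrade() {
    ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    rwl.writeLock().lock();
    rwl.readLock().lock();      // the writer may also take the read lock
    rwl.writeLock().unlock();   // downgrade: only the read lock remains

    boolean downgraded = !rwl.isWriteLocked() && rwl.getReadHoldCount() == 1;

    // Upgrade attempt: the write lock is not granted while a read lock
    // is held, even when the holder is the current thread
    boolean upgraded = rwl.writeLock().tryLock();
    if (upgraded) {
      rwl.writeLock().unlock();
    }
    rwl.readLock().unlock();
    return new boolean[] {downgraded, upgraded};
  }

  public static void main(String[] args) {
    boolean[] result = downgradeThenTryUpgrade();
    System.out.println("Downgrade worked: " + result[0]);
    System.out.println("Upgrade worked: " + result[1]);
  }
}
```

tryLock() is used for the upgrade attempt because a plain lock() call on the write lock would block forever while the same thread still holds the read lock.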

ReentrantReadWriteLock Java example

Let us see another example where two threads use the read lock and one uses the write lock. In the class ReentrantRWDemo there are two methods: get() is used to get data from the TreeMap, so the read lock is used; put() is used to add a value to the map and uses the write lock.

There are 2 more classes: ReadThread, which is used for reader threads, and WriterThread, which is used for writer threads. In the program two reader threads and one writer thread are spawned.

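import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantRWDemo {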
  private final Map<String, String> m = new TreeMap<String, String>();
  private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    
  // get method for getting values from map
  // it can be used by many read threads simultaneously
  public String get(String key) {
    System.out.println("In get method waiting to acquire lock");
    rwl.readLock().lock();
    System.out.println("In get method acquired read lock");
    try { 
      try {
        Thread.sleep(1500);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      return m.get(key); 
    }
    finally { 
      rwl.readLock().unlock(); 
      System.out.println("In get method released read lock");
    }
  }
    
  // Put method to store  key, value in a map
  // it acquires a write lock so only one thread at a time
  public String put(String key, String value) {
    System.out.println("In put method waiting to acquire lock");
    rwl.writeLock().lock();
    System.out.println("In put method acquired write lock");
    try { 
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
      return m.put(key, value); 
    }
    finally { 
      rwl.writeLock().unlock(); 
      System.out.println("In put method released write lock");
    }
  }
    
  public void display(){
    m.entrySet().forEach(System.out::println);
      
  }
    
  public static void main(String... args) {
    ReentrantRWDemo rwDemo = new ReentrantRWDemo();
    // Putting some values in the map
    rwDemo.put("1", "One");
    rwDemo.put("2", "Two");
    rwDemo.put("3", "Three");
    
    // Starting two read threads and one write thread
    Thread rThread1 = new Thread(new ReadThread(rwDemo));
    Thread wThread = new Thread(new WriterThread(rwDemo));
    Thread rThread2 = new Thread(new ReadThread(rwDemo));
    rThread1.start();
    wThread.start();
    rThread2.start();
    // Wait for the threads to finish, then only go for display method
    try {
      rThread1.join();
      wThread.join();
      rThread2.join();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }    
    rwDemo.display();        
  }
}

class ReadThread implements Runnable {
  ReentrantRWDemo rwDemo;
  ReadThread(ReentrantRWDemo rwDemo){
    this.rwDemo = rwDemo;
  }
  public void run() {
    System.out.println("Value - " + rwDemo.get("1"));
  }
}

class WriterThread implements Runnable {
  ReentrantRWDemo rwDemo;
  WriterThread(ReentrantRWDemo rwDemo){
    this.rwDemo = rwDemo;
  }
  public void run() {
    rwDemo.put("4", "Four");
  }
}

Output

In put method waiting to acquire lock
In put method acquired write lock
In put method released write lock
In put method waiting to acquire lock
In put method acquired write lock
In put method released write lock
In put method waiting to acquire lock
In put method acquired write lock
In put method released write lock
In get method waiting to acquire lock
In put method waiting to acquire lock
In put method acquired write lock
In get method waiting to acquire lock
In put method released write lock
In get method acquired read lock
In get method acquired read lock
In get method released read lock
Value - One
In get method released read lock
Value - One
1=One
2=Two
3=Three
4=Four

Here you can ignore the first three sets of put messages, as they come from the first 3 puts that add values to the TreeMap. As mentioned, two reader threads and one writer thread are spawned. In the output I got (it may vary for you) the writer thread acquires the write lock first; although Thread.sleep is used to introduce some delay, the reader threads wait until the write lock is released.

But both reader threads can hold the read lock simultaneously, as confirmed by the two consecutive "In get method acquired read lock" statements.

Also note that in the display() method, forEach with a method reference is used to display the map values. These features are available from Java 8.

Thread's join method is used so that values are displayed once all the threads have finished.

That's all for this topic Java ReentrantReadWriteLock With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Difference Between ReentrantLock and Synchronized in Java
  2. Java StampedLock With Examples
  3. Java CountDownLatch With Examples
  4. CopyOnWriteArrayList in Java With Examples
  5. Java Concurrency Interview Questions And Answers


Thursday, May 2, 2024

Java ReentrantLock With Examples

The java.util.concurrent.locks package adds support for locks, which provide an alternative to using synchronized in Java in scenarios where we need to control access to a shared resource. In this post we'll talk about one of the concrete implementations of the Lock interface, called ReentrantLock. There is another implementation, ReentrantReadWriteLock, which implements the ReadWriteLock interface.

ReentrantLock was added in Java 5 along with other concurrent features like CyclicBarrier, ConcurrentHashMap and CopyOnWriteArrayList in the java.util.concurrent package, to help develop concurrent applications.


What is ReentrantLock and why needed

ReentrantLock class in Java is a concrete implementation of the Lock interface, which is present in the java.util.concurrent.locks package. One question that comes to mind is why this separate locking functionality is needed when Java already has the synchronized keyword, which provides the same functionality.

As you may know, every object created in Java has one mutually exclusive lock associated with it. When you use synchronized you use that lock implicitly (with no other features), whereas when you use one of the lock implementations (like ReentrantLock) you work with a separate lock object explicitly. This means there are methods like lock() to acquire the lock and unlock() to release it. Along with that, ReentrantLock in Java provides many other features like fairness, the ability to interrupt a waiting thread, and letting a thread wait for a lock only for a specified period.

According to the Java docs "ReentrantLock is a reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities."
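To make the implicit versus explicit distinction concrete, here is a minimal sketch that guards one counter with synchronized and another with a ReentrantLock. The class and method names (ImplicitVsExplicitLock, runDemo and so on) are hypothetical and used only for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

public class ImplicitVsExplicitLock {
  private int syncCount = 0;
  private int lockCount = 0;
  private final ReentrantLock lock = new ReentrantLock();

  // Implicit monitor lock: acquired and released by the JVM
  public synchronized void incrementWithSynchronized() {
    syncCount++;
  }

  // Explicit lock: lock() and unlock() are called by our own code
  public void incrementWithLock() {
    lock.lock();
    try {
      lockCount++;
    } finally {
      lock.unlock();
    }
  }

  public synchronized int getSyncCount() {
    return syncCount;
  }

  public int getLockCount() {
    lock.lock();
    try {
      return lockCount;
    } finally {
      lock.unlock();
    }
  }

  // Two threads increment both counters 1000 times each;
  // returns {syncCount, lockCount}
  public static int[] runDemo() {
    ImplicitVsExplicitLock demo = new ImplicitVsExplicitLock();
    Runnable task = () -> {
      for (int i = 0; i < 1000; i++) {
        demo.incrementWithSynchronized();
        demo.incrementWithLock();
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return new int[] { demo.getSyncCount(), demo.getLockCount() };
  }

  public static void main(String[] args) {
    int[] counts = runDemo();
    System.out.println("synchronized count: " + counts[0]);
    System.out.println("ReentrantLock count: " + counts[1]);
  }
}
```

Both counters end up at 2000, since both approaches give mutual exclusion. The difference is only in who releases the lock: the JVM for synchronized, your own finally block for ReentrantLock.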

Why is it called ReentrantLock

It is called ReentrantLock because an acquisition count (hold count) is associated with the lock. When you call the lock() method and acquire the lock, the acquisition count becomes 1.

A reentrant lock also allows the lock holder to enter another block of code guarded by the same lock object, since the thread already owns the lock. In that case, if a thread that holds the lock acquires it again, the acquisition count is incremented and the lock then needs to be released twice to truly release it. Let's see it with an example to make it clear-

Here two threads are created. The run() method of the Display class acquires the lock and then calls methodA(), which uses the same lock object to control access. So you will see two things here-

  • Whichever thread acquires the lock in run() will also be able to enter the methodA() critical section, as it already holds the lock. The only difference is that the acquisition count becomes 2.
  • In methodA() the unlock() method is not called, so the lock is released only once, in run(), although the acquisition count is 2. The lock is therefore never fully released and the other thread never gets a chance to acquire it.
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantDemo {
  public static void main(String[] args) {
    ReentrantLock rLock = new ReentrantLock();
    Thread t1 = new Thread(new Display("Thread-1", rLock));
    Thread t2 = new Thread(new Display("Thread-2", rLock));
    System.out.println("starting threads ");
    t1.start();
    t2.start();
  }
}

class Display implements Runnable {
  private String threadName;
  ReentrantLock lock;
  Display(String threadName, ReentrantLock lock){
    this.threadName = threadName;
    this.lock = lock;
  }
  @Override
  public void run() {
    System.out.println("In Display run method, thread " + threadName + 
     " is waiting to get lock");
    //acquiring lock
    lock.lock();
    try {
      System.out.println("Thread " + threadName + " has got lock");
      methodA();
    } finally{
      // releases only one of the two holds
      lock.unlock();
    }
  }

  public void methodA(){
    System.out.println("In Display methodA, thread " + threadName + 
      " is waiting to get lock");
    // acquiring the same lock again, hold count becomes 2
    lock.lock();
    System.out.println("Thread " + threadName + " has got lock");
    System.out.println("Count of locks held by thread " + threadName + 
     " - " + lock.getHoldCount());
    // Deliberately not calling unlock() here to show the problem
  }
}

Output

starting threads 
In Display run method, thread Thread-1 is waiting to get lock
In Display run method, thread Thread-2 is waiting to get lock
Thread Thread-1 has got lock
In Display methodA, thread Thread-1 is waiting to get lock
Thread Thread-1 has got lock
Count of locks held by thread Thread-1 - 2

Here it can be seen that both threads start and Thread-1 acquires the lock. Thread-1 then acquires the same lock again in methodA(), but there it is not released. Notice the method lock.getHoldCount(), which gives the number of holds on this lock by the current thread. Since the unlock() method is called only once for two acquisitions, the lock is never fully released, which is why Thread-2 never gets a chance to acquire it. You can see it never goes beyond the message "In Display run method, thread Thread-2 is waiting to get lock".

Note that the thread which acquires the lock first may vary between runs.

Now let's correct the code and use the unlock() method to release the lock and see what happens.

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantDemo {
  public static void main(String[] args) {
    ReentrantLock rLock = new ReentrantLock();
    Thread t1 = new Thread(new Display("Thread-1", rLock));
    Thread t2 = new Thread(new Display("Thread-2", rLock));
    System.out.println("starting threads ");
    t1.start();
    t2.start();
  }
}

class Display implements Runnable {
  private String threadName;
  ReentrantLock lock;
  Display(String threadName, ReentrantLock lock){
    this.threadName = threadName;
    this.lock = lock;
  }
  @Override
  public void run() {
    System.out.println("In Display run method, thread " + threadName + 
     " is waiting to get lock");
    //acquiring lock
    lock.lock();
    try {
      System.out.println("Thread " + threadName + " has got lock");
      methodA();
    } finally{
      // releases the hold taken in run()
      lock.unlock();
    }
  }

  public void methodA(){
    System.out.println("In Display methodA, thread " + threadName 
     + " is waiting to get lock");
    // acquiring the same lock again, hold count becomes 2
    lock.lock();
    try {
      System.out.println("Thread " + threadName + " has got lock");
      System.out.println("Count of locks held by thread " + threadName 
       + " - " + lock.getHoldCount());
    } finally{
      // releases the hold taken in methodA()
      lock.unlock();
    }
  }
}

Output

starting threads 
In Display run method, thread Thread-1 is waiting to get lock
In Display run method, thread Thread-2 is waiting to get lock
Thread Thread-1 has got lock
In Display methodA, thread Thread-1 is waiting to get lock
Thread Thread-1 has got lock
Count of locks held by thread Thread-1 - 2
Thread Thread-2 has got lock
In Display methodA, thread Thread-2 is waiting to get lock
Thread Thread-2 has got lock
Count of locks held by thread Thread-2 - 2

Now both threads are able to run as the locks are properly released after acquiring.

Convention while using ReentrantLock in Java

As you may have noticed in the above code, the lock.lock() method is always called before the try block. When you are using ReentrantLock in Java, it is a recommended practice to always immediately follow a call to lock() with a try block.

If you call the lock() method within the try block and something goes wrong while acquiring the lock, the finally block will still be executed and it will call lock.unlock(). You will end up unlocking a lock which was never acquired, and that results in an IllegalMonitorStateException. That's why it is recommended to call the lock() method before the try block.

At the same time you do want to release the acquired lock if something goes wrong after acquiring it, which is why the call to lock() should be immediately followed by a try block with unlock() in its finally block.
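The convention can be sketched as follows. This is a minimal illustration rather than a definitive pattern; LockIdiomSketch, guarded() and unlockWithoutHoldingThrows() are hypothetical names. It shows that the lock is released even when the critical section throws, and that calling unlock() on a lock the thread does not hold raises IllegalMonitorStateException.

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockIdiomSketch {
  private static final ReentrantLock lock = new ReentrantLock();

  // Recommended shape: lock() first, then try/finally around the critical section
  public static String guarded(Runnable criticalSection) {
    lock.lock();
    try {
      criticalSection.run();
      return "completed";
    } catch (RuntimeException e) {
      return "failed: " + e.getMessage();
    } finally {
      // runs on both the normal and the exceptional path
      lock.unlock();
    }
  }

  // What a finally block would run into if the lock had never been acquired
  public static boolean unlockWithoutHoldingThrows() {
    try {
      lock.unlock();
      return false;
    } catch (IllegalMonitorStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(guarded(() -> { }));
    System.out.println(guarded(() -> {
      throw new IllegalStateException("boom");
    }));
    System.out.println("Lock still held after failure? " + lock.isLocked());
    System.out.println("unlock() without holding the lock throws? "
      + unlockWithoutHoldingThrows());
  }
}
```

Running it prints "completed", "failed: boom", then false for the lock still being held and true for the unlock() call throwing.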

Features of ReentrantLock in Java

ReentrantLock provides many features like fairness, the ability to interrupt a thread waiting for a lock, and the ability to wait for a lock only for a specified period. Let's have a look at some of these features.

  1. Fairness- ReentrantLock has one constructor which takes a boolean value as an argument. That lets you choose whether you want a fair or an unfair lock depending upon whether the boolean value is true or false. A fair lock is one where the threads acquire the lock in the same order they asked for it; whereas in case of an unfair lock a thread can sometimes acquire a lock before another thread that asked for it first. The no-argument constructor creates an unfair lock.
    public ReentrantLock(boolean fair)

  2. Lock interruptibly- ReentrantLock provides the lockInterruptibly() method, which acquires the lock unless the current thread is interrupted. A thread that is interrupted while waiting for the lock stops waiting and gets an InterruptedException.
    public void lockInterruptibly() throws InterruptedException

  3. Ability to try for a lock without blocking indefinitely- ReentrantLock in Java provides the tryLock() method, which acquires the lock only if it is available and otherwise returns instead of blocking.

    tryLock()- Acquires the lock only if it is not held by another thread at the time of invocation. Returns true if the lock was acquired and false otherwise.

    tryLock(long timeout, TimeUnit unit) - Acquires the lock if it is not held by another thread within the given waiting time and the current thread has not been interrupted.

  4. Some of the other methods in ReentrantLock class are as follows-
    • getHoldCount()- Queries the number of holds on this lock by the current thread.
    • getWaitingThreads(Condition condition) - Returns a collection containing those threads that may be waiting on the given condition associated with this lock.
    • isHeldByCurrentThread()- Queries if this lock is held by the current thread.
    • isLocked()- Queries if this lock is held by any thread.
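The features listed above can be tried out with a small sketch. The class name LockFeaturesSketch and the method runDemo() are hypothetical. The calling thread holds a fair lock for the whole demo, so that tryLock() and the timed tryLock() in a second thread both fail, and a third thread blocked in lockInterruptibly() is interrupted.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockFeaturesSketch {

  // Returns {tryLock() result, timed tryLock() result, waiter was interrupted}
  public static boolean[] runDemo() {
    ReentrantLock lock = new ReentrantLock(true); // true = fair lock
    boolean[] results = new boolean[3];

    Thread tryer = new Thread(() -> {
      // returns immediately with false because the lock is held elsewhere
      results[0] = lock.tryLock();
      if (results[0]) {
        lock.unlock();
      }
      try {
        // waits up to 100 ms for the lock, then gives up
        results[1] = lock.tryLock(100, TimeUnit.MILLISECONDS);
        if (results[1]) {
          lock.unlock();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread waiter = new Thread(() -> {
      try {
        lock.lockInterruptibly();
        try {
          // critical section would go here
        } finally {
          lock.unlock();
        }
      } catch (InterruptedException e) {
        // interrupted before the lock could be acquired
        results[2] = true;
      }
    });

    lock.lock(); // the calling thread holds the lock for the whole demo
    try {
      tryer.start();
      waiter.start();
      waiter.interrupt();
      tryer.join();
      waiter.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      lock.unlock();
    }
    return results;
  }

  public static void main(String[] args) {
    boolean[] r = runDemo();
    System.out.println("tryLock() acquired lock: " + r[0]);
    System.out.println("timed tryLock() acquired lock: " + r[1]);
    System.out.println("lockInterruptibly() was interrupted: " + r[2]);
  }
}
```

Since the lock is held until both helper threads finish, the sketch prints false, false and true. In real code the false branch of tryLock() is where you would do some alternative work instead of blocking.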

Drawbacks of ReentrantLock in Java

  1. Need to wrap the critical section in a try/finally block and release the lock in the finally block. Otherwise, if the critical section code throws an exception, the lock might never be released.
  2. Need to call the unlock() method explicitly. Forgetting to do that results in the lock never being released, which leaves other threads blocked and creates problems that are very hard to detect.
    With synchronization, the JVM ensures that locks are automatically released.
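Both drawbacks can be seen in a single-threaded sketch. The names used here (ForgottenUnlockSketch, riskyWork() and the two check methods) are hypothetical. The first method deliberately omits the finally block, so an exception skips unlock(). The second uses a synchronized block, where the JVM releases the monitor on the way out.

```java
import java.util.concurrent.locks.ReentrantLock;

public class ForgottenUnlockSketch {
  private static final Object monitor = new Object();

  // Stand-in for critical section code that fails
  private static void riskyWork() {
    throw new IllegalStateException("failure in critical section");
  }

  // Returns true if the ReentrantLock is still held after the exception
  public static boolean lockLeakedAfterException() {
    ReentrantLock lock = new ReentrantLock();
    lock.lock();
    try {
      riskyWork();   // throws, so the next line is skipped
      lock.unlock(); // mistake: not in a finally block
    } catch (IllegalStateException e) {
      // exception handled, but nobody released the lock
    }
    return lock.isLocked();
  }

  // Returns true if the monitor is still held after the exception
  public static boolean monitorLeakedAfterException() {
    try {
      synchronized (monitor) {
        riskyWork();
      }
    } catch (IllegalStateException e) {
      // the JVM already released the monitor when the block was exited
    }
    return Thread.holdsLock(monitor);
  }

  public static void main(String[] args) {
    System.out.println("ReentrantLock still held: " + lockLeakedAfterException());
    System.out.println("Monitor still held: " + monitorLeakedAfterException());
  }
}
```

The first line prints true and the second prints false. Moving unlock() into a finally block makes the ReentrantLock version behave like the synchronized one.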

ReentrantLock Java example code

Let us see one more example of ReentrantLock where a resource is shared between two threads and the access is controlled using a lock.

Thread.sleep() is used to induce some delay while the lock is held; even then the other thread won't break in. Only when the unlock() method is called and the lock is released does the other thread get a chance.

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
  public static void main(String[] args) {
    ReentrantLock rLock = new ReentrantLock();
    Thread t1 = new Thread(new Counter("Thread-1", rLock));
    Thread t2 = new Thread(new Counter("Thread-2", rLock));
    System.out.println("starting threads ");
    t1.start();
    t2.start();
  }
}

// Shared class for threads
class SharedResource{
  static int count = 0;
}

class Counter implements Runnable {
  private String threadName;
  ReentrantLock lock;
  Counter(String threadName, ReentrantLock lock){
    this.threadName = threadName;
    this.lock = lock;
  }
  @Override
  public void run() {
    System.out.println("In Counter run method, thread " + threadName 
    + " is waiting to get lock");
    // acquiring the lock
    lock.lock();
    try {
      System.out.println("Thread " + threadName + " has got lock");
      SharedResource.count++;
      System.out.println("Thread " + threadName + 
       " Count " + SharedResource.count);
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        // restore the interrupt status so that callers can see it
        Thread.currentThread().interrupt();
      }
    } finally{
      System.out.println("Thread " + threadName 
       + " releasing lock");
      // releasing the lock
      lock.unlock();
    }    
  }
}

Output

starting threads 
In Counter run method, thread Thread-1 is waiting to get lock
In Counter run method, thread Thread-2 is waiting to get lock
Thread Thread-1 has got lock
Thread Thread-1 Count 1
Thread Thread-1 releasing lock
Thread Thread-2 has got lock
Thread Thread-2 Count 2
Thread Thread-2 releasing lock

Points to remember

  • ReentrantLock in Java is a reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, with some extended features.
  • Some of these features include fairness, the ability to interrupt a thread waiting for a lock, and the ability to wait for a lock only for a specified period.
  • The thread which is currently holding a lock can acquire the same lock repeatedly; the acquisition count is incremented each time the current thread acquires the lock.
  • The lock has to be released as many times as it has been acquired.
  • Failure to call unlock() as many times as the lock is acquired will result in the lock not being released, and the thread will continue to hold it.
  • The unlock() method should be called in a finally block. Otherwise, if the critical section code throws an exception, the lock might never be released.
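The points about the acquisition count can be verified with a short single-threaded sketch. HoldCountSketch and trace() are hypothetical names. The lock is acquired three times and stays locked until unlock() has been called three times.

```java
import java.util.concurrent.locks.ReentrantLock;

public class HoldCountSketch {

  // Returns the hold count after 3 locks, after 2 unlocks and after the 3rd unlock
  public static int[] trace() {
    ReentrantLock lock = new ReentrantLock();
    int[] counts = new int[3];

    lock.lock();
    lock.lock();
    lock.lock();
    counts[0] = lock.getHoldCount(); // 3 holds by the current thread

    lock.unlock();
    lock.unlock();
    counts[1] = lock.getHoldCount(); // 1 hold left, lock is still held

    lock.unlock();
    counts[2] = lock.getHoldCount(); // 0, lock is now free
    return counts;
  }

  public static void main(String[] args) {
    int[] counts = trace();
    System.out.println("After three lock() calls: " + counts[0]);
    System.out.println("After two unlock() calls: " + counts[1]);
    System.out.println("After the third unlock() call: " + counts[2]);
  }
}
```

The printed counts are 3, 1 and 0. Only when the count reaches 0 can another thread acquire the lock.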

That's all for this topic Java ReentrantLock With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!

