Showing posts with label Java synchronization barriers. Show all posts

Friday, August 16, 2024

Java Semaphore With Examples

Semaphore is one of the synchronization aids in the Java concurrency utilities. It was added in Java 5 along with other synchronization aids like CountDownLatch, CyclicBarrier and Exchanger; Phaser followed in Java 7.

The Semaphore class in the java.util.concurrent package is a counting semaphore that, conceptually, maintains a set of permits. The Semaphore class in Java has two methods that make use of permits-

  • acquire()- Acquires a permit from this semaphore, blocking until one is available, or the thread is interrupted. It has another overloaded version acquire(int permits).
  • release()- Releases a permit, returning it to the semaphore. It has another overloaded method release(int permits).
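
As an illustration of acquire() and release(), here is a minimal sketch in which a semaphore with 2 permits guards a section of code used by 6 threads. The class name SemaphoreSketch, the method maxConcurrent and the 10 ms sleep standing in for real work are assumptions made for this example only.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreSketch {
  // Runs `workers` threads through a section guarded by `permits` permits and
  // returns the highest number of threads seen inside that section at once.
  static int maxConcurrent(int permits, int workers) {
    Semaphore semaphore = new Semaphore(permits);
    AtomicInteger inside = new AtomicInteger();
    AtomicInteger maxInside = new AtomicInteger();
    Thread[] threads = new Thread[workers];
    for (int i = 0; i < workers; i++) {
      threads[i] = new Thread(() -> {
        try {
          semaphore.acquire();            // blocks until a permit is free
          try {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            Thread.sleep(10);             // simulated work (assumption)
          } finally {
            inside.decrementAndGet();
            semaphore.release();          // always hand the permit back
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads[i].start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while joining", e);
    }
    return maxInside.get();
  }

  public static void main(String[] args) {
    System.out.println("Max threads inside: " + maxConcurrent(2, 6));
  }
}
```

The value printed should never exceed the number of permits, since a thread can only enter the guarded section while it holds a permit.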

Thursday, August 15, 2024

Java Exchanger With Examples

Exchanger in Java is one of the synchronization classes in the java.util.concurrent package, alongside CyclicBarrier, CountDownLatch, Semaphore and Phaser.

How does Exchanger in Java work

Exchanger makes it easy for two threads to exchange data between themselves. Exchanger provides a synchronization point at which two threads can pair and swap elements. Exchanger waits until two separate threads call its exchange() method. When two threads have called the exchange() method, Exchanger will swap the objects presented by the threads.
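
The pairing described above can be sketched as follows. This is only a minimal illustration: the class name ExchangerSketch, the method swap and the two strings being exchanged are made up for the example.

```java
import java.util.concurrent.Exchanger;

class ExchangerSketch {
  // Pairs the calling thread with one helper thread and returns what each side
  // received: index 0 is what the caller got, index 1 is what the helper got.
  static String[] swap(String fromCaller, String fromHelper) {
    Exchanger<String> exchanger = new Exchanger<>();
    String[] received = new String[2];
    Thread helper = new Thread(() -> {
      try {
        received[1] = exchanger.exchange(fromHelper); // waits for the partner
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    helper.start();
    try {
      received[0] = exchanger.exchange(fromCaller);
      helper.join();   // after join, the helper's write to received[1] is visible
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while exchanging", e);
    }
    return received;
  }

  public static void main(String[] args) {
    String[] result = swap("from-main", "from-helper");
    System.out.println("main got " + result[0] + ", helper got " + result[1]);
  }
}
```

Each thread ends up holding the object the other thread passed to exchange().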

Thursday, June 6, 2024

Java CountDownLatch With Examples

There are scenarios in an application when you want one or more threads to wait until one or more events being performed in other threads complete. CountDownLatch in Java concurrent API helps in handling such scenarios.

Note that CountDownLatch was introduced in Java 5 along with other concurrent classes like CyclicBarrier, ConcurrentHashMap, CopyOnWriteArrayList and BlockingQueue within the java.util.concurrent package.


How CountDownLatch is used

CountDownLatch in Java, as the name suggests, can be visualized as a latch that is released only after the given number of events occur. When an instance of CountDownLatch is created it is initialized with a count. This count denotes the number of times an event must occur before waiting threads can pass through the latch.

Note that a CountDownLatch initialized to N can be used either way-

  • To make one thread wait until N threads have completed some action, or
  • Some action has been completed N times (may be by a single thread).

Each time one of these events occurs, the count is decremented using the countDown() method of the CountDownLatch class. Waiting threads are released when the count reaches zero.

Thread(s) that are waiting for the latch to release are blocked using the await() method.

Java CountDownLatch Constructor

CountDownLatch(int count)

Constructs a CountDownLatch initialized with the given count. Here count specifies the number of events that must happen in order for the latch to open.

await() and countDown() methods in CountDownLatch class

await() and countDown() are the two main methods in the CountDownLatch class which control the working of the latch.

await() method- A thread that waits for the latch to open calls the await() method. The await() method has two forms.

1. public void await() throws InterruptedException

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted. If the current count is zero then this method returns immediately.

If the current count is greater than zero then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happen:

  • The count reaches zero due to invocations of the countDown() method
  • Some other thread interrupts the current thread.

2. public boolean await(long timeout, TimeUnit unit) throws InterruptedException

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted or the specified waiting time elapses. The waiting time is given as a long value together with a TimeUnit constant that says how to interpret it.

If the current count is zero then this method returns immediately with the value true. If the current count is greater than zero then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happen:

  • The count reaches zero due to invocations of the countDown() method.
  • Some other thread interrupts the current thread.
  • The specified waiting time elapses.

countDown() method- Threads which are executing the events signal the completion of an event by calling the countDown() method.

public void countDown()

Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
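
The boolean returned by the timed form of await() can be seen in a small single-threaded sketch. The class name TimedAwaitSketch, the method awaitAfter and the 100 ms timeout are assumptions chosen for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitSketch {
  // Counts a latch of size `count` down `events` times, then waits up to 100 ms.
  // Returns true if the latch opened, false if the wait timed out.
  static boolean awaitAfter(int count, int events) {
    CountDownLatch latch = new CountDownLatch(count);
    for (int i = 0; i < events; i++) {
      latch.countDown();
    }
    try {
      return latch.await(100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(awaitAfter(2, 2)); // true: count reached zero
    System.out.println(awaitAfter(2, 1)); // false: timed out with count at 1
  }
}
```

Checking the return value is what lets a caller tell "the latch opened" apart from "the wait gave up".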

CountDownLatch Java Example program

That's a lot of theory, so let's see an example to make it clearer and see how await(), countDown() and the constructor that takes the count are actually used.

Let's take a scenario where your application needs to read 3 files and parse the lines read, and only after all three files are read and parsed should the application move ahead to do some processing with the parsed objects.
So here we'll have three separate threads reading three separate files, and the main thread waits until all three threads finish and call countDown().

import java.util.concurrent.CountDownLatch;

public class CountdownlatchDemo {
  public static void main(String[] args) {
    CountDownLatch cdl = new CountDownLatch(3);
    // Initializing three threads to read 3 different files.
    Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
    Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
    Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));
    t1.start();
    t2.start();
    t3.start();
    try {
      // main thread waiting till all the files are read
      cdl.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("Files are read ... Start further processing");
  }
}

class FileReader implements Runnable {
  private String threadName;
  private String fileName;
  private CountDownLatch cdl;
  FileReader(String threadName, String fileName, CountDownLatch cdl){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cdl = cdl;        
  }
  @Override
  public void run() {
    System.out.println("Reading file " + fileName + " thread " + threadName);
    // do countdown here
    cdl.countDown();
  } 
}

Output

Reading file file-1 thread thread-1
Reading file file-3 thread thread-3
Reading file file-2 thread thread-2
Files are read ... Start further processing

Here it can be seen that inside the main() method, the CountDownLatch instance cdl is created with an initial count of 3. Then three instances of FileReader are created and run in three new threads. Then the main thread calls await() on cdl, which causes the main thread to wait until the cdl count has been decremented three times. Notice that the cdl instance is passed as a parameter to the FileReader constructor; that same instance is used to call the countDown() method in order to decrement the count. Once the count reaches zero, the latch opens, allowing the main thread to resume.

You can comment out the call to await(); the main thread may then resume even before all 3 files are read. In scenarios like this, where a thread should resume only after certain events occur, CountDownLatch is a useful synchronization aid that allows one or more threads to wait for events in other threads to finish.

If the above example gave you the impression that you must spawn as many threads as the count given to the CountDownLatch, that is a wrong understanding. As mentioned, the count is the number of events, so you can very well have a single thread that decrements the count in a loop.

Let's change the example used above to have a single thread and use a for loop to count down.

import java.util.concurrent.CountDownLatch;

public class CountdownlatchDemo {
  public static void main(String[] args) {
    CountDownLatch cdl = new CountDownLatch(3);
    // Initializing threads to read 3 different files.
    Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
    /*Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
    Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));*/
    t1.start();
    /*t2.start();
    t3.start();*/
    try {
      // main thread waiting till all the files are read
      cdl.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("Files are read ... Start further processing");
  }
}

class FileReader implements Runnable {
  private String threadName;
  private String fileName;
  private CountDownLatch cdl;
  FileReader(String threadName, String fileName, CountDownLatch cdl){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cdl = cdl;        
  }
  @Override
  public void run() {
    for(int i = 0; i < 3; i++){
      System.out.println("Reading file " + fileName + " thread " + threadName);
      // do countdown here
      cdl.countDown();
    }
  }
}

Output

Reading file file-1 thread thread-1
Reading file file-1 thread thread-1
Reading file file-1 thread thread-1
Files are read ... Start further processing

Here you can see that only a single thread is used and countdown is done on the number of events. So it is true both ways. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

Usage of CountDownLatch in Java

As you have seen in the example, you can use CountDownLatch when the work can be split so that several threads each process a part, but further processing may start only when all of those threads have finished. Once all the threads have finished, the main thread comes out of await() (as the latch is released) and starts further processing.

You can also use CountDownLatch to test concurrency by giving a certain count in the CountDownLatch constructor and starting that many threads. There may also be more than one waiting thread, so you can test how waiting threads behave once the count reaches zero, since all of them are released at once.

If your application has external dependencies and should start processing only once all of them are up and running, that kind of scenario can also be handled with CountDownLatch.
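
The case of several waiting threads released at once can be sketched with two latches: a start gate with count 1 that all workers wait on, and a second latch the main thread waits on until every worker has finished. The names StartGateSketch, runTogether, startGate and done are hypothetical and used only in this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateSketch {
  // Starts `workers` threads that all wait on one latch, opens it with a single
  // countDown(), and returns how many workers ran after the gate opened.
  static int runTogether(int workers) {
    CountDownLatch startGate = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(workers);
    AtomicInteger ran = new AtomicInteger();
    for (int i = 0; i < workers; i++) {
      new Thread(() -> {
        try {
          startGate.await();        // every worker waits here
          ran.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          done.countDown();         // report completion even if interrupted
        }
      }).start();
    }
    startGate.countDown();          // count reaches zero: all waiters released
    try {
      done.await();                 // main waits for all workers to finish
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return ran.get();
  }

  public static void main(String[] args) {
    System.out.println("Workers released: " + runTogether(4));
  }
}
```

Here one latch is used in each direction: many threads waiting for one event, and one thread waiting for many events.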

CountDownLatch in Java cannot be reused

One point to remember is that CountDownLatch cannot be reused. Once the count reaches zero, any further call to the await() method won't block any thread. It won't throw any exception either.

Let's see an example. We'll use the same example as above and spawn 3 more threads once the first set of three threads is done.

import java.util.concurrent.CountDownLatch;

public class CountdownlatchDemo {
  public static void main(String[] args) {
    CountDownLatch cdl = new CountDownLatch(3);
    // Initializing three threads to read 3 different files.
    Thread t1 = new Thread(new FileReader("thread-1", "file-1", cdl));
    Thread t2 = new Thread(new FileReader("thread-2", "file-2", cdl));
    Thread t3 = new Thread(new FileReader("thread-3", "file-3", cdl));
    t1.start();
    t2.start();
    t3.start();
    try {
      // main thread waiting till all the files are read
      cdl.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("Files are read ... Start further processing");
    Thread t4 = new Thread(new FileReader("thread-4", "file-4", cdl));
    Thread t5 = new Thread(new FileReader("thread-5", "file-5", cdl));
    Thread t6 = new Thread(new FileReader("thread-6", "file-6", cdl));
    t4.start();
    t5.start();
    t6.start();
    try {
      // main thread waiting till all the files are read
      cdl.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("Files are read again ... Start further processing");
  }
}

class FileReader implements Runnable {
  private String threadName;
  private String fileName;
  private CountDownLatch cdl;
  FileReader(String threadName, String fileName, CountDownLatch cdl){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cdl = cdl;        
  }
  @Override
  public void run() {
    System.out.println("Reading file " + fileName + " thread " + threadName);
    // do countdown here
    cdl.countDown();
  }
}

Output

Reading file file-2 thread thread-2
Reading file file-3 thread thread-3
Reading file file-1 thread thread-1
Files are read ... Start further processing
Files are read again ... Start further processing
Reading file file-4 thread thread-4
Reading file file-6 thread thread-6
Reading file file-5 thread thread-5

Here note that await() is called again after starting thread-4, thread-5 and thread-6, but it doesn't block the main thread as it did for the first three threads. "Files are read again ... Start further processing" is printed even before the next three threads have run. CyclicBarrier, another concurrent utility, can be reused; in fact that is one of the differences between CountDownLatch and CyclicBarrier.

Points to note

  • A CountDownLatch initialized to N, using its constructor, can be used to make one (or more) thread wait until N threads have completed some action, or some action has been completed N times.
  • countDown() method is used to decrement the count. Once the count reaches zero the latch is released.
  • await() method is used to block the thread(s) waiting for the latch to release.
  • CountDownLatch cannot be reused. Once the countdown reaches zero any further call to await() method won't block any thread.

That's all for this topic Java CountDownLatch With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Difference Between CountDownLatch And CyclicBarrier in Java
  2. ConcurrentHashMap in Java With Examples
  3. AtomicInteger in Java With Examples
  4. ConcurrentSkipListMap in Java With Examples
  5. Java Concurrency Interview Questions And Answers


Wednesday, June 5, 2024

Java CyclicBarrier With Examples

There are scenarios in concurrent programming where you want a set of threads to wait for each other at a common point until all threads in the set have reached it. For such scenarios the concurrency utilities provide a synchronization aid, CyclicBarrier, which makes a set of threads wait for each other to reach a common barrier point.

The barrier is called cyclic because it can be re-used after the waiting threads are released.

Note that CyclicBarrier was introduced in Java 5 along with other concurrent classes like CountDownLatch, ConcurrentHashMap, CopyOnWriteArrayList and BlockingQueue within the java.util.concurrent package.


CyclicBarrier class constructors

CyclicBarrier class in Java has following two constructors-

CyclicBarrier(int parties)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and does not perform a predefined action when the barrier is tripped.

CyclicBarrier(int parties, Runnable barrierAction)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.

Here the parties parameter signifies the number of threads that must invoke await() before the barrier is tripped.

barrierAction specifies a Runnable that is executed once each time the barrier is tripped. It is not run in a new thread; it is run by the last thread to arrive at the barrier.

How CyclicBarrier is used

The first thing is to create a CyclicBarrier object using either of the two constructors, specifying the number of threads that should wait for each other. When each thread reaches the barrier (the common point), it calls the await() method on the CyclicBarrier object. This suspends the thread until all the threads have called await() on the same CyclicBarrier object. Once all the specified threads have called await(), the barrier trips and all threads can resume operation.

If the current thread is the last thread to arrive, and a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue.

await() method in CyclicBarrier

await() method has following two forms-

  1. public int await() throws InterruptedException, BrokenBarrierException
  2. public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

The second form waits until all parties have invoked await on this barrier, or the specified waiting time elapses.

If the current thread is not the last to arrive then it is disabled for thread scheduling purposes and lies dormant until one of the following things happens:

  • The last thread arrives; or
  • The specified timeout elapses; (In case of second form) or
  • Some other thread interrupts the current thread; or
  • Some other thread interrupts one of the other waiting threads; or
  • Some other thread times out while waiting for barrier; or
  • Some other thread invokes reset() on this barrier.

The await() method returns an int, the arrival index of the current thread: index (number of parties - 1) indicates the first thread to arrive and zero indicates the last to arrive.
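
To make the arrival index concrete, the following sketch lets a few threads meet at a barrier and collects the value each one gets back from await(). The class name ArrivalIndexSketch and the method sortedArrivalIndexes are assumptions for this example; which thread receives which index depends on scheduling, so the indexes are sorted before being returned.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class ArrivalIndexSketch {
  // Lets `parties` threads meet at one barrier and returns the arrival indexes
  // they received from await(), sorted in ascending order.
  static int[] sortedArrivalIndexes(int parties) {
    CyclicBarrier barrier = new CyclicBarrier(parties);
    int[] indexes = new int[parties];
    Thread[] threads = new Thread[parties];
    for (int i = 0; i < parties; i++) {
      final int slot = i;
      threads[i] = new Thread(() -> {
        try {
          indexes[slot] = barrier.await();   // parties - 1 = first, 0 = last
        } catch (InterruptedException | BrokenBarrierException e) {
          indexes[slot] = -1;                // marks a failed wait
        }
      });
      threads[i].start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while joining", e);
    }
    Arrays.sort(indexes);
    return indexes;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(sortedArrivalIndexes(3)));
  }
}
```

With 3 parties every index from 0 to 2 is handed out exactly once, so the sorted result is [0, 1, 2].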

CyclicBarrier Java example

Now is the time to see an example of CyclicBarrier in Java. Let's take a scenario where your application needs to read 3 files using 3 threads and parse the lines read, and only after all three files are read and parsed should the application start further processing. In this scenario we can use CyclicBarrier and provide a Runnable barrier action that is executed once all the threads reach the barrier.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
  public static void main(String[] args) {
    CyclicBarrier cb = new CyclicBarrier(3, new AfterAction());
    // Initializing three threads to read 3 different files.
    Thread t1 = new Thread(new TxtReader("thread-1", "file-1", cb));
    Thread t2 = new Thread(new TxtReader("thread-2", "file-2", cb));
    Thread t3 = new Thread(new TxtReader("thread-3", "file-3", cb));
    t1.start();
    t2.start();
    t3.start();
    
    System.out.println("Done ");
  }
}

class TxtReader implements Runnable {
  private String threadName;
  private String fileName;
  private CyclicBarrier cb;
  TxtReader(String threadName, String fileName, CyclicBarrier cb){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cb = cb;        
  }
  @Override
  public void run() {
    System.out.println("Reading file " + fileName + " thread " + threadName);    
    try{
      // calling await so the current thread suspends
      cb.await();           
    } catch (InterruptedException e) {
      System.out.println(e);
    } catch (BrokenBarrierException e) {
      System.out.println(e);
    }
  }
}

class AfterAction implements Runnable {
  @Override
  public void run() {
    System.out.println("In after action class, start further processing as all files are read");
  }
}

Output

Done 
Reading file file-2 thread thread-2
Reading file file-1 thread thread-1
Reading file file-3 thread thread-3
In after action class, start further processing as all files are read

In the code CyclicBarrier instance is created with 3 parties so the barrier will trip when 3 threads are waiting upon it.

One thing to note here is that the main thread doesn't block, as can be seen from "Done" being printed before any of the reader threads print. It can also be seen that the AfterAction class is executed once all three threads have called the await() method and the barrier is tripped.

Now if you want to block the main thread then you have to call await() on the main thread too, and count it as one of the parties. Let's take another CyclicBarrier example where two services are started using two separate threads and the main thread should start processing only after both services have executed. That is why the barrier is created with 3 parties here: the two service threads plus the main thread.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CBExample {
  public static void main(String[] args) {
    CyclicBarrier cb = new CyclicBarrier(3);
    // Creating two threads with CyclicBarrier obj as param
    Thread t1 = new Thread(new FirstService(cb));
    Thread t2 = new Thread(new SecondService(cb));
    System.out.println("starting threads ");
    t1.start();
    t2.start();
        
    try {
      // Calling await for main thread
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    // once await is called for all the three threads, execution starts again
    System.out.println("In main thread, processing starts again ..... ");
  }
}

class FirstService implements Runnable {
  CyclicBarrier cb;
  FirstService(CyclicBarrier cb){
    this.cb = cb;
  }
  @Override
  public void run() {
    System.out.println("In First service, thread " + Thread.currentThread().getName());
    try {
      // Calling await for Thread-0
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }      
  }   
}

class SecondService implements Runnable {
  CyclicBarrier cb;
  SecondService(CyclicBarrier cb){
    this.cb = cb;
  }
  @Override
  public void run() {
    System.out.println("In Second service, thread " + Thread.currentThread().getName());
    try {
      // Calling await for Thread-1
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }    
  }    
}

Output

starting threads 
In First service, thread Thread-0
In Second service, thread Thread-1
In main thread, processing starts again .....

Here it can be seen that main thread starts only after both the services are executed.

CyclicBarrier can be reused

Unlike CountDownLatch, CyclicBarrier in Java can be reused after the waiting threads are released.

Let's reuse the same example as above where three threads were used to read 3 files. Now three more threads are added to read 3 more files and the same CyclicBarrier object is used with initial count as 3.

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
  public static void main(String[] args) {
    CyclicBarrier cb = new CyclicBarrier(3, new AfterAction());
    // Initializing three threads to read 3 different files.
    Thread t1 = new Thread(new TxtReader("thread-1", "file-1", cb));
    Thread t2 = new Thread(new TxtReader("thread-2", "file-2", cb));
    Thread t3 = new Thread(new TxtReader("thread-3", "file-3", cb));
    
    t1.start();
    t2.start();
    t3.start();
        
    System.out.println("Start another set of threads ");
    
    Thread t4 = new Thread(new TxtReader("thread-4", "file-4", cb));
    Thread t5 = new Thread(new TxtReader("thread-5", "file-5", cb));
    Thread t6 = new Thread(new TxtReader("thread-6", "file-6", cb));
    t4.start();
    t5.start();
    t6.start();           
  }
}

class TxtReader implements Runnable {
  private String threadName;
  private String fileName;
  private CyclicBarrier cb;
  TxtReader(String threadName, String fileName, CyclicBarrier cb){
    this.threadName = threadName;
    this.fileName = fileName;
    this.cb = cb;        
  }
  @Override
  public void run() {
    System.out.println("Reading file " + fileName + " thread " + threadName);    
    try{
      // calling await so the current thread suspends
      cb.await();
        
    } catch (InterruptedException e) {
      System.out.println(e);
    } catch (BrokenBarrierException e) {
      System.out.println(e);
    }
  }
}

class AfterAction implements Runnable {
  @Override
  public void run() {
    System.out.println("In after action class, start further processing as all files are read");
  }
}

Output

Start another set of threads 
Reading file file-1 thread thread-1
Reading file file-2 thread thread-2
Reading file file-3 thread thread-3
In after action class, start further processing as all files are read
Reading file file-4 thread thread-4
Reading file file-5 thread thread-5
Reading file file-6 thread thread-6
In after action class, start further processing as all files are read

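Here it can be seen that the barrier action (the AfterAction class) is run twice, as the CyclicBarrier is reused for the second set of threads. Note that the thread order may be different when you execute the code: all six threads share one barrier, so any three of them can make up the group that trips it first.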

Points to note

  • A CyclicBarrier initialized to N, using its constructor, can be used to make N threads wait using await(), and the barrier is tripped once all N threads have called the await() method.
  • A barrierAction can also be provided while creating the CyclicBarrier object. This barrierAction is executed once the barrier is tripped. The barrier action is useful for updating shared state before any of the parties continue.
  • If the current thread is not the last to arrive then it is paused after calling await() and lies dormant until one of these happens: the last thread arrives, the current thread or some other waiting thread is interrupted, the specified timeout elapses (when the timed form of await() is used), or some thread calls the reset() method.
  • reset() method resets the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a BrokenBarrierException.
  • CyclicBarrier in Java uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).
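
The timeout and reset() points above can be checked with a small single-threaded sketch: a barrier for 2 parties where only one thread ever arrives, so the timed await() gives up. The class name BrokenBarrierSketch, the method timeoutThenReset and the 50 ms timeout are assumptions for the illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BrokenBarrierSketch {
  // Returns { timedOut, brokenAfterTimeout, brokenAfterReset } for a 2-party
  // barrier at which only the calling thread arrives.
  static boolean[] timeoutThenReset() {
    CyclicBarrier barrier = new CyclicBarrier(2);
    boolean timedOut = false;
    try {
      barrier.await(50, TimeUnit.MILLISECONDS);  // nobody else will arrive
    } catch (TimeoutException e) {
      timedOut = true;                           // the wait gave up
    } catch (BrokenBarrierException e) {
      // not expected here: no other thread touches the barrier
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    boolean brokenAfterTimeout = barrier.isBroken();
    barrier.reset();                             // back to the initial state
    boolean brokenAfterReset = barrier.isBroken();
    return new boolean[] { timedOut, brokenAfterTimeout, brokenAfterReset };
  }

  public static void main(String[] args) {
    boolean[] r = timeoutThenReset();
    System.out.println("timedOut=" + r[0] + " brokenAfterTimeout=" + r[1]
        + " brokenAfterReset=" + r[2]);
  }
}
```

A timed-out wait leaves the barrier in the broken state, and reset() is what makes it usable again.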

That's all for this topic Java CyclicBarrier With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Difference Between CountDownLatch And CyclicBarrier in Java
  2. Java ArrayBlockingQueue With Examples
  3. Java Semaphore With Examples
  4. Java Exchanger With Examples
  5. Java Concurrency Interview Questions And Answers


Sunday, June 2, 2024

Java Phaser With Examples

Phaser in Java is one of the synchronization aids provided in the concurrency utilities. Phaser is similar to other synchronization barrier utilities like CountDownLatch and CyclicBarrier. What sets Phaser apart is that it is reusable (like CyclicBarrier) and more flexible in usage. In both CountDownLatch and CyclicBarrier the number of parties (threads) is fixed when the object is created, whereas in Phaser that number can vary. Also note that Phaser was introduced in Java 7.

Phaser in Java is more suitable where threads must be synchronized over one or more phases of activity. Phaser can be used to synchronize a single phase, though in that case it acts more like a CyclicBarrier. It is better suited to cases where threads should wait for a phase to finish, then advance to the next phase, wait again for that phase to finish, and so on.


Java Phaser constructors

Phaser class in Java has 4 constructors

  • Phaser()- Creates a new phaser with no initially registered parties, no parent, and initial phase number 0.
  • Phaser(int parties)- Creates a new phaser with the given number of registered unarrived parties, no parent, and initial phase number 0.
  • Phaser(Phaser parent)- Creates a new phaser with the given parent with no initially registered parties.
  • Phaser(Phaser parent, int parties)- Creates a new phaser with the given parent and number of registered unarrived parties.

How Phaser in Java works

First thing is to create a new instance of Phaser.

Next thing is to register one or more parties with the Phaser. That can be done using register(), bulkRegister(int) or by specifying number of parties in the constructor.

Since Phaser is a synchronization barrier, registered parties have to signal when they finish a phase. That is done using arrive() or one of its variants; a party that should also wait for the others uses arriveAndAwaitAdvance(). When the number of arrivals equals the number of registered parties, the phase is considered complete and the phaser advances to the next phase (if there is one) or terminates.

Note that each generation of a phaser has an associated phase number. The phase number starts at zero, and advances when all parties arrive at the phaser, wrapping around to zero after reaching Integer.MAX_VALUE.
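
The phase numbering can be observed with a small sketch in which the main thread and a few workers move through two phases together. The class name PhaseNumberSketch and the method phasesSeenByMain are hypothetical; workers are registered from the main thread before they start so that the party count is settled before anyone arrives.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

class PhaseNumberSketch {
  // Returns the phase numbers the main thread observes before phase 0 ends,
  // after phase 0 ends, and after phase 1 ends.
  static int[] phasesSeenByMain(int workers) {
    Phaser phaser = new Phaser(1);        // 1 party: the main thread
    int[] seen = new int[3];
    seen[0] = phaser.getPhase();
    for (int i = 0; i < workers; i++) {
      phaser.register();                  // one more party per worker
      new Thread(() -> {
        phaser.arriveAndAwaitAdvance();   // finish phase 0, wait for the rest
        phaser.arriveAndDeregister();     // finish phase 1 and leave
      }).start();
    }
    phaser.arriveAndAwaitAdvance();       // main finishes phase 0
    seen[1] = phaser.getPhase();
    phaser.arriveAndAwaitAdvance();       // main finishes phase 1
    seen[2] = phaser.getPhase();
    phaser.arriveAndDeregister();         // last party leaves the phaser
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(phasesSeenByMain(3)));
  }
}
```

The phase cannot advance past 1 until the main thread itself arrives, which is why the value read between the two waits is stable.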

Methods in Java Phaser class

Some of the methods in Phaser class are as given below-

  • register()- Adds a new unarrived party to this phaser. It returns the arrival phase number to which this registration applied.
  • arrive()- Arrives at this phaser, without waiting for others to arrive. Note that arrive() method does not suspend execution of the calling thread. Returns the arrival phase number, or a negative value if terminated. Note that this method should not be called by an unregistered party.
  • arriveAndDeregister()- Arrives at this phaser and deregisters from it without waiting for others to arrive. Returns the arrival phase number, or a negative value if terminated.
  • arriveAndAwaitAdvance()- Arrives at this phaser and waits for the other parties to arrive. Returns the arrival phase number, or the (negative) current phase if terminated. If you want to wait for all the other registered parties to complete a given phase then use this method.
  • bulkRegister(int parties)- Used to register parties in bulk. The given number of new unarrived parties will be registered to this phaser.
  • onAdvance(int phase, int registeredParties)- If you want to perform some action before the phase is advanced you can override this method. Also used to control termination: the phaser terminates when this method returns true.
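
As a sketch of overriding onAdvance(), the phaser below counts completed phases and terminates itself after a fixed number of them. The class name OnAdvanceSketch, the method phasesRun and the maxPhases parameter are made up for this example; a single registered party is used so the result does not depend on thread scheduling.

```java
import java.util.concurrent.Phaser;

class OnAdvanceSketch {
  // Runs a one-party phaser until its onAdvance() override terminates it and
  // returns how many times onAdvance() was called.
  static int phasesRun(int maxPhases) {
    int[] completed = {0};
    Phaser phaser = new Phaser(1) {
      @Override
      protected boolean onAdvance(int phase, int registeredParties) {
        completed[0]++;                   // action performed on every advance
        // Returning true terminates the phaser.
        return phase >= maxPhases - 1 || registeredParties == 0;
      }
    };
    while (!phaser.isTerminated()) {
      phaser.arriveAndAwaitAdvance();     // the only party arrives each phase
    }
    return completed[0];
  }

  public static void main(String[] args) {
    System.out.println("Phases run: " + phasesRun(3));
  }
}
```

onAdvance() is invoked by the party whose arrival completes the phase, so here it always runs on the calling thread.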

Java Phaser Features

1. Phaser is more flexible- Unlike the case for other barriers, the number of parties registered to synchronize on a Phaser may vary over time. Tasks may be registered at any time (using methods register(), bulkRegister(int), or by specifying initial number of parties in the constructor). Tasks may also be optionally deregistered upon any arrival (using arriveAndDeregister()).

2. Phaser termination- A Phaser may enter a termination state, that may be checked using method isTerminated(). Upon termination, all synchronization methods immediately return without waiting for advance, as indicated by a negative return value. Similarly, attempts to register upon termination have no effect.

3. Phaser Tiering- Phasers in Java may be tiered (i.e., constructed in tree structures) to reduce contention. Phasers with large numbers of parties may experience heavy synchronization contention costs. They can instead be set up as groups of sub-phasers which share a common parent. This may greatly increase throughput even though it incurs greater per-operation overhead.
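
The first two features, a varying number of parties and termination, can be traced in a single-threaded sketch that relies on the default onAdvance() behaviour of terminating when no registered parties remain. The class name PhaserLifecycleSketch and the method lifecycle are assumptions for the illustration.

```java
import java.util.concurrent.Phaser;

class PhaserLifecycleSketch {
  // Registers two parties, deregisters both, and reports what the phaser says
  // at each step as "registered afterOneLeft terminated lateRegisterIgnored".
  static String lifecycle() {
    Phaser phaser = new Phaser();               // no parties registered yet
    phaser.register();
    phaser.register();
    int registered = phaser.getRegisteredParties();
    phaser.arriveAndDeregister();               // one party leaves
    int afterOneLeft = phaser.getRegisteredParties();
    phaser.arriveAndDeregister();               // last party leaves
    boolean terminated = phaser.isTerminated();
    // After termination register() has no effect and returns a negative value.
    boolean lateRegisterIgnored = phaser.register() < 0;
    return registered + " " + afterOneLeft + " " + terminated + " " + lateRegisterIgnored;
  }

  public static void main(String[] args) {
    System.out.println(lifecycle());            // 2 1 true true
  }
}
```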

Phaser Java example code

Let's try to make things clearer through an example with two phases. In the first phase three threads read 3 different files, parse them and store the records in the DB. In the second phase 2 threads are started to query the DB table for the inserted records. Let's assume that one of the fields in the DB table is age: one thread gets the count of records with age greater than 40, and the other thread gets the count of records with age less than or equal to 40.

import java.util.concurrent.Phaser;

public class PhaserDemo {

 public static void main(String[] args) {
  Phaser ph = new Phaser(1);
  int curPhase;
  curPhase = ph.getPhase();
  System.out.println("Phase in Main " + curPhase + " started");
  // Threads for first phase
  new FileReaderThread("thread-1", "file-1", ph);
  new FileReaderThread("thread-2", "file-2", ph);
  new FileReaderThread("thread-3", "file-3", ph);
  //For main thread
  ph.arriveAndAwaitAdvance();
  System.out.println("New phase " + ph.getPhase() + " started");
  // Threads for second phase
  new QueryThread("thread-1", 40, ph);
  new QueryThread("thread-2", 40, ph);
  curPhase = ph.getPhase();
  ph.arriveAndAwaitAdvance();
  System.out.println("Phase " + curPhase + " completed");
  // deregistering the main thread
  ph.arriveAndDeregister();
 }
}

class FileReaderThread implements Runnable {
 private String threadName;
 private String fileName;
 private Phaser ph;

 FileReaderThread(String threadName, String fileName, Phaser ph){
  this.threadName = threadName;
  this.fileName = fileName;
  this.ph = ph;
  ph.register();
  new Thread(this).start();
 }
 @Override
 public void run() {
  System.out.println("This is phase " + ph.getPhase());
  
  try {
   Thread.sleep(20);
   System.out.println("Reading file " + fileName + " thread " 
                           + threadName + " parsing and storing to DB ");
   // Using await and advance so that all thread wait here
   ph.arriveAndAwaitAdvance();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  ph.arriveAndDeregister();
 }
}

class QueryThread implements Runnable {
 private String threadName;
 private int param;
 private Phaser ph;
 
 QueryThread(String threadName, int param, Phaser ph){
  this.threadName = threadName;
  this.param = param;
  this.ph = ph;
  ph.register();
  new Thread(this).start();
 }
 
 @Override
 public void run() {
  
  System.out.println("This is phase " + ph.getPhase());
  System.out.println("Querying DB using param " + param 
                  + " Thread " + threadName);
  ph.arriveAndAwaitAdvance();
  System.out.println("Threads finished");
  ph.arriveAndDeregister();
 }
}

Output

Phase in Main 0 started
This is phase 0
This is phase 0
This is phase 0
Reading file file-1 thread thread-1 parsing and storing to DB 
Reading file file-2 thread thread-2 parsing and storing to DB 
Reading file file-3 thread thread-3 parsing and storing to DB 
New phase 1 started
This is phase 1
Querying DB using param 40 Thread thread-1
This is phase 1
Querying DB using param 40 Thread thread-2
Threads finished
Threads finished
Phase 1 completed

Here a Phaser instance ph is first created with an initial party count of 1, which corresponds to the main thread.

The same ph object is passed to the three threads used in the first phase, and each of them registers itself with the phaser in its constructor. In the run() method of the FileReaderThread class, arriveAndAwaitAdvance() is called so that each thread waits there for the others. Since three more parties are registered in addition to the main thread, arriveAndAwaitAdvance() is called in the main method too, so that the main thread also waits before advancing.

In the second phase another set of two threads is created, and these threads use the same phaser object ph for synchronization.

The logic for reading a file, parsing it and storing it in the DB is not given here, and neither are the queries run in the second phase. The scenario is only there to explain Phaser, so that is where the focus is.

Phaser Monitoring

The Phaser class in Java has several methods for monitoring. These methods can be called by any caller, not only by registered parties.

  • getRegisteredParties()- Returns the number of parties registered at this phaser.
  • getArrivedParties()- Returns the number of registered parties that have arrived at the current phase of this phaser.
  • getUnarrivedParties()- Returns the number of registered parties that have not yet arrived at the current phase of this phaser.
  • getPhase()- Returns the current phase number.
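
As a quick illustration of these monitoring methods, here is a small sketch. The class name PhaserMonitoringSketch and the snapshot() helper are made up for this example. It registers three parties, lets one of them arrive and then reads the counters.

```java
import java.util.concurrent.Phaser;

public class PhaserMonitoringSketch {

  // Collects { registered, arrived, unarrived, phase } for the given phaser
  static int[] snapshot(Phaser ph) {
    return new int[] {
      ph.getRegisteredParties(),
      ph.getArrivedParties(),
      ph.getUnarrivedParties(),
      ph.getPhase()
    };
  }

  static int[] run() {
    Phaser ph = new Phaser(3);  // three registered parties
    ph.arrive();                // one party arrives without waiting
    return snapshot(ph);
  }

  public static void main(String[] args) {
    int[] s = run();
    System.out.println("Registered " + s[0] + ", arrived " + s[1]
        + ", unarrived " + s[2] + ", phase " + s[3]);
  }
}
```

With one of three parties arrived, the phaser is still in phase 0 and reports 3 registered, 1 arrived and 2 unarrived parties.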

Overriding onAdvance() method in Phaser

If you want to perform an action before advancing from one phase to another, it can be done by overriding the onAdvance() method of the Phaser class. This method is invoked when the Phaser advances from one phase to another.
If this method returns true, this phaser will be set to a final termination state upon advance, and subsequent calls to isTerminated() will return true.
If this method returns false, the phaser is kept alive. The default implementation returns true only when the number of registered parties has become zero.

onAdvance() method

protected boolean onAdvance(int phase, int registeredParties)
Here
  • phase- current phase number on entry to this method, before this phaser is advanced.
  • registeredParties- the current number of registered parties.

One of the use cases for overriding the onAdvance() method is to ensure that your phaser executes a given number of phases and then stops.

So we'll create a class called PhaserAdvance that extends Phaser and overrides the onAdvance() method to ensure that the specified number of phases is executed.

Overriding onAdvance() method example

import java.util.concurrent.Phaser;

public class PhaserAdvance extends Phaser{
  PhaserAdvance(int parties){
    super(parties);
  }
    
  // Overriding the onAdvance method
  @Override
  protected boolean onAdvance(int phase, int registeredParties) {
    System.out.println("In onAdvance method, current phase which is completed is " 
      + phase);
    // Terminate once 2 phases (phase 0 and phase 1) have completed 
    // or when the no. of registered parties becomes zero
    if(phase == 1 || registeredParties == 0){
      System.out.println("phaser will be terminated ");
      return true;
    }else{
      System.out.println("phaser will continue ");
      return false;
    }     
  }
    
  public static void main(String... args) {
    // creating phaser instance
    PhaserAdvance ph = new PhaserAdvance(1);
    // creating three threads
    new TestThread("thread-1", ph);
    new TestThread("thread-2", ph);
    new TestThread("thread-3", ph);
    
    while(!ph.isTerminated()){
      ph.arriveAndAwaitAdvance();
    }
    System.out.println("In main method, phaser is terminated");
  }
}

class TestThread implements Runnable {
  private String threadName;
  private Phaser ph;

  TestThread(String threadName, Phaser ph){
    this.threadName = threadName;
    this.ph = ph;
    // register new unarrived party to this phaser
    ph.register();
    new Thread(this).start();
  }
  @Override
  public void run() {
    // be in the loop till the phaser is terminated
    while(!ph.isTerminated()){
      System.out.println("This is phase " + ph.getPhase() + 
        " And Thread - "+ threadName);
      // Using await and advance so that all thread wait here
      ph.arriveAndAwaitAdvance();
    }      
  }
}

Output

This is phase 0 And Thread - thread-1
This is phase 0 And Thread - thread-2
This is phase 0 And Thread - thread-3
In onAdvance method, current phase which is completed is 0
phaser will continue 
This is phase 1 And Thread - thread-3
This is phase 1 And Thread - thread-2
This is phase 1 And Thread - thread-1
In onAdvance method, current phase which is completed is 1
phaser will be terminated 
In main method, phaser is terminated

Here a new class PhaserAdvance is created that extends the Phaser class and overrides its onAdvance() method. The overridden onAdvance() method ensures that exactly 2 phases are executed, which is why the if condition checks for phase == 1 (the phase count starts from 0). Once onAdvance() returns true the phaser is terminated, so the loops in the main method and in the threads exit.

That's all for this topic Java Phaser With Examples. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Wednesday, March 10, 2021

Difference Between CountDownLatch And CyclicBarrier in Java

Both CountDownLatch and CyclicBarrier are synchronization aids that allow one or more threads to wait, but there are certain differences between them. Knowing the differences between CountDownLatch and CyclicBarrier in Java will help you decide which of these utilities serves you better, and of course it is a good Java interview question too.

CountDownLatch Vs CyclicBarrier in Java

  1. One of the most important differences is that when you use a CountDownLatch, you specify the number of calls to the countDown() method when creating the CountDownLatch object. So a CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
    What this means is that you can use CountDownLatch with only a single thread, calling countDown() to decrement the count as and when the specified event occurs.

    When you use CyclicBarrier in Java, you specify the number of threads that should call the await() method in order to trip the barrier. That means a CyclicBarrier initialized to 3 needs 3 threads to call await() before the barrier trips.