Thursday, April 2, 2020

CyclicBarrier : Java Concurrency

CyclicBarrier 


How many of you are aware about CyclicBarrier ? Almost all Java developers. Am i right ? When my boss asked me the same, i was thinking is it related to any cycle path of national highway. Oh, later on i found that it is a concept if Java Concurrency.In this article, we will dig it more.


Introduction

It is a synchronization aid(Synchronizers) that allows a set of threads to all wait for each other to reach a common barrier point. 

CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

It is a part of the java.util.concurrent package which almost contains more than 57 numbers of classes.

CyclicBarrier is similar to CountDownLatch in Java(of course there are some differences) and allows multiple threads to wait for each other (barrier) before proceeding.




CyclicBarrier Usage

The CyclicBarrier supports a barrier action, which is a Runnable that is executed once the last thread arrives. 

There are two types of constructor available. 

public CyclicBarrier(int parties)

Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and does not perform a predefined action when the barrier is tripped.

Optionally, we can pass the second argument to the constructor, which is a Runnable instance. This has logic that would be run by the last thread that trips the barrier:

public CyclicBarrier(int parties, Runnable barrierAction

Example :


CyclicBarrier: Consider the same IT world scenario where manager divided modules between development teams (A and B). He goes on leave and asked both team to wait for each other to complete their respective task once both are done assign it to QA team for testing.
Here manager thread works as main thread and development team works as worker thread. Development team threads wait for other development team threads after completing their task.

Implementation

Let's consider the following scenario. 
Suppose we have two different services performs some operations and store the corresponding results in a list which should wait for each other to complete the execution. When both the services finish performing their action, the main thread starts procession the data and gives the sum of all numbers. 
public class CyclicBarrierDemo {
private CyclicBarrier cyclicBarrier;
    private List<List<Integer>> partialResults= Collections.synchronizedList(new ArrayList<>());
    private Random random = new Random();
    
    public void runSimulation() {
        cyclicBarrier = new CyclicBarrier(2, new AggregatorThread());
        Thread worker = new Thread(new ServiceOneThread());
        worker.start();
        
        Thread worker2 = new Thread(new ServiceTwoThread());
        worker2.start();
    }
    public static void main(String[] args) {
    CyclicBarrierDemo demo = new CyclicBarrierDemo();
        demo.runSimulation();
    }
    
    
    
    class AggregatorThread implements Runnable {
     
        @Override
        public void run() {
            String thisThreadName = Thread.currentThread().getName();
            System.out.println(thisThreadName + ": Computing sum of 2 service" );
            int sum = 0;
            System.out.println(partialResults);
            for (List<Integer> threadResult : partialResults) {
                System.out.print("Adding ");
                for (Integer partialResult : threadResult) {
                    System.out.print(partialResult+" ");
                    sum += partialResult;
                }
                System.out.println();
            }
            System.out.println(thisThreadName + ": Final result = " + sum);
        }
    }
    
    class ServiceOneThread implements Runnable{
    @Override
    public void run() {
    
    System.out.println("Service-1 run method");
    List<Integer> partialResult = new ArrayList<>();
    for (int i = 0; i < 10; i++) {    
    Integer num = random.nextInt(10);
    partialResult.add(num);
    }
    System.out.println("Service-1 partial results :" + partialResult);
    partialResults.add(partialResult);
    try {
    System.out.println("Service-1 is moving to sleeping state...");
    Thread.sleep(100000);
    System.out.println("Service-1 is comes up from sleeping state...");
    System.out.println("Service-1  waiting for others to reach barrier.");
    cyclicBarrier.await();
    
    } catch (InterruptedException e) {
    } catch (BrokenBarrierException e) {
    }
    }
    }
    
    
    class ServiceTwoThread implements Runnable{
    @Override
    public void run() {
    System.out.println("Service-2 run method");
    List<Integer> partialResult = new ArrayList<>();
    for (int i = 0; i < 10; i++) {    
    Integer num = random.nextInt(5);
    partialResult.add(num);
    }
    partialResults.add(partialResult);
    try {
    System.out.println("Service-2 numbers :" + partialResult);
    System.out.println("Service-2  waiting for others to reach barrier.");
    cyclicBarrier.await();
    } catch (InterruptedException e) {
    } catch (BrokenBarrierException e) {
    }
    }
    }
    
}



Output :
Service-1 run method
Service-1 partial results :[8, 0, 8, 3, 9, 2, 6, 1, 4, 2]
Service-1 is moving to sleeping state...   //see here, service-1 moved to sleep
Service-2 run method
Service-2 numbers :[2, 0, 0, 2, 3, 0, 0, 4, 2, 2]
Service-2  waiting for others to reach barrier.
Service-1 is comes up from sleeping state...
Service-1  waiting for others to reach barrier.
Thread-0: Computing sum of 2 service //main thread
[[8, 0, 8, 3, 9, 2, 6, 1, 4, 2], [2, 0, 0, 2, 3, 0, 0, 4, 2, 2]]
Adding 8 0 8 3 9 2 6 1 4 2 
Adding 2 0 0 2 3 0 0 4 2 2 
Thread-0: Final result = 58

No comments:

Post a Comment