CyclicBarrier 是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的 barrier。
注意比较CountDownLatch 和CyclicBarrier:
(01) CountDownLatch 的作用是允许1或N个线程等待其他线程完成执行;而 CyclicBarrier 则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
CyclicBarrier函数列表
1 | CyclicBarrier(int parties) |
CyclicBarrier数据结构
CyclicBarrier 是包含了 “ ReentrantLock 对象 lock “和” Condition 对象 trip “,它是通过独占锁实现的。下面通过源码去分析到底是如何实现的。
1. 构造函数
CyclicBarrier 的构造函数共2个:CyclicBarrier 和 CyclicBarrier(int parties, Runnable barrierAction)。第1个构造函数是调用第2个构造函数来实现的,下面第2个构造函数的源码
1 | public CyclicBarrier(int parties, Runnable barrierAction) { |
2. 等待函数
1 | public int await() throws InterruptedException, BrokenBarrierException { |
1 | private int dowait(boolean timed, long nanos) |
说明:dowait() 的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。
(01) generation 是 CyclicBarrier 的一个成员变量,在 CyclicBarrier 中,同一批的线程属于同一代,即同一个Generation;CyclicBarrier 中通过 generation 对象,记录属于哪一代。当有parties个线程到达 barrier,generation 就会被更新换代。
它的定义如下:
1 | private Generation generation = new Generation(); |
(02) 如果当前线程被中断,即 Thread.interrupted() 为true;则通过 breakBarrier() 终止 CyclicBarrier。breakBarrier() 会设置当前中断标记 broken 为 true,意味着“将该 Generation 中断”;同时,设置 count=parties ,即重新初始化 count;最后,通过 signalAll() 唤醒 CyclicBarrier 上所有的等待线程。 breakBarrier() 的源码如下:
1 | private void breakBarrier() { |
(03) 将“ count 计数器”-1,即 –count ;然后判断是不是“有 parties 个线程到达 barrier ”,即 index 是不是为0。首先,它会调用 signalAll() 唤醒 CyclicBarrier 上所有的等待线程;接着,重新初始化 count;最后,更新 generation 的值。
当 index =0时,如果 barrierCommand 不为null,则执行该 barrierCommand , barrierCommand 就是我们创建 CyclicBarrier 时,传入的 Runnable 对象。然后,调用 nextGeneration() 进行换代工作, nextGeneration() 的源码如下:
1 | private void nextGeneration() { |
04) 在 for(;;) 循环中。 timed 是用来表示当前是不是“超时等待”线程。如果不是,则通过 trip.await() 进行等待;否则,调用 awaitNanos() 进行超时等待。
CyclicBarrier 的使用示例
1 | import java.util.concurrent.CyclicBarrier; |
1 | Thread-1 wait for CyclicBarrier. |
结果说明:主线程中新建了5个线程,所有的这些线程都调用 cb.await() 等待。所有这些线程一直等待,直到 cb 中所有线程都达到 barrie r时,这些线程才继续运行!
示例2
新建5个线程,当这5个线程达到一定的条件时,执行某项任务。
1 | import java.util.concurrent.CyclicBarrier; |
1 | Thread-1 wait for CyclicBarrier. |
结果说明:主线程中新建了5个线程,所有的这些线程都调用 cb.await() 等待。所有这些线程一直等待,直到cb中所有线程都达到barrier时,执行新建cb时注册的Runnable任务。