循环栅栓CyclicBarrier
About 3 min
CyclicBarrier
是另外一种多线程并发控制实用工具。和CountDownLatch
非常类似,它也可以实现线程间的计数等待,但它的功能比CountDownLatch
更加复杂且强大。
CyclicBarrier的使用
CyclicBarrier
适用于这样的情况:你希望创建一组任务,它们并行地执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成。类似于电商中的拼团、拼购,先准备购买的人必须等待,直到要购买的人数达到一定值时才开团。
只有4
个线程都到达栅栏时,所有的线程才会被放行。先到的线程得等待后来的线程,直到最后一个线程到达。
CyclicBarrier
类有两个构造方法:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
parties
:必须在栅栏处凑齐parties
数目的线程后,才会将这些线程放行barrierAction
: 凑齐parties
数目的最后一个线程会在放行前运行一次barrierAction
操作
CyclicBarrier
中主要的方法:
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
public int getNumberWaiting()
await
: 当线程到达栅栏时,需要调用该函数告知自己已经到达栅栏并开始等待。它的返回值是线程的到达顺序,最先到达的返回值parties - 1
,最后到达的返回0。一旦返回0,则表示栅栏要放行了。getNumberWaiting
: 返回当前在栅栏处等待的线程数量,主要用于调试和断言。
下面我们举个例子来说明:只有运动员在跑道上面都准备好之后,才能起跑
public class CyclicBarrierTest {
final static CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println(Thread.currentThread().getName() + ": 大家都好了! " + LocalDateTime.now());
});
public static void main(String[] args) throws IOException, InterruptedException {
var executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner("1号选手")));
executor.submit(new Thread(new Runner("2号选手")));
executor.submit(new Thread(new Runner("3号选手")));
executor.shutdown();
}
public static class Runner implements Runnable {
private final String name;
public Runner(String name) {
super();
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(10000));
System.out.println(Thread.currentThread().getName() + ": " + name + " 准备好了! " + LocalDateTime.now());
System.out.println(Thread.currentThread().getName() + ": " + "还有" + barrier.await() + "位没到达!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ": " + name + " 起跑! " + LocalDateTime.now());
}
}
}
输出结果:
pool-1-thread-2: 2号选手 准备好了! 2023-08-16T17:55:27.020934
pool-1-thread-3: 3号选手 准备好了! 2023-08-16T17:55:29.598635
pool-1-thread-1: 1号选手 准备好了! 2023-08-16T17:55:31.402894
pool-1-thread-1: 大家都好了! 2023-08-16T17:55:31.403166
pool-1-thread-1: 还有0位没到达!
pool-1-thread-3: 还有1位没到达!
pool-1-thread-2: 还有2位没到达!
pool-1-thread-1: 1号选手 起跑! 2023-08-16T17:55:31.414811
pool-1-thread-3: 3号选手 起跑! 2023-08-16T17:55:31.414909
pool-1-thread-2: 2号选手 起跑! 2023-08-16T17:55:31.414989
上述例子模拟3
名运动员(3
个线程),他们都需要花些时间热身做准备。当他们准备好后,会在赛道上面等待barrier.await()
其他运动员,裁判员会统计赛道(栅栏)上面等待的人数。在裁判员统计到在场的所有3
位运动员还未到齐之前,已经在赛道前的运动员都还只能在等待(线程状态WAITING
)。
最后一个运动员准备好之后,会执行CyclicBarrier
第二个参数的逻辑(如果为null
,就不用执行)。然后裁判员知道所有的运动员到齐之后,他一声枪响(线程放行),运动员们便不再等待开始起跑(线程状态RUNNABLE
)。
从上述打印结果来看,3
位运动员准备就绪的顺序是: 231
,而起跑的顺序是: 132
。