Skip to main content

循环栅栓CyclicBarrier

huhxAbout 3 minjavaConcurrency-ToolkitConcurrency

CyclicBarrier是另外一种多线程并发控制实用工具。和CountDownLatch非常类似,它也可以实现线程间的计数等待,但它的功能比CountDownLatch更加复杂且强大。

CyclicBarrier的使用

CyclicBarrier适用于这样的情况:你希望创建一组任务,它们并行地执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成。类似于电商中的拼团、拼购,先准备购买的人必须等待,直到要购买的人数达到一定值时才开团。

cyclic barrier
cyclic barrier

只有4个线程都到达栅栏时,所有的线程才会被放行。先到的线程得等待后来的线程,直到最后一个线程到达。

CyclicBarrier类有两个构造方法:

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
  • parties:必须在栅栏处凑齐parties数目的线程后,才会将这些线程放行
  • barrierAction: 凑齐parties数目的最后一个线程会在放行前运行一次barrierAction操作

CyclicBarrier中主要的方法:

public int await() throws InterruptedException, BrokenBarrierException

public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

public int getNumberWaiting()
  • await: 当线程到达栅栏时,需要调用该函数告知自己已经到达栅栏并开始等待。它的返回值是线程的到达顺序,最先到达的返回值parties - 1,最后到达的返回0。一旦返回0,则表示栅栏要放行了。
  • getNumberWaiting: 返回当前在栅栏处等待的线程数量,主要用于调试和断言。

下面我们举个例子来说明:只有运动员在跑道上面都准备好之后,才能起跑

public class CyclicBarrierTest {
    final static CyclicBarrier barrier = new CyclicBarrier(3, () -> {
        System.out.println(Thread.currentThread().getName() + ": 大家都好了! "  + LocalDateTime.now());
    });

    public static void main(String[] args) throws IOException, InterruptedException {
        var executor = Executors.newFixedThreadPool(3);
        executor.submit(new Thread(new Runner("1号选手")));
        executor.submit(new Thread(new Runner("2号选手")));
        executor.submit(new Thread(new Runner("3号选手")));

        executor.shutdown();
    }

    public static class Runner implements Runnable {
        private final String name;

        public Runner(String name) {
            super();
            this.name = name;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(new Random().nextInt(10000));
                System.out.println(Thread.currentThread().getName() + ": " + name + " 准备好了! " + LocalDateTime.now());
                System.out.println(Thread.currentThread().getName() + ": " + "还有" + barrier.await() + "位没到达!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ": " + name + " 起跑! " + LocalDateTime.now());
        }
    }
}

 

























 









输出结果:

pool-1-thread-2: 2号选手 准备好了! 2023-08-16T17:55:27.020934
pool-1-thread-3: 3号选手 准备好了! 2023-08-16T17:55:29.598635
pool-1-thread-1: 1号选手 准备好了! 2023-08-16T17:55:31.402894
pool-1-thread-1: 大家都好了! 2023-08-16T17:55:31.403166
pool-1-thread-1: 还有0位没到达!
pool-1-thread-3: 还有1位没到达!
pool-1-thread-2: 还有2位没到达!
pool-1-thread-1: 1号选手 起跑! 2023-08-16T17:55:31.414811
pool-1-thread-3: 3号选手 起跑! 2023-08-16T17:55:31.414909
pool-1-thread-2: 2号选手 起跑! 2023-08-16T17:55:31.414989

上述例子模拟3名运动员(3个线程),他们都需要花些时间热身做准备。当他们准备好后,会在赛道上面等待barrier.await()其他运动员,裁判员会统计赛道(栅栏)上面等待的人数。在裁判员统计到在场的所有3位运动员还未到齐之前,已经在赛道前的运动员都还只能在等待(线程状态WAITING)。

最后一个运动员准备好之后,会执行CyclicBarrier第二个参数的逻辑(如果为null,就不用执行)。然后裁判员知道所有的运动员到齐之后,他一声枪响(线程放行),运动员们便不再等待开始起跑(线程状态RUNNABLE)。

从上述打印结果来看,3位运动员准备就绪的顺序是: 231,而起跑的顺序是: 132

分析

FAQ

Guava里面的RateLimiter?

总结

参考