优秀的编程知识分享平台

网站首页 > 技术文章 正文

Java多线程实战|CyclicBarrier原理介绍及使用场景

nanyue 2025-01-29 17:17:38 技术文章 6 ℃

前言:

今天我们讲一下java.util.concurrent工具类里的另一个工具CyclicBarrier正如其名,“循环栅栏”,是Java提供的一种特定场景下的多线程之间进行交互的使用方法;

CyclicBarrier:

举个例子,比如小明,小美,小华,小丽几人终于历经多年课本出题历程,高考结束,相约一起聚餐,然而他们每个人到达约会地点的耗时都一样,有的人会早到,有的人会晚到,但是他们要都到了以后才可以决定点那些菜。

这个时候我们就可以使用JUC包中为我们提供了一个同步工具类来模拟这类场景,CyclicBarrier,利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。这里没人人相当于一个线程,而餐厅就是 CyclicBarrier。

介绍:CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 和 CountDownLatch 很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。

网上找了一个很形象的动图如下:

CyclicBarrier它有两个构造函数:

public?CyclicBarrier(int?parties)?{
????this(parties,?null);
}

public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
????if?(parties?<=?0)?throw?new?IllegalArgumentException();
????this.parties?=?parties;
????this.count?=?parties;
????this.barrierCommand?=?barrierAction;
}
  • parties 是参与线程的个数,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
  • 第二个构造方法有一个 Runnable 参数,这个参数的意思是,线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

await()方法:

//非定时等待
public?int?await()?throws?InterruptedException,?BrokenBarrierException?{
??try?{
????return?dowait(false,?0L);
??}?catch?(TimeoutException?toe)?{
????throw?new?Error(toe);
??}
}

//定时等待
public?int?await(long?timeout,?TimeUnit?unit)?throws?InterruptedException,?BrokenBarrierException,?TimeoutException?{
??return?dowait(true,?unit.toNanos(timeout));
}
  • 线程调用 await() 表示自己已经到达栅栏
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时

可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法

dowait:

private?int?dowait(boolean?timed,?long?nanos)?throws?InterruptedException,?BrokenBarrierException,?TimeoutException?{
??final?ReentrantLock?lock?=?this.lock;
??lock.lock();
??try?{
????final?Generation?g?=?generation;
????//检查当前栅栏是否被打翻
????if?(g.broken)?{
??????throw?new?BrokenBarrierException();
????}
????//检查当前线程是否被中断
????if?(Thread.interrupted())?{
??????//如果当前线程被中断会做以下三件事
??????//1.打翻当前栅栏
??????//2.唤醒拦截的所有线程
??????//3.抛出中断异常
??????breakBarrier();
??????throw?new?InterruptedException();
????}
????//每次都将计数器的值减1
????int?index?=?--count;
????//计数器的值减为0则需唤醒所有线程并转换到下一代
????if?(index?==?0)?{
??????boolean?ranAction?=?false;
??????try?{
????????//唤醒所有线程前先执行指定的任务
????????final?Runnable?command?=?barrierCommand;
????????if?(command?!=?null)?{
??????????command.run();
????????}
????????ranAction?=?true;
????????//唤醒所有线程并转到下一代
????????nextGeneration();
????????return?0;
??????}?finally?{
????????//确保在任务未成功执行时能将所有线程唤醒
????????if?(!ranAction)?{
??????????breakBarrier();
????????}
??????}
????}

????//如果计数器不为0则执行此循环
????for?(;;)?{
??????try?{
????????//根据传入的参数来决定是定时等待还是非定时等待
????????if?(!timed)?{
??????????trip.await();
????????}else?if?(nanos?>?0L)?{
??????????nanos?=?trip.awaitNanos(nanos);
????????}
??????}?catch?(InterruptedException?ie)?{
????????//若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
????????if?(g?==?generation?&&?!?g.broken)?{
??????????breakBarrier();
??????????throw?ie;
????????}?else?{
??????????//若在捕获中断异常前已经完成在栅栏上的等待,?则直接调用中断操作
??????????Thread.currentThread().interrupt();
????????}
??????}
??????//如果线程因为打翻栅栏操作而被唤醒则抛出异常
??????if?(g.broken)?{
????????throw?new?BrokenBarrierException();
??????}
??????//如果线程因为换代操作而被唤醒则返回计数器的值
??????if?(g?!=?generation)?{
????????return?index;
??????}
??????//如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
??????if?(timed?&&?nanos?<=?0L)?{
????????breakBarrier();
????????throw?new?TimeoutException();
??????}
????}
??}?finally?{
????lock.unlock();
??}
}

代码示例:

/**
?*?TODO
?*
?*?@author?taoze
?*?@version?1.0
?*?@date?6/24/21?3:16?PM
?*/
public?class?CyclicBarrierTest?{

????public?static?void?main(String[]?args)?{
????????ExecutorService?service?=?Executors.newCachedThreadPool();
????????CyclicBarrier?barrier?=?new?CyclicBarrier(5,?new?Runnable()?{
????????????@Override
????????????public?void?run()?{
????????????????System.out.println("全部到达"+Thread.currentThread().getName()+"呼叫服务员开始点餐!");
????????????????service.shutdown();

????????????}
????????});
????????for?(int?j?=?0;?j?

执行结果:

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
  • CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

来源:
https://juejin.cn/post/6977549754217529358

Tags:

最近发表
标签列表