【Java】--CyclicBarrier的介绍及应用

CyclicBarrier

简介

CyclicBarrier是java.util.concurrent包下的一个类,CyclicBarrier的字面意思是 可循环(Cyclic) 使用的 屏障(Barrier)

CyclicBarrier能让一组线程到达一个屏障(也可叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。

CyclicBarrier会等齐其他线程在继续进行,即CyclicBarrier会每次加加直到某个数字,正好与CountDownLatch相反(CountDownLatch会每次减减直到0)

方法说明

构造函数

/**
*参数表示屏障拦截的线程数量,
*每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
**/
public CyclicBarrier(int parties) {
    this(parties, null);
}

/**
*用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
**/
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

await

调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 不超时等待
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
            BrokenBarrierException,
            TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

这两个方法最终都会调用dowait(boolean, long)方法,它也是CyclicBarrier的核心方法,该方法定义如下:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 当前代
        final Generation g = generation;
        // 如果这代损坏了,抛出异常
        if (g.broken)
            throw new BrokenBarrierException();
 
        // 如果线程中断了,抛出异常
        if (Thread.interrupted()) {
            // 将损坏状态设置为true
            // 并通知其他阻塞在此栅栏上的线程
            breakBarrier();
            throw new InterruptedException();
        }
 
        // 获取下标
        int index = --count;
        // 如果是 0,说明最后一个线程调用了该方法
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 执行栅栏任务
                if (command != null)
                    command.run();
                ranAction = true;
                // 更新一代,将count重置,将generation重置
                // 唤醒之前等待的线程
                nextGeneration();
                return 0;
            } finally {
                // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
                if (!ranAction)
                    breakBarrier();
            }
        }
 
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                 // 如果没有时间限制,则直接等待,直到被唤醒
                if (!timed)
                    trip.await();
                // 如果有时间限制,则等待指定时间
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 当前代没有损坏
                if (g == generation && ! g.broken) {
                    // 让栅栏失效
                    breakBarrier();
                    throw ie;
                } else {
                    // 上面条件不满足,说明这个线程不是这代的
                    // 就不会影响当前这代栅栏的执行,所以,就打个中断标记
                    Thread.currentThread().interrupt();
                }
            }
 
            // 当有任何一个线程中断了,就会调用breakBarrier方法
            // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
 
            // g != generation表示正常换代了,返回当前线程所在栅栏的下标
            // 如果 g == generation,说明还没有换代,那为什么会醒了?
            // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
            // 正是因为这个原因,才需要generation来保证正确。
            if (g != generation)
                return index;
            
            // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放独占锁
        lock.unlock();
    }
}

dowait(boolean, long)方法的主要逻辑处理比较简单,如果该线程不是最后一个调用await方法的线程,则它会一直处于等待状态,除非发生以下情况:

  • 最后一个线程到达,即index == 0
  • 某个参与线程等待超时
  • 某个参与线程被中断
  • 调用了CyclicBarrier的reset()方法。该方法会将屏障重置为初始状态

在上面的源代码中,我们可能需要注意Generation 对象,在上述代码中我们总是可以看到抛出BrokenBarrierException异常,那么什么时候抛出异常呢?如果一个线程处于等待状态时,如果其他线程调用reset(),或者调用的barrier原本就是被损坏的,则抛出BrokenBarrierException异常。同时,任何线程在等待时被中断了,则其他所有线程都将抛出BrokenBarrierException异常,并将barrier置于损坏状态。

同时,Generation描述着CyclicBarrier的更新换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier之后,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。

private static class Generation {
    boolean broken = false;
}

默认barrier是没有损坏的。当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程:

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。

当所有线程都已经到达barrier处(index == 0),则会通过nextGeneration()进行更新换地操作,在这个步骤中,做了三件事:唤醒所有线程,重置count,generation:

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

除了上面讲到的栅栏更新换代以及损坏状态,我们在使用CyclicBarrier时还要要注意以下几点:

  • CyclicBarrier使用独占锁来执行await方法,并发性可能不是很高
  • 如果在等待过程中,线程被中断了,就抛出异常。但如果中断的线程所对应的CyclicBarrier不是这代的,比如,在最后一次线程执行signalAll后,并且更新了这个“代”对象。在这个区间,这个线程被中断了,那么,JDK认为任务已经完成了,就不必在乎中断了,只需要打个标记。该部分源码已在dowait(boolean, long)方法中进行了注释。
  • 如果线程被其他的CyclicBarrier唤醒了,那么g肯定等于generation,这个事件就不能return了,而是继续循环阻塞。反之,如果是当前CyclicBarrier唤醒的,就返回线程在CyclicBarrier的下标。完成了一次冲过栅栏的过程。该部分源码已在dowait(boolean, long)方法中进行了注释。

应用案例

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int num = 7;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
            System.out.println("*****开始召唤神龙******");
        });

        for (int i = 1; i <= num; i++) {
            final int temp = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t收集到第" + temp + "龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i) ).start();
        }
    }
}

运行结果:
在这里插入图片描述

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景
  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断
  • CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置

参考文章:
Java并发编程之CyclicBarrier详解

热门文章

暂无图片
编程学习 ·

省市区三级行政区数据获取和GeoJson地图下载

文章目录1.背景2.行政区域数据获取3.获取GeoJson数据 1.背景 项目中用到省市区三级的行政区划的选择,在网上找到的数据与最新的行政区域划分不一致,也难以确认数据的完成性。 基于echarts完成数据地区分布图时,需要提供地区对应的geoJson格式地图。 2.行政区域数据获取 高德…
暂无图片
编程学习 ·

Linux下用ls和du命令查看文件以及文件夹大小

ls的用法 ls -l |grep “^-”|wc -l或find ./company -type f | wc -l 查看某文件夹下文件的个数,包括子文件夹里的。 ls -lR|grep “^-”|wc -l 查看某文件夹下文件夹的个数,包括子文件夹里的。 ls -lR|grep “^d”|wc -l 说明:ls -l 长列表输出该目录下文件信息(注意这里…
暂无图片
编程学习 ·

Java ssm框架搭建实现登录

Java ssm框架搭建实现登录前言准备新建项目配置TomcatJava文件和resources文件Javaresources建立Package配置文件applicationContext.xmldb.propertiesspring-mvc.xmlUserMapper.xmlcc0701UserUserControllerUserDaoUserServiceUserServiceImplWebfailure.jspok.jspindex.jspw…
暂无图片
编程学习 ·

客户端渲染与服务端渲染

本人是前端小白菜,最近在苦学前端,做点自己的学习小总结。欢迎各位大佬纠错。 模版引擎原来一开始是后端使用的,后来才慢慢支持前端,听起来很高大上的模版引擎,什么页面渲染,我不喜欢这么专业的难懂的叫法,所以我要自己亲自总结一下。 服务端渲染模版引擎不关心内容,只…
暂无图片
编程学习 ·

flex布局,左右两端固定,中间自适应且超出隐藏

这几个月来,在样式上备受打击,感觉自己css都不会写。有需求要flex布局,左右两端固定,中间自适应且超出隐藏,我百度加自己整理下,记录下来。 这个是flex布局,左右两端固定,中间自适应 //html代码 <div class="parent"><div class="left"&g…
暂无图片
编程学习 ·

驾考知识自查

驾考知识自查1事故的种类没有车辆追尾路段,只有事故多发路段2驾驶证 行驶证的换领驾驶证为核发地 行驶证为登记地3山坡挂挡问题p=fv 机动车功率一定 抵挡获得更大的牵引力,但不可以松开加速踏板,松开会导致遛坡4违规处罚问题饮酒后驾驶机动车的,处暂扣六个月机动车驾驶证,…
暂无图片
编程学习 ·

7月1日 Day1

7月2日中午补 111 制定了最近的训练计划,大致方向 222 7月1日晚 CodeforcesRound#654(Div.2) 赛时水过了四题, EF待补 思路没有问题,代码需要重新写下. 补题:
暂无图片
编程学习 ·

linux下载源码并编译x264,x265并引用进FFmpeg

x264源码下载地址:https://www.videolan.org/developers/x264.html x265源码下载地址:https://www.videolan.org/developers/x265.html FFmpeg官网源码下载地址:http://ffmpeg.org/download.html FFmpeg默认支持H264的解码,但是并不支持H264的编码,如果想要让FFmpeg支持H2…
暂无图片
编程学习 ·

利用python爬虫爬取斗鱼图片(简单详细)

关于 在一个安静的夜晚,我缓慢的打开了电脑,望着已经睡着的父母,我轻轻的把门关上,看着斗鱼颜值主播的魅力,我不尽感叹,要是每天都可以不需要那么麻烦的去看那该有多好! 于是我想起了最近刚学的爬虫,嘴角露出了迷之微笑。 开始 我原本以为我这样的菜鸟,如果想爬的话应该…
暂无图片
编程学习 ·

skywalking分布式追踪系统 + mysql

软件版本apache-skywalking-apm-8.0.1.tar.gz mysql-connector-java-8.0.16.jar mysql-8.0.x JDK-1.8安装启动脚本, /bin 目录. 包含linux启动和Windows启动服务和UI的脚本. 配置文件, /config 目录. 包含以下配置文件.application.yml log4j.xml alarm-settings.yml 引用Jar, …
暂无图片
编程学习 ·

【华为云技术分享】玩转华为物联网IoTDA服务系列六-恒温空调

摘要:本文主要讲述空调接入到物联网平台后,通过恒温空调控制系统,不论空调是否开机,都可以调整空调默认温度,待空调上电开机后,自动按默认温度调节。场景简介通过恒温控制系统,不论空调是否开机,都可以调整空调默认温度,待空调上电开机后,自动按默认温度调节。该场景…
暂无图片
编程学习 ·

精通以太坊-10~14章-思维导图

《精通以太坊》 第十章~第十四章 代币 预言机 DApp EVM 共识 学习笔记 思维导图附:文本结构 精通以太坊-10~14章代币使用方式数字货币通过私下交易的方式确定它的价值资源来自一个共享经济体或资源分享环境所产出或获取的资源资产链上或链下,有形或无形的资产访问权限针对物…
暂无图片
编程学习 ·

02-线性结构2 一元多项式的乘法与加法运算

#include <stdio.h> #include <stdlib.h>typedef struct PolyNode *Polynomial; //自定义类型名,多项式结构体指针PolynomialPolynomial ReadPoly(); void Attach(int c, int e, Polynomial *pRear); Polynomial Add(Polynomial P1, Polynomial P2); Polynomial …
暂无图片
编程学习 ·

1017: 判断正整数位数 ZZULIOJ

1017: 判断正整数位数 题目描述 给定一个不多于5位的正整数,判断它是几位数,并输出。 输入 一个不多于5位的正整数。 输出 输出正整数的位数,单独占一行。 样例输入 Copy 111 样例输出 Copy 3 #include<stdio.h> int main() {int a,b;scanf("%d",&a);b=…
暂无图片
编程学习 ·

JavaScript:利用递归实现对象深拷贝

JavaScript:利用递归实现对象深拷贝 先来普及一下深拷贝和浅拷贝的区别 浅拷贝:就是简单的复制,用等号即可完成 let a = {a: 1} let b = a这就完成了一个浅拷贝 但是当修改对象b的时候,我们发现对象a的值也被改变了 b.a = 10 console.log(a.a) => 10这是因为浅拷贝只复制…
暂无图片
编程学习 ·

Go 方法的基本概念及使用

方法基本介绍在某些情况下,我们要需要声明(定义)方法。比如 Person 结构体:除了有一些字段外( 年龄,姓名..),Person 结构体还有一些行为比如:可以说话、跑步..,通过学习,还可以做算术题。这时就要用方法才能完成。Golang 中的方法是作用在指定的数据类型上的(即:和指定的数…