java并发-通信工具类

article/2024/4/19 22:55:38

文章目录

  • 1 Semaphore
    • 1.2 Semaphore简介
    • 1.2 Semaphore案例
    • 1.3 Semaphore原理
  • 2 Exchanger
  • 3 CountDownLatch
    • 3.1 CountDownLatch简介
    • 3.2 CountDownLatch案例
  • 4 CyclicBarrier
    • 4.1 CyclicBarrier简介
    • 4.2 CyclicBarrier Barrier被破坏
    • 4.4 CyclicBarrier原理

java.util.concurrent包下,JDK中提供了一些工具类以供开发者使用。

作用
Semaphore限制线程的数量
Exchanger两个线程交换数据
CountDownLatch线程等待直到计数器减为0时开始工作
CyclicBarrier作用跟CountDownLatch类似,但是可以重复使用
Phaser增强的CyclicBarrier

1 Semaphore

1.2 Semaphore简介

Semaphore翻译过来是信号的意思。顾名思义,这个工具类提供的功能就是多个线程彼此“打信号”。而这个“信号”是一个int类型的数据,也可以看成是一种“资源”。

可以在构造函数中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的

// 默认情况下使用非公平
public Semaphore(int permits) {sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

最主要的方法是acquire方法和release方法。acquire()方法会申请一个permit,而release方法会释放一个permit。当然,你也可以申请多个acquire(int permits)或者释放多个release(int permits)。

每次acquirepermits就会减少一个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。

1.2 Semaphore案例

Semaphore往往用于资源有限的场景中,去限制线程的数量。举个例子,我想限制同时只能有3个线程在工作:

public class SemaphoreDemo {static class MyThread implements Runnable {private int value;private Semaphore semaphore;public MyThread(int value, Semaphore semaphore) {this.value = value;this.semaphore = semaphore;}@Overridepublic void run() {try {semaphore.acquire(); // 获取permitSystem.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",value, semaphore.availablePermits(), semaphore.getQueueLength()));// 睡眠随机时间,打乱释放顺序Random random =new Random();Thread.sleep(random.nextInt(1000));System.out.println(String.format("线程%d释放了资源", value));} catch (InterruptedException e) {e.printStackTrace();} finally{semaphore.release(); // 释放permit}}}public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 10; i++) {new Thread(new MyThread(i, semaphore)).start();}}
}

输出:

当前线程是1, 还剩2个资源,还有0个线程在等待
当前线程是0, 还剩1个资源,还有0个线程在等待
当前线程是6, 还剩0个资源,还有0个线程在等待
线程6释放了资源
当前线程是2, 还剩0个资源,还有6个线程在等待
线程2释放了资源
当前线程是4, 还剩0个资源,还有5个线程在等待
线程0释放了资源
当前线程是7, 还剩0个资源,还有4个线程在等待
线程1释放了资源
当前线程是8, 还剩0个资源,还有3个线程在等待
线程7释放了资源
当前线程是5, 还剩0个资源,还有2个线程在等待
线程4释放了资源
当前线程是3, 还剩0个资源,还有1个线程在等待
线程8释放了资源
当前线程是9, 还剩0个资源,还有0个线程在等待
线程9释放了资源
线程5释放了资源
线程3释放了资源

可以看到,在这次运行中,最开始是1, 0, 6这三个线程获得了资源,而其它线程进入了等待队列。然后当某个线程释放资源后,就会有等待队列中的线程获得资源。

当然,Semaphore默认的acquire方法是会让线程进入等待队列,且会抛出中断异常。但它还有一些方法可以忽略中断或不进入阻塞队列

// 忽略中断
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)// 不进入等待队列,底层使用CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)

1.3 Semaphore原理

Semapore内部有一个继承了AQS的同步器Sync,重写了tryAcquireShared方法。这个方法会尝试去获取资源。

如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入AQS的等待队列。

2 Exchanger

Exchanger类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。先来一个案例看看如何使用,比如两个线程之间想要传送字符串:

public class ExchangerDemo {public static void main(String[] args) throws InterruptedException {Exchanger<String> exchanger = new Exchanger<>();new Thread(() -> {try {System.out.println("这是线程A,得到了另一个线程的数据:"+ exchanger.exchange("这是来自线程A的数据"));} catch (InterruptedException e) {e.printStackTrace();}}).start();System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");Thread.sleep(1000);new Thread(() -> {try {System.out.println("这是线程B,得到了另一个线程的数据:"+ exchanger.exchange("这是来自线程B的数据"));} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}

输出:

这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据

可以看到,当一个线程调用exchange方法后,它是处于阻塞状态的,只有当另一个线程也调用了exchange方法,它才会继续向下执行。看源码可以发现它是使用park/unpark来实现等待状态的切换的,但是在使用park/unpark方法之前,使用了CAS检查,估计是为了提高性能。

Exchanger一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型,所以我们可以传输任何的数据,比如IO流或者IO缓存。根据JDK里面的注释的说法,可以总结为一下特性:

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。

Exchanger类还有一个有超时参数的方法,如果在指定时间内没有另一个线程调用exchange,就会抛出一个超时异常。

public V exchange(V x, long timeout, TimeUnit unit)

需要注意的是,exchanger是可以重复使用的。也就是说。两个线程可以使用Exchanger在内存中不断地再交换数据。

3 CountDownLatch

3.1 CountDownLatch简介

先来解读一下CountDownLatch这个类名字的意义。CountDown代表计数递减,Latch是“门闩”的意思。也有人把它称为“屏障”。而CountDownLatch这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。

CountDownLatch的方法也很简单,如下:

// 构造方法:
public CountDownLatch(int count)public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count

3.2 CountDownLatch案例

我们知道,玩游戏的时候,在游戏真正开始之前,一般会等待一些前置任务完成,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等。只有当所有的东西都加载完成后,玩家才能真正进入游戏。下面我们就来模拟一下这个demo。

public class CountDownLatchDemo {// 定义前置任务线程static class PreTaskThread implements Runnable {private String task;private CountDownLatch countDownLatch;public PreTaskThread(String task, CountDownLatch countDownLatch) {this.task = task;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.println(task + " - 任务完成");countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {// 假设有三个模块需要加载CountDownLatch countDownLatch = new CountDownLatch(3);// 主任务new Thread(() -> {try {System.out.println("等待数据加载...");System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));countDownLatch.await();System.out.println("数据加载完成,正式开始游戏!");} catch (InterruptedException e) {e.printStackTrace();}}).start();// 前置任务new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();}
}

输出:

等待数据加载…
还有3个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!

其实CountDownLatch类的原理挺简单的,内部同样是一个继承了AQS的实现类Sync,且实现起来还很简单,可能是JDK里面AQS的子类中最简单的实现了,有兴趣的读者可以去看看这个内部类的源码。

需要注意的是构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。

4 CyclicBarrier

4.1 CyclicBarrier简介

CyclicBarrirer从名字上来理解是“循环的屏障”的意思。前面提到了CountDownLatch一旦计数值count被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障

4.2 CyclicBarrier Barrier被破坏

如果参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以用isBroken()方法检测Barrier是否被破坏。

  1. 如果在等待的过程中,线程被中断,会抛出InterruptedException异常,并且这个异常会传播到其他所有的线程。
  2. 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出BrokenBarrierException,屏障被损坏。
  3. 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出BrokenBarrierException异常。
  4. 如果有线程已经处于等待状态,调用reset方法会导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会导致始终无法等待。

我们同样用玩游戏的例子。如果玩一个游戏有多个“关卡”,那使用CountDownLatch显然不太合适,那需要为每个关卡都创建一个实例。那我们可以使用CyclicBarrier来实现每个关卡的数据加载等待功能。

public class CyclicBarrierDemo {static class PreTaskThread implements Runnable {private String task;private CyclicBarrier cyclicBarrier;public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {this.task = task;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {// 假设总共三个关卡for (int i = 1; i < 4; i++) {try {Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.println(String.format("关卡%d的任务%s完成", i, task));cyclicBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}}}public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {System.out.println("本关卡所有前置任务完成,开始游戏...");});new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();}
}

输出:

关卡1的任务加载地图数据完成
关卡1的任务加载背景音乐完成
关卡1的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡2的任务加载地图数据完成
关卡2的任务加载背景音乐完成
关卡2的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡3的任务加载人物模型完成
关卡3的任务加载地图数据完成
关卡3的任务加载背景音乐完成
本关卡所有前置任务完成,开始游戏…

注意这里跟CountDownLatch的代码有一些不同。CyclicBarrier没有分为await()countDown(),而是只有单独的一个await()方法。

一旦调用await()方法的线程数量等于构造方法中传入的任务总量(这里是3),就代表达到屏障了CyclicBarrier允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个Runnable类型的对象。上述案例就是在达到屏障时,输出“本关卡所有前置任务完成,开始游戏…”。

// 构造方法
public CyclicBarrier(int parties) {this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {// 具体实现
}

4.4 CyclicBarrier原理

CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使用的是Lock + Condition实现的等待/通知模式。详情可以查看这个方法的源码:

private static class Generation {boolean broken = false;
}private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {/获取当前锁/final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;/若generation被打破了,抛出异常/if (g.broken)throw new BrokenBarrierException();/若线程被中断了,打破/if (Thread.interrupted()) {/将当前的障碍物生成设置为破坏,并唤醒所有人。仅在保持锁定时调用。/breakBarrier();throw new InterruptedException();}/count是仍在等待的缔约方数量。在每一代中,从缔约方数减至0。在每一个新的世代或broken时,它被重置。/int index = --count;/若当前等待方==0,则执行传入的线程命令/if (index == 0) {  // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {/若非定时任务/if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}

http://www.ngui.cc/article/show-1007571.html

相关文章

浅谈Dubbo的异步调用

之前简单写了一下dubbo线程模型&#xff0c;提到了Dubbo底层是基于NIO的Netty框架实现的&#xff0c;通过IO线程池和Work线程池实现了请求和业务处理之间的异步从而提升性能。 这篇文章要写的是Dubbo对于消费端调用和服务端接口业务逻辑处理的异步&#xff0c;在2.7版本中Dubb…

1650_MIT 6.828 open函数的简单梳理

全部学习汇总&#xff1a; GreyZhang/g_unix: some basic learning about unix operating system. (github.com) 一个shell的例程分析了很长时间&#xff0c;里面的基础知识一层嵌套一层。不过&#xff0c;这也是补充基本知识的很好的机会。既然自己日常接触的更多的还是这种通…

C++ 15 string容器

目录 一、string容器 1.1 简介 1.2 构造函数 1.3 赋值操作 1.4 字符串拼接 1.5 字符串查找和替换 1.6 字符串比较 1.7 字符串存取 1.8 字符串插入和删除 1.9 子串获取 一、string容器 1.1 简介 ① string是C风格的字符串&#xff0c;而string本质上是一个类。 ② s…

TryHackMe-Sustah(boot2root)

Sustah 开发人员在他们的游戏中添加了反作弊措施。你是 能否突破限制以访问其内部 CMS&#xff1f; 端口扫描 循例 nmap Web枚举 80端口没啥东西&#xff0c;看一下8085端口 gobuster扫一下 /ping似乎没什么东西 回来home&#xff0c;看看burp 使用bash生成数字字典 使用ff…

【mongodb 基础2】Install MongoDB Community Edition on macOS

文章目录一. 安装准备Install Xcode Command-Line ToolsInstall Homebrew二. Installing MongoDB 6.0 Community Edition1. 下载MongoDB Homebrew 组件包2. 更新组件包3. 安装MongoDBTo install MongoDB三. 安装后包含的组件四. Run&stop MongoDB1. 作为macOS服务的方式运行…

计算机二级考试(C++)复习

文章目录基础知识部分C知识点部分C流操作基础知识部分 指令周期&#xff1a; 一般把计算机完成一条指令所花费的时间称为一个指令周期。指令周期越短&#xff0c;指令执行就越快。 顺序程序&#xff1a; 顺序程序具有顺序性、封闭性和可再现性的特点&#xff0c;使得程序设…

用 Java 演奏千千阙歌是什么体验?

JFugue简介 ​JFugue 是一个开放源代码编程库&#xff0c;它允许人们使用 Java 编程语言来编程音乐&#xff0c;而无需 MIDI 的复杂性。它由 David Koelle 于 2002 年首次发布。当前版本是 JFugue 5.0&#xff0c;已于 2015 年 3 月发布。Brian Eubanks 将 JFugue 描述为 “对于…

一个看起来非常科幻的人脸识别接口与其实现逻辑,用于二次开发

前言看起来非常高端的人脸识别接口&#xff0c;简单的进行二次开发就可以衍生为人脸识别考勤系统、人脸识别安全系统等等&#xff1b;展厅以及实现逻辑启动界面点击“是”&#xff1a;人脸已经录入数据库&#xff0c;识别失败弹出Warning可能因为误判&#xff0c;重新识别&…

Unity即时战略/塔防项目实战(一)——构造网格建造系统

Unity即时战略/塔防项目实战&#xff08;一&#xff09;—— 构造网格建造系统 效果展示 Unity RTS游戏网格建造系统实现原理 地形和格子划分&#xff0c;建造系统BuildManager构建 地形最终需要划分成一个一个的小方格&#xff0c;首先定义一下小方格&#xff1a; private…

2022(一等奖)D678基于改进结构函数法的大气气溶胶遥感反演

作品介绍 1 应用背景 大气气溶胶是大气中重要的成分之一&#xff0c;是悬浮于大气中的固体和液体微粒与它们的气体载体共同组成的多相体系&#xff0c;其尺度大约在10-3到102 μm之间。大气气溶胶的特性对空气质量具有良好的指示作用&#xff0c;气溶胶的研究对空气质量的监测…