AQS框架应用(五)

el/2024/2/25 22:35:57

AQS框架应用(五)

定义:

​ 基于AQS框架,Doug Lea给出了很多内置的实现类

基于AQS框架的实现

实现类:

可以看出,有多个工具类是基于AQS框架实现的

  • 独占式同步信号量

    • ReentrantLock

    • Semaphore

  • 共享式同步信号量

    • CountDownLatch
  • 混合同步信号量

    • ReentrantReadWriteLock

ReentrantLock

定义:

​ 它是基于AQS实现的一个==独占式锁==,分为公平和非公平版本

AQS要求实现子类必须实现tryAcquire()/tryRelease()方法,接下来介绍ReentrantLock对于这两个方法的实现

组成:

  • 公平锁 —— FairSync
  • 非公平锁 —— NonFairSync

获取锁

NonFairSync.tryAcquire(int)

定义:

​ 在非公平情况下,去实现tryAcquire()方法,调用本方法去尝试获取锁

源码:

protected final boolean tryAcquire(int acquires) {// 调用nonfairTryAcquire()获取锁return nonfairTryAcquire(acquires);
}
nonfairTryAcquire(int)

定义:

​ 调用本方法去尝试获取锁,套娃

源码:

/*** 尝试获取非公平锁* acquires == 1 默认*/
final boolean nonfairTryAcquire(int acquires) {// 获取当前线程final Thread current = Thread.currentThread();// 拿到当前同步信号量的值int c = getState();// 判断state状态是否为0,为0直接加锁if (c == 0) {// 不需要判断同步队列(CLH)中是否有排队等待线程,因为是非公平锁// 将state修改为1,标记获得锁if (compareAndSetState(0, acquires)) {// 独占状态锁持有者指向当前线程setExclusiveOwnerThread(current);return true;}}/*** state状态不为0,判断锁持有者是否是当前线程,* 如果是当前线程持有 则state+1*/else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}//加锁失败return false;
}

总结:

  1. 获取state信号量,如果为0,不需要判断CLH是否有等待线程,直接CAS抢锁,return抢锁是否成功
  2. 如果不为0,判断是不是自己拿到了锁
  3. 如果是自己拿到锁,state累加return true
  4. 如果不是自己拿到锁,则return false
FairSync.tryAcquire(int)

定义:

​ 在公平情况下,去实现tryAcquire()方法,调用本方法去尝试获取锁

源码:

/*** 重写aqs中的方法逻辑* 尝试加锁,被AQS的acquire()方法调用*/
protected final boolean tryAcquire(int acquires) {// 获取当前线程final Thread current = Thread.currentThread();// 拿到当前同步信号量的值int c = getState();// 判断state状态是否为0,为0直接加锁if (c == 0) {/*** ==与非公平锁中的区别==* 需要先判断 CLH队列 当中是否有等待的节点* 如果没有则可以尝试CAS获取锁*/if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {// 独占线程指向当前线程setExclusiveOwnerThread(current);return true;}}// 如果status不为0,则判断持有锁的线程 是否 是当前线程else if (current == getExclusiveOwnerThread()) {// 是当前线程,信号量++int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}// 加锁失败return false;
}

总结:

  1. 拿到state信号量,如果为0
    1. 判断CLH队列是否有等待线程,有等待线程return false
    2. 如果CLH中没有等待线程,进行CAS抢锁,return抢锁是否成功
  2. 如果不为0
    1. 判断当前持有锁线程是否为自己不是return false
    2. 是自己,则让信号量state = state + acquiresreturn true

释放锁

定义:

​ 公平锁与非公平锁释放锁的逻辑是一样的,都是调用同一个方法tryRelease()

tryRelease()

定义:

​ 该方法用于释放锁

源码:

/*** 释放锁*/
protected final boolean tryRelease(int releases) {// 得到释放后的信号量int c = getState() - releases;// 判断当前释放信号量的线程是否为拥有锁的线程if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 当持有锁的线程完全释放后!锁才能被释放if (c == 0) {free = true;setExclusiveOwnerThread(null);}// 修改信号量setState(c);// 返回是否释放锁 【即当前线程是否完全归还信号量】return free;
}

总结:

  1. 计算释放后的信号量
  2. 合法性判断 【是否为拥有锁的线程来释放信号量
  3. state==0 表示完全归还信号量 【也就是释放锁】,return true
  4. 否则修改state,return false 【释放部分信号量成功,但未释放锁】

应用 BlockingQueue

定义:

​ BlockingQueue,是用于解决 并发生产者 - 消费者问题 的最有用的类。它保证任意时刻只有一个线程可以进行take或者put操作,并且提供了超时 return null 的机制。

类型:

  1. 无限队列 —— 几乎可以无限增长
  2. 有限队列 —— 定义了最大容量

队列数据结构:

  • 通常用链表或者数组实现
  • 具有FIFO先进先出特性,也有双向队列(Deque)优先级队列
  • 主要操作 —— 入队(EnQueue) 与 出队 (DeQueue)

常见的4种阻塞队列

  • ArrayBlockingQueue —— 由数组支持的有界队列
  • LinkedBlockingQueue —— 由链接节点支持的可选有界队列
  • PriorityBlockingQueue —— 由优先级堆支持的无界优先级队列
  • DelayQueue —— 由优先级堆支持的、基于时间的调度队列
原理

阻塞队列每一种操作都根据ReentrantLock来加锁进行保证 任意时刻==只有一个线程可以进行take或者put操作==

以ArrayBlockingQueue来举例:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 构造函数中,单例生成 */final ReentrantLock lock;// 构造函数,单例生成一个public ArrayBlockingQueue(int capacity, boolean fair) {// ...lock = new ReentrantLock(fair);// ...}// 添加元素操作public boolean offer(E e) {checkNotNull(e);// 获取单例锁final ReentrantLock lock = this.lock;// 通过lock锁,锁住lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}// 删除元素操作public E poll() {// 获取单例锁final ReentrantLock lock = this.lock;// 通过lock锁,锁住lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}
}
ArrayBlockingQueue

定义:

​ 队列基于数组实现,容量大小在创建时已经定义

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();

ArrayBlockingQueue

应用场景:

在线程池中有比较多的应用,生产者 - 消费者场景

工作原理:

基于ReentrantLock保证线程安全,根据Condition实现队列满时的阻塞

LinkedBlockingQueue

定义:

​ 基于链表实现的无界队列(理论上有界)

BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();

特点:

向无限队列添加元素的所有操作都将永远不会阻塞,[注意这里 不是说不会加锁 保证线程安全],因此它可以增长到非常大的容量。

应用场景:

生产者 - 消费者场景使用时 消费者应该能够像生产者向队列添加消息一样快地消费消息 【否则产生OutOfMemory异常】

DelayQueue

定义:

​ 由优先级堆支持的、基于时间的调度队列,

  • 内部基于无界队列PriorityQueue实现
  • 无界队列基于数组的扩容实现
BlockingQueue<String> blockingQueue = new DelayQueue();

要求:

​ 入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口

应用场景:

​ 电影票

工作原理:

​ 队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

BlockingQueue API

BlockingQueue 接口的所有方法可以分为两大类:

  • 向队列添加元素的方法
  • 检索这些元素的方法

队列满/空的情况下,来自这两个组的每个方法的行为都不同

添加元素

方法说明
add()如果插入成功则返回 true,否则抛出 IllegalStateException 异常
put()将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入
offer()如果插入成功则返回 true,否则返回 false
offer(E e, long timeout, TimeUnit unit)尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入

检索元素

方法说明
take()获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
poll(long timeout, TimeUnit unit)检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null

构建生产者 - 消费者程序时,这些方法是 BlockingQueue 接口中最重要的构建块。

Semaphore

定义:

​ 它是基于AQS实现的一个==共享式==锁,分为公平和非公平版本

AQS要求实现子类必须实现tryAcquire()/tryRelease()方法,接下来介绍ReentrantLock对于这两个方法的实现

获取资源

非公平的acquire()

定义:

非公平情况下,获取共享式资源

源码:

// AQS父类顶层入口
public final void acquireShared(int arg) {// 获取锁失败,则节点阻塞if (tryAcquireShared(arg) < 0)// AQS维护的节点阻塞方法doAcquireShared(arg);
}// Semaphore类实现的tryAcquire()允许外部调用的方法 【非公平】
public boolean tryAcquire() {// 根据下面nonfairTryAcquireShared()// 返回正值,说明抢到了锁// 返回负值,说明没有抢到锁 【资源不足】return sync.nonfairTryAcquireShared(1) >= 0;
}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}
/* 获取非公平共享信号量 锁 */
final int nonfairTryAcquireShared(int acquires) {for (;;) {// 拿到当前的信号量 【剩余资源】int available = getState();// 尝试分配后的信号量int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))// remaining<0 进入这个return,说明资源不足,返回负值// CAS 进入这个return,说明分配成功,返回正值return remaining;}
}
公平的acquire()

定义:

公平情况下,获取共享式锁

源码:

// AQS父类顶层入口
public final void acquireShared(int arg) {// 获取锁失败,则节点阻塞if (tryAcquireShared(arg) < 0)// AQS维护的节点阻塞方法doAcquireShared(arg);
}
// Semaphore子类根据AQS要求实现的tryAcquire()
protected int tryAcquireShared(int acquires) {for (;;) {// 判断CLH等待队列是否为空 【与非公平锁的不同】if (hasQueuedPredecessors())return -1;// 同样的逻辑int available = getState();// 剩余量int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))// remaining<0 进入这个return,说明资源不足,返回负值// CAS 进入这个return,说明分配成功,返回正值return remaining;}
}
总结
  1. 拿到当前信号量,计算剩余量remaining
  2. 如果剩余量小于零,返回负值
  3. 如果剩余量大于零
    • 非公平锁进行CAS抢锁
    • 公平锁判断队列有无节点
      • 有则入队 返回负值
      • 没有则进行CAS抢锁

注意:

  • 默认情况下,是非公平锁。

  • 直接调用 tryAcquire(),走非公平锁的逻辑

  • 调用公平还是非公平,取决于实例化的Sync

释放资源

定义:

​ 公平与非公平一样,释放共享式资源

release()

归还资源

源码:

public void release() {sync.releaseShared(1);
}// 实现AQS框架指定的tryReleaseShared()方法
protected final boolean tryReleaseShared(int releases) {for (;;) {// 获取当前信号量int current = getState();// 做求和得到 新的信号量int next = current + releases;if (next < current) // 如果溢出throw new Error("Maximum permit count exceeded");// CAS不断设置if (compareAndSetState(current, next))return true;}
}

总结

Semaphore是实现了AQS的一个共享式同步信号量的类,它可以用作服务限流设置

  • accquire()方法对共享同步信号量作减法 【取出操作
  • release()方法对共享同步信号量作加法 【归还操作

CountDownLatch

定义:

​ 它是基于AQS实现的、可中断的一种多线程计数器

开始计数 countDown()

​ 类的入口方法为 countDown()方法

public void countDown() {sync.releaseShared(1);
}protected boolean tryReleaseShared(int releases) {for (;;) {// 获取信号量,资源int c = getState();// 如果资源为0,则没有资源可给了,返回falseif (c == 0)return false;// 剩余资源int nextc = c-1;// CAS释放if (compareAndSetState(c, nextc))return nextc == 0;}
}

等待总数 await()

​ 类的入口方法为 await()方法

// 方法入口
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}// AQS中,如果中断则终止,尝试获取同步信号量
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)// 小于0表示失败,失败则进入等待队列doAcquireSharedInterruptibly(arg);
}// CountDownLatch根据AQS规定的实现tryAcquireShared()
protected int tryAcquireShared(int acquires) {// 判断当前信号量是否为0return (getState() == 0) ? 1 : -1;
}

总结

CountDownLatch是一个对多线程计数的类,实现了AQS中共享式同步信号量

  • countDown():对信号量–,可用于统计线程数
  • await():阻塞,对信号量进行判断,如果信号量state == 0,说明不需要再等待任何线程,可以往下执行。

http://www.ngui.cc/el/4893705.html

相关文章

类加载机制(一)

类加载 定义&#xff1a; ​ 当 用到某个类 时&#xff0c;首先需要通过 类加载器 把类加载到JVM上 组成&#xff1a; 引导类加载器 —— 加载lib目录扩展类加载器 —— 加载ext目录应用程序类加载器 —— 加载ClassPath目录 【用户写的】自定义加载器 —— 加载用户自定义…

JVM内存模型(二)

JVM内存模型 定义&#xff1a; ​ JVM内存模型&#xff0c;又称为运行时数据区。【区别Java内存模型】 紫色 —— 线程私有 浅黄色 —— 线程共享 虚拟机栈 定义&#xff1a; ​ 虚拟机栈是一个线程私有的、FILO的结构。 ​ 一个线程到来时&#xff0c;从虚拟机栈中&…

JVM内存分配机制详解(三)

JVM对象创建过程 1.类加载检查 流程&#xff1a; JVM在遇到一条new指令检查这个指令的参数是否在常量池中定位到一个类的符号引用 【方法区】并检查 符号引用 所代表的类是否被类加载器加载 包括 new关键字、对象克隆、对象序列化等等 **总的来说&#xff1a;**执行一个类…

Spring IOC (三)

Spring IOC &#xff08;三&#xff09; refresh() 后八个方法 refresh() 定义&#xff1a; ​ 这个方法中&#xff0c;内置了13个方法&#xff0c;做了很多事情 方法&#xff1a; 方法作用prepareRefresh()保存了容器的启动时间&#xff0c;启动标志等obtainFreshBeanFac…

Spring循环依赖(四)

循环依赖 什么是循环依赖&#xff1f; 依赖&#xff1a;在Spring中&#xff0c;A类将其他的类当作属性时&#xff0c;就造成了A依赖其他类 定义&#xff1a; ​ 现有A、B两个类&#xff0c;如果A类将B类作为属性&#xff08;并设置Autowired&#xff09;&#xff0c;B类也将…

想要学习Python?8年老Python程序员的自学书单曝光,入门必看

什么是Python&#xff1f; 它是一种类似C语言的人工智能便捷语言。它简单易学、并且开源免费&#xff0c;应用面广&#xff0c;所以&#xff0c;是自学门槛相对低的一种计算机语言&#xff0c;能为你的生活、工作提升效率。 为什么选择学习python的人越来越多&#xff1f; 全…

Python基础学习:都给我学起来,这样快速处理CSV、JSON和XML数据的方法太简便了

Python的卓越灵活性和易用性使其成为最受欢迎的编程语言之一&#xff0c;尤其是对于数据处理和机器学习方面来说&#xff0c;其强大的数据处理库和算法库使得python成为入门数据科学的首选语言。在日常使用中&#xff0c;CSV&#xff0c;JSON和XML三种数据格式占据主导地位。下…

Python 强大的信号库:必看的blinker 入门级教程,简单易懂

1 信号 信号是一种通知或者说通信的方式&#xff0c;信号分为发送方和接收方。发送方发送一种信号&#xff0c;接收方收到信号的进程会跳入信号处理函数&#xff0c;执行完后再跳回原来的位置继续执行。 常见的 Linux 中的信号&#xff0c;通过键盘输入 CtrlC&#xff0c;就是…

初学者别找了,Python Pandas快速入门教程,吐血总结全网最详细教程

Python 是开源的&#xff0c;它很棒&#xff0c;但是也无法避免开源的一些固有问题&#xff1a;很多包都在做&#xff08;或者在尝试做&#xff09;同样的事情。如果你是 Python 新手&#xff0c;那么你很难知道某个特定任务的最佳包是哪个&#xff0c;你需要有经验的人告诉你。…

吐血整理:大厂Python学习路线,从青铜到王者,从此打卡学习不迷茫

前言 小编知道 很多朋友都对成为技术大牛有着深深的向往 并有很多技术问题想要解决 因为市面上各种教程质量良莠不齐 而且想要掌握高阶的开发技术 需要耗费大量的时间和精力 现如今每天有数以百万计的人使用 Python &#xff0c;用户群呈现出指数级增长&#xff0c;几乎没…