CountDownLatch实现原理全面解析

news/2024/7/27 8:35:41/文章来源:https://blog.csdn.net/weixin_43759352/article/details/136545417

简介

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步(即:用于线程之间的通信而不是互斥)。它允许一个或多个线程进入等待状态,直到其他线程执行完毕后,这些等待的线程才继续执行。

CountDownLatch通过一个计数器来实现,其中维护了一个count变量和操作该变量的两个主要方法:

  • await()方法:线程调用await()方法,会使调用该方法的线程进入阻塞状态,并将其加入到阻塞队列中。
  • countDown()方法:线程调用countDown()方法,会将CountDownLatch中count的值-1。当count变量的值递减为0,会唤醒阻塞队列中调用await()方法的线程继续执行业务处理。

应用场景

CountDownLatch是一种非常实用的并发控制工具,它的主要应用场景:

  • 主线程等待多个子线程完成任务处理。如:主线程等待其他线程各自完成任务处理后,再继续执行。

  • 实现多个线程开始执行任务处理的最大并行性(注意:是并行而非并发)。如:多个线程需要在同一时刻开始执行任务处理,可以通过如下方式实现:

    1)初始化一个的CountDownLatch变量(计数器的初始化值为1)。

    2)需要在同一时刻执行任务处理的所有线程调用CountDownLatch.await()方法进入阻塞状态。

    3)主线程调用CountDownLatch.countDown()方法将计数器-1(此时计数器的值为0),唤醒所有调用CountDownLatch.await()方法进入阻塞状态的线程开始执行任务处理。

实现原理

CountDownLatch中定义了一个Sync类型的变量和操作该变量的方法。

源码如下:

// Sync类型的同步变量
private final Sync sync;
// 构造函数,用于初始化CountDownLatch计数器
public CountDownLatch(int count) {...}
// 当前线程进入阻塞状态,直到AQS中的state(计数器)值为0,或者当前线程被其他线程中断。
public void await() throws InterruptedException {...}
// 当前线程进入阻塞状态,直到AQS中的state(计数器)值为0,或者当前线程等待超时或者被其他线程中断。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {...}
// 递减AQS中的state(计数器)值,如果state的值递减为0,则唤醒调用await()方法进入阻塞的线程。
public void countDown() {...}
// 返回state的值。
public long getCount() {...}
// 返回标识CountDownLatch及其计数器值的字符串。
public String toString() {...}

其中,最重要的是sync类型的变量、await()和countDown()方法。

Sync

Sync是CountDownLatch的静态内部类器,它继承了
AbstractQueuedSynchronizer(AQS),主要用于CountDownLatch的同步状态,创建CountDownLatch时进行初始化。

CountDownLatch构造函数:

public CountDownLatch(int count) {// 如果传入的count值小于0,则抛出IllegalArgumentException异常if (count < 0) throw new IllegalArgumentException("count < 0");// 初始化Syncthis.sync = new Sync(count);
}

初始化Sync时,将传入的count参数值赋值给AQS的同步状态state,state是一个volatile修饰的int值,一个线程修改了state值,其他线程能够立刻感知,从而保证state值在并发场景下的可见性。

同时,Sync实现了AQS的tryAcquireShared()和tryReleaseShared()方法:

java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
// 尝试获取共享资源
protected int tryAcquireShared(int acquires) {/*** 用于根据state(计数器)的值来尝试获取共享资源:*   state的值为0,返回1,表示可以获取共享资源。*   state的值不为0,返回-1,表示无法获取共享资源。*/return (getState() == 0) ? 1 : -1;
}
// 尝试释放共享资源(AQS)
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {// 获取当前state的值int c = getState();// 如果state的值为0,则返回false(即:没有需要释放的资源)if (c == 0)return false;// 如果state的值大于0,则将state的值-1,并通过CAS的方式更新state的最新值int nextc = c-1;if (compareAndSetState(c, nextc))/** * 返回资源释放结果:*   释放资源后state的值为0,则返回true,表示可以唤醒调用await()方法进入阻塞的线程。*   释放资源后state的值不为0,则返回false,表示继续阻塞调用await()方法的线程,直到state的值被减为0。*/return nextc == 0;}
}

await方法

CountDownLatch通过CountDownLatch#await方法调用CountDownLatch.Sync#tryAcquireShared方法尝试获取共享资源:

  • 获取到共享资源,则唤醒调用await()方法的线程执行业务处理。
  • 获取不到共享资源,则继续阻塞调用await()方法的线程,直到state的值递减为0(即:其他线程释放完共享资源)。

CountDownLatch#await方法源码解析:

// java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {// 获取共享资源(可中断)sync.acquireSharedInterruptibly(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
// 获取共享资源(AQS)
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 如果线程被其他线程中断,则抛出InterruptedException异常if (Thread.interrupted())throw new InterruptedException();// 具体由继承AQS的Sync的tryAcquireShared()方法实现if (tryAcquireShared(arg) < 0)// 如果获取共享资源锁失败,则将当前线程封装成Node节点追加到CLH队列的末尾,等待被唤醒(即:进入阻塞)doAcquireSharedInterruptibly(arg);
}

其中,
doAcquireSharedInterruptibly()方法源码解析请移步主页查阅->「一文搞懂」AQS(抽象队列同步器)实现原理及源码解析。

countDown方法

CountDownLatch通过CountDownLatch#countDown方法调用CountDownLatch.Sync#tryReleaseShared方法尝试释放共享资源:

  • 如果释放某个共享资源后state的值为0,则唤醒调用await()方法的线程执行业务处理。
  • 如果释放某个共享资源后state的值不为0,则继续阻塞调用await()方法的线程,直到state的值被减为0。

CountDownLatch#countDown方法源码解析:

// java.util.concurrent.CountDownLatch#countDown
public void countDown() {// 释放共享资源(数量为1)sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
// 释放共享资源(AQS)
public final boolean releaseShared(int arg) {// 具体由继承AQS的Sync的tryReleaseShared()方法实现if (tryReleaseShared(arg)) {// 唤醒后继节点doReleaseShared();return true;}return false;
}

使用示例

主线程等待子线程完成处理

/*** @author 南秋同学* 主线程等待多个子线程完成任务处理*/
@Slf4j
public class CountDownLatchExample {@SneakyThrowspublic static void main(String[] args) {// 初始化一个的CountDownLatch变量(计数器的初始化值为5)CountDownLatch cdl = new CountDownLatch(5);// 初始化一个固定大小的线程池ExecutorService service = Executors.newFixedThreadPool(5);for(int i = 0; i < 5 ; i++){// 创建Runnable线程Runnable runnable = new Runnable() {@SneakyThrows@Overridepublic void run() {log.info("子线程-{}开始执行...", Thread.currentThread().getName());Thread.sleep((long) (Math.random() * 10000));log.info("子线程-{}执行完成", Thread.currentThread().getName());cdl.countDown();}};service.execute(runnable);}log.info("主线程-{}等待所有子线程执行完成...",Thread.currentThread().getName());cdl.await();log.info("所有子线程执行完成,开始执行主线程-{}",Thread.currentThread().getName());}
}

执行结果:

14:13:01.299 [pool-1-thread-3]  - 子线程-pool-1-thread-3开始执行...
14:13:01.299 [pool-1-thread-2]  - 子线程-pool-1-thread-2开始执行...
14:13:01.299 [main]  - 主线程-main等待所有子线程执行完成...
14:13:01.299 [pool-1-thread-1]  - 子线程-pool-1-thread-1开始执行...
14:13:01.299 [pool-1-thread-5]  - 子线程-pool-1-thread-5开始执行...
14:13:01.299 [pool-1-thread-4]  - 子线程-pool-1-thread-4开始执行...
14:13:02.739 [pool-1-thread-3]  - 子线程-pool-1-thread-3执行完成
14:13:03.792 [pool-1-thread-1]  - 子线程-pool-1-thread-1执行完成
14:13:04.752 [pool-1-thread-5]  - 子线程-pool-1-thread-5执行完成
14:13:07.761 [pool-1-thread-4]  - 子线程-pool-1-thread-4执行完成
14:13:10.384 [pool-1-thread-2]  - 子线程-pool-1-thread-2执行完成
14:13:10.385 [main]  - 所有子线程执行完成,开始执行主线程-main

多线程最大并行处理

/*** @author 南秋同学* 实现多个线程开始执行任务处理的最大并行性*/
@Slf4j
public class CountDownLatchExample {@SneakyThrowspublic static void main(String[] args) {// 初始化一个的CountDownLatch变量(计数器的初始化值为1)CountDownLatch referee = new CountDownLatch(1);// 初始化一个的CountDownLatch变量(计数器的初始化值为5)CountDownLatch sportsman = new CountDownLatch(5);// 初始化一个固定大小的线程池ExecutorService service = Executors.newFixedThreadPool(5);for(int i = 0; i < 5 ; i++){// 创建Runnable线程Runnable runnable = new Runnable() {@SneakyThrows@Overridepublic void run() {log.info("运动员-{},等待裁判发布开始口令", Thread.currentThread().getName());referee.await();log.info("运动员-{},收到裁判发布的开始口令,起跑...", Thread.currentThread().getName());Thread.sleep((long) (Math.random() * 10000));log.info("运动员-{}到达终点", Thread.currentThread().getName());sportsman.countDown();}};service.execute(runnable);}log.info("裁判-{}准备发布开始口令...",Thread.currentThread().getName());Thread.sleep((long) (Math.random() * 10000));referee.countDown();log.info("裁判-{}已经发布开始口令,等待所有选手达到终点...",Thread.currentThread().getName());sportsman.await();log.info("所有运动员达到终点,裁判-{}开始计分",Thread.currentThread().getName());}
}

执行结果:

13:56:14.683 [pool-1-thread-3]  - 运动员-pool-1-thread-3,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-2]  - 运动员-pool-1-thread-2,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-5]  - 运动员-pool-1-thread-5,等待裁判发布开始口令
13:56:14.683 [main]  - 裁判-main准备发布开始口令...
13:56:14.683 [pool-1-thread-1]  - 运动员-pool-1-thread-1,等待裁判发布开始口令
13:56:14.683 [pool-1-thread-4]  - 运动员-pool-1-thread-4,等待裁判发布开始口令
13:56:18.205 [main]  - 裁判-main已经发布开始口令,等待所有选手达到终点...
13:56:18.205 [pool-1-thread-2]  - 运动员-pool-1-thread-2,收到裁判发布的开始口令,起跑...
13:56:18.205 [pool-1-thread-3]  - 运动员-pool-1-thread-3,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-5]  - 运动员-pool-1-thread-5,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-4]  - 运动员-pool-1-thread-4,收到裁判发布的开始口令,起跑...
13:56:18.206 [pool-1-thread-1]  - 运动员-pool-1-thread-1,收到裁判发布的开始口令,起跑...
13:56:22.110 [pool-1-thread-4]  - 运动员-pool-1-thread-4到达终点
13:56:23.866 [pool-1-thread-1]  - 运动员-pool-1-thread-1到达终点
13:56:26.803 [pool-1-thread-3]  - 运动员-pool-1-thread-3到达终点
13:56:28.019 [pool-1-thread-5]  - 运动员-pool-1-thread-5到达终点
13:56:28.178 [pool-1-thread-2]  - 运动员-pool-1-thread-2到达终点
13:56:28.179 [main]  - 所有运动员达到终点,裁判-main开始计分

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_997302.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】终の指针(前篇)

个人主页点这里~ 指针初阶点这里~ 指针初阶2.0点这里~ 指针进阶点这里~ 终の指针 一、回调函数二、qsort函数1、整形比较2、结构数据比较①结构体②-> 的使用③结构数据比较 一、回调函数 回调函数就是⼀个通过函数指针调用的函数。 把一个函数的指针作为参数传递给另一…

仓储管理系统(WMS) 的研发历程-PRD撰写

题外话&#xff1a;PRD的展现形式有多种&#xff0c;有的人喜欢在axure上直接做产品描述&#xff0c;觉得word较为过时&#xff0c;有的人认为axure不专业&#xff0c;任何展现形式都无可厚非&#xff0c;重要的达到PRD的目的&#xff0c;PRD的目标是让团队知道需求实现细节&am…

vue基础教程(4)——深入理解vue项目各目录

博主个人微信小程序已经上线&#xff1a;【中二少年工具箱】。欢迎搜索试用 正文开始 专栏简介1. 总览2. node_modules3.public4.src5.assets6.components7.router8.stores9.views10.App.vue11.main.js12.index.html 专栏简介 本系列文章由浅入深&#xff0c;从基础知识到实战…

Ajax、Axios、Vue、Element与其案例

目录 一.Ajax 二.Axios 三.Vue 四.Element 五.增删改查案例 一.依赖&#xff1a;数据库&#xff0c;mybatis&#xff0c;servlet&#xff0c;json-对象转换器 二.资源&#xff1a;elementvueaxios 三.pojo 四.mapper.xml与mapper接口 五.service 六.servlet 七.html页…

vue3的基本使用(1)

Vue3的基本使用&#xff08;1&#xff09; 初识vue31. vue3简介2. 性能提升3. 源码升级 Vue3的创建1. vue-cli创建2. vite创建 Composition API的区别&#xff08;组合式&#xff09;setup函数响应式数据1. ref响应式2. reactive响应式 toRefs与toRef简单介绍 初识vue3 1. vue…

Easticsearch性能优化之索引优化

Easticsearch性能优化之索引优化 一、合理的索引设计二、合理的分片和副本三、合理的索引设置 对于性能优化&#xff0c;Elasticsearch&#xff08;以下简称ES&#xff09;的索引优化是提高性能的关键因素之一。合理的设计索引&#xff0c;合理的分片和副本以及合理的缓存设置等…

Matlab 机器人工具箱 例程:运动学+动力学+路径规划+可视化

文章目录 1 创建机器人2 机器人显示3 机器人示教4 机器人路径规划&#xff1a;给定关节角路径5 机器人路径规划&#xff1a;给定末端位姿&#xff0c;求关节角路径6 工作空间可视化参考链接 1 创建机器人 clc;clear;close all; deg pi/180;L1 Revolute(d, 0, a, 0, alpha, 0,…

sudo command not found

文章目录 一句话Intro其他操作 一句话 sudo 某命令 改成 sudo -i 某命令 试试。 -i 会把当前用户的环境变量带过去&#xff0c;这样在sudo的时候&#xff0c;有更高的权限&#xff0c;有本用户的环境变量(下的程序命令)。 -i, --login run login shell as the target user; a …

蓝桥杯嵌入式模板构建——RCT时钟

在CubeMX里的RTC模块启用RTC时钟和日历功能 输入到RTC的时钟要配置成1HZ,这样的话RTC每经过1s走时一次 由于RTC时钟默认配置为32Khz 所以我们需要将异步分频值与同步分频值的乘积调整为32K分频即可一秒走时一次 频率&#xff1a;32000hz / 32000hz 1hz 必须是31和999&#…

数字化转型导师坚鹏:如何制定证券公司数字化转型年度培训规划

如何制定与实施证券公司数字化转型年度培训规划 ——以推动证券公司数字化转型战略落地为核心&#xff0c;实现知行果合一 课程背景&#xff1a; 很多证券公司都在开展数字化转型培训工作&#xff0c;目前存在以下问题急需解决&#xff1a; 缺少针对性的证券公司数字化转型…

Java定时调度

在Java应用程序中&#xff0c;定时调度是一项重要的任务。它允许你安排代码执行的时间&#xff0c;以便在将来的某个时刻自动执行任务。Java提供了多种方式来实现定时调度&#xff0c;其中最常用的是Java的Timer和ScheduledExecutorService。 在本教程中&#xff0c;我们将学习…

力扣新思路:改变字符串进行返回操作

我们在对字符串进行判断操作和返回操作时&#xff0c;可以增加\0来简化返回操作 char* longestCommonPrefix(char** strs, int strsSize) {if(strsSize0){return"";}for(int i0;i<strlen(strs[0]);i){for(int j1;j<strsSize;j){if(strs[0][i]!strs[j][i]) {s…

自动化测试之web自动化(Selenium)

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

【JavaEE初阶】 JVM 运行时数据区简介

文章目录 &#x1f343;前言&#x1f332;堆&#xff08;线程共享&#xff09;&#x1f384;Java虚拟机栈&#xff08;线程私有&#xff09;&#x1f38b;本地方法栈&#xff08;线程私有&#xff09;&#x1f333;程序计数器&#xff08;线程私有&#xff09;&#x1f334;方法…

尤雨溪:Vue 未来展望新的一轮

十年&#xff0c;一个既漫长又短暂的时光跨度&#xff0c;对于技术世界来说&#xff0c;更是沧海桑田的瞬间。在这十年里&#xff0c;Vue.js 从无到有&#xff0c;从默默无闻到蜚声全球&#xff0c;不仅改变了前端开发的面貌&#xff0c;更成为了无数开发者手中的得力工具。 在…

【漏洞复现】-用友CRM 任意文件读取漏洞

免责声明&#xff1a; 本文内容为学习笔记分享&#xff0c;仅供技术学习参考&#xff0c;请勿用作违法用途&#xff0c;未授权的攻击属于非法行为&#xff01;文章中敏感信息均已做多层打马处理。任何个人和组织利用此文所提供的信息而造成的直接或间接后果和损失&#xff0c;…

MySQL面试题纯享版

基础内容 1、MySQL的架构分层 2、一条 SQL 查询语句的执行流程 3、如何查看 MySQL 服务被多少个客户端连接了&#xff1f; 4、 空闲连接会一直占用着吗&#xff1f; 5、MySQL 的连接数有限制吗&#xff1f; 6、 怎么解决长连接占用内存的问题&#xff1f; 7、执行器与存储引擎…

Java输入和输出处理

一、Java I/O 文件、内存、键盘--->程序--->文件、内存、控制台 二、文件 相关记录或放在一起的数据的集合 思考&#xff1a; Java程序如何访问文件属性&#xff1f; 解答&#xff1a; Java API:java.io.File类 三、File类 File类的常用方法 方法名称说明boole…

el-dialog封装组件

父页面 <template><div><el-button type"primary" click"visible true">展示弹窗</el-button><!-- 弹窗组件 --><PlayVideo v-if"visible" :visible.syncvisible /></div> </template><sc…

Hive的性能优化

1.调优概述 Hive 作为大数据领域常用的数据仓库组件&#xff0c;在设计和查询时要特别注意效率。影响 Hive 效率的几乎从不是数据量过大&#xff0c;而是数据倾斜、数据冗余、Job或I/O过多、MapReduce分配不合理等等。对 Hive 的调优既包含 Hive 的建表设计方面&#xff0c;对H…