ThreadPoolExecutor源码阅读流程图

news/2024/4/20 2:53:21/文章来源:https://blog.csdn.net/weixin_40353578/article/details/130332987

1.创建线程池

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

10–核心线程数
20–最大线程数
1 TimeUnit.MINUTES 非核心线程数存活时间 1分钟
new ArrayBlockingQueue(100) 阻塞队列类型 数组类型传入队列长度,需要无限长度可以使用链表类型LinkedBlockingDeque
线程工厂ThreadFactory和超出队列的策略ThreadPoolExecutor.AbortPolicy(抛出异常)暂时先按默认来。

ThreadPoolExecutor executor = new ThreadPoolExecutor
(10,20,1, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(100));

创建线程池的时候是不会启动线程的,需要在执行具体业务逻辑时候才会执行

2.ThreadPoolExecutor重要参数及方法介绍

//ctl Int原子操作类,32位,前三位代表线程池状态,后28位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//线程池5种状态
//RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】
private static final int RUNNING    = -1 << COUNT_BITS;
//SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
private static final int SHUTDOWN   =  0 << COUNT_BITS;//执行shutDown()方法
//STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】
private static final int STOP       =  1 << COUNT_BITS;//执行shutDownNow()方法
//TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
private static final int TIDYING    =  2 << COUNT_BITS;
//TERMINATED状态 terminated()执行完之后就会转变为TERMINATED
private static final int TERMINATED =  3 << COUNT_BITS;//获取线程池状态    
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
//获取当前工作线程数
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

2.1线程的五种状态

  • RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】 11100000 00000000 00000000 00000000
  • SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
    00000000 00000000 00000000 00000000
  • STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】 00100000 00000000 00000000 00000000
  • TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
    00100000 00000000 00000000 00000000
  • TERMINATED状态 terminated()执行完之后就会转变为TERMINATED 01100000 00000000 00000000 00000000
private static boolean runStateLessThan(int c, int s) {return c < s;
}private static boolean runStateAtLeast(int c, int s) {return c >= s;
}private static boolean isRunning(int c) {return c < SHUTDOWN;
}/*** Attempts to CAS-increment the workerCount field of ctl.* 通过CAS来对当前工作线程数增加*/
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}/*** Attempts to CAS-decrement the workerCount field of ctl.* 通过CAS来对当前工作线程数减少*/
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
}

任务执行流程图

在这里插入图片描述

3.提交任务execute

executor.execute(new Runnable() {@Overridepublic void run() {//业务代码}
});
   public void execute(Runnable command) {if (command == null)throw new NullPointerException();//获取ctl 初始值为ctlOf(RUNNING, 0) 运行状态,工作线程数0int c = ctl.get();//计算获取工作线程数<核心线程数if (workerCountOf(c) < corePoolSize) {//当前command增加为核心工作线程,添加失败下面会进行入队操作if (addWorker(command, true))return;c = ctl.get();}//判断线程池状态(判断是因为防止别的线程把状态进行修改)//workQueue.offer(command) 加入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再对线程池状态二次检查,如果不是running则移除队列if (! isRunning(recheck) && remove(command))//拒绝策略,默认抛出异常reject(command);else if (workerCountOf(recheck) == 0)//这里就是执行队列中的任务,下面addWorker里面有体现和讲解addWorker(null, false);}//线程池达到最大了的maxPool,添加失败执行拒绝策略else if (!addWorker(command, false))reject(command);}

3.1 submit

Future<?> submit = executor.submit(new Runnable() {@Overridepublic void run() {//业务代码}
});

这个里面执行了execute,多了一个返回Future

    public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}

4.addWorker

这里不同版本jdk有差异

private boolean addWorker(Runnable firstTask, boolean core) {//类似于gotoretry:for (int c = ctl.get();;) {// 线程池状态>=SHUTDOWN 并且 线程池状态>=STOP或者传入的任务!=null或者阻塞队列为空则返回if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {////判断工作的线程是否超过核心线程数或者最大线程数,addWork时候会传入coreif (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;//如果没有超过核心线程数或者最大线程数,这里通过cas对工作线程数量增加,多个竞争失败的话循环cas操作if (compareAndIncrementWorkerCount(c))break retry;//跳出外层循环c = ctl.get();  // Re-read ctl//如果线程池状态>=SHUTDOWN 跳到外层循环继续执行if (runStateAtLeast(c, SHUTDOWN))continue retry;}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建任务Worker,会利用线程工厂去创建一个线程默认的是/***  Worker(Runnable firstTask) {*  //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0*  // 正常应该是acquire时候+1  release时候-1 这里重写过方法*  setState(-1); *  this.firstTask = firstTask;*  this.thread = getThreadFactory().newThread(this); }**/w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//这里的mainLock是对Workers进行操作的,防止出现并发问题//用锁是因为private final HashSet<Worker> workers = new HashSet<>(); 这个不是线程安全的final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int c = ctl.get();//线程池如果是RUNNING状态// 或者状态<STOP并且传入的任务为空 这个是从阻塞队列里面拿任务执行if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive()) // 如果线程已经在运行,就抛出异常throw new IllegalThreadStateException();//添加任务到工作线程的容器里workers.add(w);int s = workers.size();//largestPoolSize 这个是记录工作线程数,没看到具体作用,但既然有肯定是有用的if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//这里才到线程运行if (workerAdded) {t.start();workerStarted = true;}}} finally {//这里类似于一个回滚操作,异常情况会对worker进行移除,修改ctlif (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

5.Worker相关

Worker类

5.1 构造器

   Worker(Runnable firstTask) {//这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0// 正常应该是acquire时候+1  release时候-1 这里重写过方法setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); }

5.2 tryAcquire和tryRelease

重写过从+1,-1变成cas为1和设置为0,0代表执行完任务空闲,1代表在执行任务,里面有个

protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;
}
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;
}

5.3 runWork

public void run() {runWorker(this);}final void runWorker(Worker w) {//获取当前工作线程Thread wt = Thread.currentThread();//获取需要执行的任务Runnable task = w.firstTask;w.firstTask = null;w.unlock(); boolean completedAbruptly = true;try {//执行任务不为空 或者 队列中获取到了需要执行的任务//如果没有获取到getTask是会阻塞的while (task != null || (task = getTask()) != null) {w.lock();//如果线程池状态>=STOP 并且当前线程没有被打断//线程池被打断并且线程池状态>=STOP 并且当前线程没有被打断//这里是对线程池状态作验证,如果状态发生了变更则要去尝试中断线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行前切面 可以用来记录工作中线程和计算空闲线程,Tomcat线程池有这个行为beforeExecute(wt, task);try {task.run();//执行后或异常切面afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;//执行任务数w.completedTasks++;w.unlock();}}//正常执行才会为false,表示正常退出completedAbruptly = false;} finally {//执行失败completedAbruptly为trueprocessWorkerExit(w, completedAbruptly);}}    

5.4 getTask()

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 线程池状态不为RUNNING,队列为空就不需要处理任务了,直接返回空,上层runWorker也会正常退出循环if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}//工作中的线程数量int wc = workerCountOf(c);// 核心线程是否超时回收标志,可以通过executor.allowCoreThreadTimeOut(true);设置//工作线程数量>核心线程数量//用来判断是否是无限阻塞boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//大于最大线程数或者超时 并且 工作线程数量>1或者队列为空 则ctl减少// && (wc > 1 || workQueue.isEmpty()) 这个判断就是要留下至少一个线程去处理队列中的任务if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//超时阻塞和无限阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//超时,循环时会去处理返回nulltimedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

6.shutdown()

shutdown会把线程池状态修改为SHUTDOWN,提交新任务会抛出异常,但会继续执行队列中的任务。

  public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//修改状态为SHOUTDOWN,并修改ctladvanceRunState(SHUTDOWN);//这里会中断工作中的线程interruptIdleWorkers();onShutdown(); // 空方法} finally {mainLock.unlock();}tryTerminate();
}
//中间还有个方法,传入的onlyOne为false
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//遍历Workers,遍历前加锁for (Worker w : workers) {Thread t = w.thread;//把没有被打断并且没有在工作中的线程打断//获取到锁说明线程是空闲的,没有获取到锁说明在执行任务if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}

7.shutdownNow()

shutdownNow会把线程池状态修改为STOP,提交新任务会抛出异常,也不执行队列中的任务。但会返回队列中的任务。

List<Runnable> runnables = executor.shutdownNow();
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//修改状态为STOP,并修改ctladvanceRunState(STOP);//中断线程interruptWorkers();//返回队列中的任务tasks = drainQueue();} finally {mainLock.unlock();}//最后一个线程结束时候会把线程池状态改为TERMINATEDtryTerminate();return tasks;
}
private void interruptWorkers() {//中断所有工作线程for (Worker w : workers)w.interruptIfStarted();
}void interruptIfStarted() {Thread t;//getState() >= 0 代表空闲线程和正常执行中的线程,不为空并且没有被打断的就执行打断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_103199.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Automa函数学习(三)

从变量中获取数据 当我们想要用automa获取文本标签获取到网页的文本内容后,想要将获取到的文本内容当做参数往后面的标签里进行传递时就需要用到automa提供的传参格式 {{ variables.自定义参数名}} 举例: 先建立打开百度首页工作流 前面自定义的变量名为text,所以这里参数拼接…

开放式耳机有什么好处,盘点几款性能不错的开放式耳机

随着人们对生活质量要求的提高&#xff0c;大家在运动的时候都喜欢戴上耳机&#xff0c;享受运动的乐趣。但是传统耳机戴久了之后就会出现耳朵酸痛的情况&#xff0c;这是因为传统耳机佩戴方式是通过空气振动来传递声音&#xff0c;而人在运动时就会伴随着大量的汗水&#xff0…

深入学习RabbitMQ五种模式(一)

1.安装erlang 下载otp_win64_25.3.exe https://www.erlang.org/downloads erlang安装完成&#xff0c;需要配置erlang环境变量 ERLANG_HOMEE:\software\Erlang OTPPATH%PATH%;%ERLANG_HOME%\bin; 2.安装RabbitMQ 下载rabbitmq-server-3.11.13.exe https://www.rabbitmq.com/dow…

【Python 协程详解】

0.前言 前面讲了线程和进程&#xff0c;其实python还有一个特殊的线程就是协程。 协程不是计算机提供的&#xff0c;计算机只提供&#xff1a;进程、线程。协程是人工创造的一种用户态切换的微进程&#xff0c;使用一个线程去来回切换多个进程。 为什么需要协程&#xff1f; …

IntelliJ IDEA 接入ChatGPT (免费,无需注册)生产力被干爆了!

IntelliJ IDEA 接入ChatGPT 前言 : 今天给大家介绍一款好用的 IntelliJ IDEA ChatGPT 插件 可以帮助我们写代码&#xff0c;以及语言上的处理工作&#xff0c;以及解释代码。让我们的生产力大大提高&#xff01; 一. ChatGPT-Plus 功能介绍 支持最新idea版本AI询问功能,写好…

Adobe Photoshop 软件下载

Adobe Photoshop&#xff0c;简称“PS”&#xff0c;是由Adobe Systems开发和发行的图像处理软件。Photoshop主要处理以像素所构成的数字图像。 时至今日&#xff0c;Adobe Photoshop 已经成为当今世界上最流行、应用最广泛的图像处理软件。不但设计专业的学生要系统的学习这个…

智能建筑中电力监控系统的应用与产品选型

摘要&#xff1a;近几十年&#xff0c;中国现代化经济不断发展&#xff0c;计算机技术、信息技术等相关产业也取得了飞跃性的进步。随着商业、生活以及公共建筑不断提高智能管理和节能的要求&#xff0c;电力监控系统开始逐渐渗入人们的日常生活&#xff0c;发挥着不可替代的作…

算法刷题|0-1背包问题、416.分割等和子集

0-1背包问题 什么是0-1背包&#xff1f; 有i个物品和一个容量为j的背包&#xff0c;每个物品有重量和价值两个属性&#xff1b;求容量为j的背包能装的物品的最大价值是多少。每个物品智能使用一次。 二维dp数组 dp[i][j]的含义&#xff1a;表示从前i个物品中&#xff0c;当前…

C++中引用的基本内容

个人主页&#xff1a;平行线也会相交 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 平行线也会相交 原创 收录于专栏【C之路】 引用&#xff0c;其实没啥特别的&#xff0c;就是起外号&#xff0c;或者说起小名。就比如说孙悟空就有很多外号&#xff0c;如…

为何C语言的函数调用要用到堆栈,而汇编却不需要自定义栈

一 ≠ 汇编不需要堆栈 汇编中一般不初始化&#xff0c;也就是直接使用系统的堆栈而已&#xff0c;自己定义堆栈还是要初始化的。 之前看了很多关于uboot的分析&#xff0c;其中就有说要为C语言的运行&#xff0c;准备好堆栈。 而自己在Uboot的start.S汇编代码中&#xff0c…

一文详细介绍查看和启用nginx日志(access.log和error.log),nginx错误日志的安全级别,自定义访问日志中的格式

文章目录 1. 文章引言2. Nginx访问日志(access.log)2.1 简述访问日志2.2 启用Nginx访问日志2.3 自定义访问日志中的格式 3. Nginx错误日志(error.log)3.1 简述错误日志3.2 启用错误日志3.3 Nginx错误日志的安全级别 4. 文末总结 1. 文章引言 我们在实际工作中&#xff0c;经常使…

学习spark笔记

✨ 学习 Spark 和 Scala 一 ​ &#x1f426;Spark 算子 spark常用算子详解&#xff08;小部分算子使用效果与描述不同&#xff09; Spark常用的算子以及Scala函数总结 Spark常用Transformations算子(二) Transformation 算子(懒算子)&#xff1a;不会提交spark作业&#…

SLAM论文速递:SLAM—— 流融合:基于光流的动态稠密RGB-D SLAM—4.25(2)

论文信息 题目&#xff1a; FlowFusion:Dynamic Dense RGB-D SLAM Based on Optical Flow 流融合:基于光流的动态稠密RGB-D SLAM论文地址&#xff1a; https://arxiv.org/pdf/2003.05102.pdf发表期刊&#xff1a; 2020 IEEE International Conference on Robotics and Automa…

flex布局属性详解

Flex布局 flex-directionflex-wrapflex-flowjustify-contentalign-itemsalign-content其他orderflexalign-self 含义:Flex是Flexible Box的缩写&#xff0c;意为”弹性布局”&#xff0c;用来为盒状模型提供最大的灵活性。 flex-direction flex-direction属性决定主轴的方向&…

危险区域闯入识别系统 yolov8

危险区域闯入识别系统通过YOLOv8网络模型技术&#xff0c;危险区域闯入识别系统对现场画面中发现有人违规闯入禁区&#xff0c;系统立即抓拍告警同步回传后台。YOLOv8 提供了一个全新的 SOTA 模型&#xff0c;包括 P5 640 和 P6 1280 分辨率的目标检测网络和基于 YOLACT 的实例…

Model-Contrastive Federated Learning 论文解读(CVPR 2021)

Model-Contrastive Federated Learning 论文解读 对比学习SimCLR 对比学习的基本想法是同类相聚&#xff0c;异类相离 从不同的图像获得的表征应该相互远离&#xff0c;从相同的图像获得的表征应该彼此靠近 具体框架&#xff1a; T随机数据增强模块&#xff1a;随机裁剪然…

光波导相控阵技术

在简述电光效应和热光效应的基础上综述了国内外光波导相控阵技术研究进展&#xff0c;包括一维和二维光波导相控阵的技术途径、结构特点和性能指标&#xff0c;给出了光波导相控阵的优势以及在激光雷达、成像等领域的应用前景。结果表明&#xff0c;光波导相控阵技术正向着大扫…

JavaScript Debugger 调试断点模式

在代码中加入debugger&#xff0c;相当于断点停顿&#xff0c;可用于查看变量传递情况&#xff0c;比如&#xff1a;Vue组件中生命周期onLoad(options) &#xff0c;在上一页面进入下一页面后&#xff0c;传递进来的参数值。 备注 &#xff1a;options 参数为字符串&#xff0…

从需求分析到上线发布,一步步带你开发收废品小程序

在如今的环保和可持续性的大趋势下&#xff0c;废品回收已经成为了人们日常生活中不可或缺的一部分。收废品小程序的开发可以帮助人们更方便地找到回收废品的地点&#xff0c;并有效减少废品对环境造成的污染。因此&#xff0c;我们的收废品小程序需要满足以下需求&#xff1a;…

2023年电信推出新套餐:月租19元=135G流量+长期套餐+无合约期!

在三大运营商推出的流量卡当中&#xff0c;电信可以说是性价比最高的一个&#xff0c;相对于其他两家运营商&#xff0c;完全符合我们低月租&#xff0c;大流量的要求&#xff0c;所以&#xff0c;今天小编介绍的还是电信流量卡。 在这里说一下&#xff0c;小编推荐的卡都是免…