1.创建线程池
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
10–核心线程数
20–最大线程数
1 TimeUnit.MINUTES 非核心线程数存活时间 1分钟
new ArrayBlockingQueue(100) 阻塞队列类型 数组类型传入队列长度,需要无限长度可以使用链表类型LinkedBlockingDeque
线程工厂ThreadFactory和超出队列的策略ThreadPoolExecutor.AbortPolicy(抛出异常)暂时先按默认来。
ThreadPoolExecutor executor = new ThreadPoolExecutor
(10,20,1, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(100));
创建线程池的时候是不会启动线程的,需要在执行具体业务逻辑时候才会执行
2.ThreadPoolExecutor重要参数及方法介绍
//ctl Int原子操作类,32位,前三位代表线程池状态,后28位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//线程池5种状态
//RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
private static final int SHUTDOWN = 0 << COUNT_BITS;//执行shutDown()方法
//STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】
private static final int STOP = 1 << COUNT_BITS;//执行shutDownNow()方法
//TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//TERMINATED状态 terminated()执行完之后就会转变为TERMINATED
private static final int TERMINATED = 3 << COUNT_BITS;//获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//获取当前工作线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.1线程的五种状态
- RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】 11100000 00000000 00000000 00000000
- SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
00000000 00000000 00000000 00000000 - STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】 00100000 00000000 00000000 00000000
- TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
00100000 00000000 00000000 00000000 - TERMINATED状态 terminated()执行完之后就会转变为TERMINATED 01100000 00000000 00000000 00000000
private static boolean runStateLessThan(int c, int s) {return c < s;
}private static boolean runStateAtLeast(int c, int s) {return c >= s;
}private static boolean isRunning(int c) {return c < SHUTDOWN;
}/*** Attempts to CAS-increment the workerCount field of ctl.* 通过CAS来对当前工作线程数增加*/
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}/*** Attempts to CAS-decrement the workerCount field of ctl.* 通过CAS来对当前工作线程数减少*/
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
}
任务执行流程图
3.提交任务execute
executor.execute(new Runnable() {@Overridepublic void run() {//业务代码}
});
public void execute(Runnable command) {if (command == null)throw new NullPointerException();//获取ctl 初始值为ctlOf(RUNNING, 0) 运行状态,工作线程数0int c = ctl.get();//计算获取工作线程数<核心线程数if (workerCountOf(c) < corePoolSize) {//当前command增加为核心工作线程,添加失败下面会进行入队操作if (addWorker(command, true))return;c = ctl.get();}//判断线程池状态(判断是因为防止别的线程把状态进行修改)//workQueue.offer(command) 加入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再对线程池状态二次检查,如果不是running则移除队列if (! isRunning(recheck) && remove(command))//拒绝策略,默认抛出异常reject(command);else if (workerCountOf(recheck) == 0)//这里就是执行队列中的任务,下面addWorker里面有体现和讲解addWorker(null, false);}//线程池达到最大了的maxPool,添加失败执行拒绝策略else if (!addWorker(command, false))reject(command);}
3.1 submit
Future<?> submit = executor.submit(new Runnable() {@Overridepublic void run() {//业务代码}
});
这个里面执行了execute,多了一个返回Future
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
4.addWorker
这里不同版本jdk有差异
private boolean addWorker(Runnable firstTask, boolean core) {//类似于gotoretry:for (int c = ctl.get();;) {// 线程池状态>=SHUTDOWN 并且 线程池状态>=STOP或者传入的任务!=null或者阻塞队列为空则返回if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {////判断工作的线程是否超过核心线程数或者最大线程数,addWork时候会传入coreif (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;//如果没有超过核心线程数或者最大线程数,这里通过cas对工作线程数量增加,多个竞争失败的话循环cas操作if (compareAndIncrementWorkerCount(c))break retry;//跳出外层循环c = ctl.get(); // Re-read ctl//如果线程池状态>=SHUTDOWN 跳到外层循环继续执行if (runStateAtLeast(c, SHUTDOWN))continue retry;}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建任务Worker,会利用线程工厂去创建一个线程默认的是/*** Worker(Runnable firstTask) {* //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0* // 正常应该是acquire时候+1 release时候-1 这里重写过方法* setState(-1); * this.firstTask = firstTask;* this.thread = getThreadFactory().newThread(this); }**/w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//这里的mainLock是对Workers进行操作的,防止出现并发问题//用锁是因为private final HashSet<Worker> workers = new HashSet<>(); 这个不是线程安全的final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int c = ctl.get();//线程池如果是RUNNING状态// 或者状态<STOP并且传入的任务为空 这个是从阻塞队列里面拿任务执行if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive()) // 如果线程已经在运行,就抛出异常throw new IllegalThreadStateException();//添加任务到工作线程的容器里workers.add(w);int s = workers.size();//largestPoolSize 这个是记录工作线程数,没看到具体作用,但既然有肯定是有用的if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//这里才到线程运行if (workerAdded) {t.start();workerStarted = true;}}} finally {//这里类似于一个回滚操作,异常情况会对worker进行移除,修改ctlif (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
5.Worker相关
5.1 构造器
Worker(Runnable firstTask) {//这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0// 正常应该是acquire时候+1 release时候-1 这里重写过方法setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); }
5.2 tryAcquire和tryRelease
重写过从+1,-1变成cas为1和设置为0,0代表执行完任务空闲,1代表在执行任务,里面有个
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;
}
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;
}
5.3 runWork
public void run() {runWorker(this);}final void runWorker(Worker w) {//获取当前工作线程Thread wt = Thread.currentThread();//获取需要执行的任务Runnable task = w.firstTask;w.firstTask = null;w.unlock(); boolean completedAbruptly = true;try {//执行任务不为空 或者 队列中获取到了需要执行的任务//如果没有获取到getTask是会阻塞的while (task != null || (task = getTask()) != null) {w.lock();//如果线程池状态>=STOP 并且当前线程没有被打断//线程池被打断并且线程池状态>=STOP 并且当前线程没有被打断//这里是对线程池状态作验证,如果状态发生了变更则要去尝试中断线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行前切面 可以用来记录工作中线程和计算空闲线程,Tomcat线程池有这个行为beforeExecute(wt, task);try {task.run();//执行后或异常切面afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;//执行任务数w.completedTasks++;w.unlock();}}//正常执行才会为false,表示正常退出completedAbruptly = false;} finally {//执行失败completedAbruptly为trueprocessWorkerExit(w, completedAbruptly);}}
5.4 getTask()
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 线程池状态不为RUNNING,队列为空就不需要处理任务了,直接返回空,上层runWorker也会正常退出循环if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}//工作中的线程数量int wc = workerCountOf(c);// 核心线程是否超时回收标志,可以通过executor.allowCoreThreadTimeOut(true);设置//工作线程数量>核心线程数量//用来判断是否是无限阻塞boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//大于最大线程数或者超时 并且 工作线程数量>1或者队列为空 则ctl减少// && (wc > 1 || workQueue.isEmpty()) 这个判断就是要留下至少一个线程去处理队列中的任务if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//超时阻塞和无限阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//超时,循环时会去处理返回nulltimedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
6.shutdown()
shutdown会把线程池状态修改为SHUTDOWN,提交新任务会抛出异常,但会继续执行队列中的任务。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//修改状态为SHOUTDOWN,并修改ctladvanceRunState(SHUTDOWN);//这里会中断工作中的线程interruptIdleWorkers();onShutdown(); // 空方法} finally {mainLock.unlock();}tryTerminate();
}
//中间还有个方法,传入的onlyOne为false
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//遍历Workers,遍历前加锁for (Worker w : workers) {Thread t = w.thread;//把没有被打断并且没有在工作中的线程打断//获取到锁说明线程是空闲的,没有获取到锁说明在执行任务if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}
7.shutdownNow()
shutdownNow会把线程池状态修改为STOP,提交新任务会抛出异常,也不执行队列中的任务。但会返回队列中的任务。
List<Runnable> runnables = executor.shutdownNow();
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//修改状态为STOP,并修改ctladvanceRunState(STOP);//中断线程interruptWorkers();//返回队列中的任务tasks = drainQueue();} finally {mainLock.unlock();}//最后一个线程结束时候会把线程池状态改为TERMINATEDtryTerminate();return tasks;
}
private void interruptWorkers() {//中断所有工作线程for (Worker w : workers)w.interruptIfStarted();
}void interruptIfStarted() {Thread t;//getState() >= 0 代表空闲线程和正常执行中的线程,不为空并且没有被打断的就执行打断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}