文章目录
- 异步编程
- FutureTask应用&源码分析
- FutureTask介绍
- FutureTask应用
- FutureTask源码分析
- FutureTask中的核心属性
- FutureTask的run方法
- FutureTask的set&setException方法
- FutureTask的cancel方法
- FutureTask的get方法
- FutureTask的finishCompletion方法
- CompletableFuture应用&源码分析
- CompletableFuture介绍
- CompletableFuture应用
- supplyAsync
- runAsync
- thenApply、thenApplyAsync
- thenAccept、thenAcceptAsync
- thenRun、thenRunAsync
- thenCombine、thenAcceptBoth、runAfterBoth
- applyToEither、acceptEither、runAfterEither
- exceptionally、henCompose、handle
- allOf,anyOf
- CompletableFuture源码分析
- 当前任务执行方式
- 任务编排的存储&执行方式
- 任务编排流程
- 查看后置任务执行时机
- CompletableFuture执行流程图
异步编程
FutureTask应用&源码分析
FutureTask介绍
FutureTask是一个可以取消异步任务的类。FutureTask对Future做的一个基本实现。可以调用方法区开始和取消一个任务。
一般是配合Callable去使用。
异步任务启动之后,可以获取一个绑定当前异步任务的FutureTask。
可以基于FutureTask的方法去取消任务,查看任务是否结果,以及获取任务的返回结果。
FutureTask内部的整体结构中,实现了RunnableFuture的接口,这个接口又继承了Runnable, Future这个两个接口。所以FutureTask也可以作为任务直接交给线程池去处理。
FutureTask应用
大方向是FutureTask对任务的控制:
- 任务执行过程中状态的控制
- 任务执行完毕后,返回结果的获取
FutureTask的任务在执行run方法后,是无法被再次运行,需要使用runAndReset方法才可以。
public static void main(String[] args) throws InterruptedException {FutureTask<String> futureTask = new FutureTask<>(() -> {System.out.println("任务开始执行......");Thread.sleep(2000);System.out.println("任务执行完毕......");return "OK!";});// 构建线程池ExecutorService threadPool = Executors.newFixedThreadPool(10);// 构建线程池threadPool.execute(futureTask);// futureTask提供了run方法,一般不会自己去调用run方法,让线程池去执行任务,由线程池去执行run方法// run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。// 如果希望任务可以反复被执行,需要去调用runAndReset方法futureTask.run();// 对返回结果的获取,类似阻塞队列的poll方法// 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException
// try {
// String s = futureTask.get(3000, TimeUnit.MICROSECONDS);
// System.out.println("返回结果:" + s);
// } catch (Exception e) {
// System.out.println("异常返回:" + e.getMessage());
// e.printStackTrace();
// }// 对返回结果的获取,类似阻塞队列的take方法,死等结果
// try {
// String s = futureTask.get();
// System.out.println("任务结果:" + s);
// } catch (InterruptedException e) {
// e.printStackTrace();
// } catch (ExecutionException e) {
// e.printStackTrace();
// }// 对任务状态的控制System.out.println("任务结束了么?:" + futureTask.isDone());Thread.sleep(1000);System.out.println("任务结束了么?:" + futureTask.isDone());Thread.sleep(1000);System.out.println("任务结束了么?:" + futureTask.isDone());}
FutureTask源码分析
看FutureTask的源码,要从几个方向去看:
- 先查看FutureTask中提供的一些状态
- 再查看任务的执行过程
FutureTask中的核心属性
清楚任务的流转流转状态是怎样的,其次对于核心属性要追到是干嘛的。
/**
FutureTask的核心属性
FutureTask任务的状态流转
* NEW -> COMPLETING -> NORMAL 任务正常执行,并且返回结果也正常返回
* NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是结果是异常
* NEW -> CANCELLED 任务被取消
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// 记录任务的状态
private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;/** 需要执行任务,会被赋值到这个属性 */
private Callable<V> callable;
/** 任务的任务结果要存储在这几个属性中 */
private Object outcome;
/** 执行任务的线程 */
private volatile Thread runner;
/** 等待返回结果的线程Node对象, */
private volatile WaitNode waiters;static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }
}
FutureTask的run方法
任务执行前的一些判断,以及调用任务封装结果的方式,还有最后的一些后续处理。
// 当线程池执行FutureTask任务时,会调用的方法
public void run() {// 如果当前任务状态不是NEW,直接return告辞if (state != NEW ||// 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程// 如果CAS失败,直接return告辞!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {// 将要执行的任务拿到Callable<V> c = callable;// 健壮性判断,保证任务不是null// 再次判断任务的状态是NEW(DCL)if (c != null && state == NEW) {// 执行任务// result:任务的返回结果// ran:如果为true,任务正常结束。 如果为false,任务异常结束。V result;boolean ran;try {// 执行任务result = c.call();// 正常结果,ran设置为trueran = true;} catch (Throwable ex) {// 如果任务执行期间出了异常 // 返回结果置位nullresult = null;// ran设置为falseran = false;// 封装异常结果setException(ex);}if (ran)// 封装正常结果set(result);}} finally {// 将执行任务的线程置位nullrunner = null;// 拿到任务的状态int s = state;// 如果状态大于等于INTERRUPTINGif (s >= INTERRUPTING)// 进来代表任务中断,做一些后续处理handlePossibleCancellationInterrupt(s);}
}
FutureTask的set&setException方法
任务执行完毕后,修改任务的状态以及封装任务的结果。
// 没有异常的时候,正常返回结果
protected void set(V v) {// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将返回结果赋值给 outcome 属性outcome = v;// 将任务状态变为NORMAL,正常结束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}// 任务执行期间出现了异常,这边要封装结果
protected void setException(Throwable t) {// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将异常信息封装到 outcome 属性outcome = t;// 将任务状态变为EXCEPTIONAL,异常结束UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}
}
FutureTask的cancel方法
任务取消的一个方式
- 任务直接从NEW状态转换为CANCEL
- 任务从NEW状态变成INTERRUPTING,然后再转换为INTERRUPTED
// 取消任务操作
public boolean cancel(boolean mayInterruptIfRunning) {// 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning // 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTINGif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // in case call to interrupt throws exception// 如果mayInterruptIfRunning为true // 就需要中断线程if (mayInterruptIfRunning) {try {// 拿到任务线程Thread t = runner;if (t != null)// 如果线程不为null,直接interruptt.interrupt();} finally { // final state// 将任务状态设置为INTERRUPTEDUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 任务结束后的一些处理~~ 一会看~~finishCompletion();}return true;
}
FutureTask的get方法
这个是线程获取FutureTask任务执行结果的方法
// 拿任务结果
public V get() throws InterruptedException, ExecutionException {// 获取任务的状态int s = state;// 要么是NEW,任务还没执行完// 要么COMPLETING,任务执行完了,结果还没封装好。if (s <= COMPLETING)// 让当前线程阻塞,等待结果s = awaitDone(false, 0L);// 最终想要获取结果,需要执行report方法return report(s);
}// 线程等待FutureTask结果的过程
private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 针对get方法传入了等待时长时,需要计算等到什么时间点final long deadline = timed ? System.nanoTime() + nanos : 0L;// 声明好需要的Node,queued:放到链表中了么?WaitNode q = null;boolean queued = false;for (;;) {// 查看线程是否中断,如果中断,从等待链表中移除,甩个异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 拿到状态int s = state;// 到这,说明任务结束了。if (s > COMPLETING) {if (q != null)// 如果之前封装了WaitNode,现在要清空q.thread = null;return s;}// 如果任务状态是COMPLETING,这就不需要去阻塞线程,让步一下,等待一小会,结果就有了else if (s == COMPLETING) // cannot time out yetThread.yield();// 如果还没初始化WaitNode,初始化else if (q == null)q = new WaitNode();// 没放队列的话,直接放到waiters的前面else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 准备挂起线程,如果timed为true,挂起一段时间else if (timed) {// 计算出最多可以等待多久nanos = deadline - System.nanoTime();// 如果等待的时间没了if (nanos <= 0L) {// 移除当前的Node,返回任务状态removeWaiter(q);return state;}// 等一会LockSupport.parkNanos(this, nanos);}else// 死等LockSupport.park(this);}
}// get的线程已经可以阻塞结束了,基于状态查看能否拿到返回结果
private V report(int s) throws ExecutionException {// 拿到outcome 返回结果Object x = outcome;// 如果任务状态是NORMAL,任务正常结束,返回结果if (s == NORMAL)return (V)x;// 如果任务状态大于等于取消if (s >= CANCELLED)// 直接抛出异常throw new CancellationException();// 到这就是异常结束throw new ExecutionException((Throwable)x);
}
FutureTask的finishCompletion方法
只要任务结束了,无论是正常返回,异常返回,还是任务被取消都会执行这个方法。
而这个方法其实就是唤醒那些执行get方法等待任务结果的线程。
// 任务结束后触发
private void finishCompletion() {// assert state > COMPLETING;// 在任务结束后,需要唤醒for (WaitNode q; (q = waiters) != null;) {// 第一步直接以CAS的方式将WaitNode置为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {// 拿到了Node中的线程Thread t = q.thread;// 如果线程不为nullif (t != null) {// 第一步先置位nullq.thread = null;// 直接唤醒这个线程LockSupport.unpark(t);}// 拿到当前Node的nextWaitNode next = q.next;// next为null,代表已经将全部节点唤醒了吗,跳出循环if (next == null)break;// 将next置位nullq.next = null; // unlink to help gc// q的引用指向nextq = next;}break;}}// 任务结束后,可以基于这个扩展方法,记录一些信息done();// 任务执行完,把callable具体任务置位nullcallable = null; // to reduce footprint
}
CompletableFuture应用&源码分析
CompletableFuture介绍
平时多线程开发一般就是使用Runnable,Callable,Thread,FutureTask,ThreadPoolExecutor 这些内容和并发编程息息相关。相对来对来说成本都不高,多多使用是可以熟悉这些内容。这些内容组合在一起去解决一些并发编程的问题时,很多时候没有办法很方便的去完成异步编程的操作。
Thread + Runnable:执行异步任务,但是没有返回结果。
Thread + Callable + FutureTask:完整一个可以有返回结果的异步任务。
- 获取返回结果,如果基于get方法获取,线程需要挂起在WaitNode里。
- 获取返回结果,也可以基于isDone判断任务的状态,但是这里需要不断轮询
上述的方式都是有一定的局限性的。
比如说任务A,任务B,还有任务C。其中任务B还有任务C执行的前提是任务A先完成,再执行任务B和任务C。
如果任务的执行方式逻辑比较复杂,可能需要业务线程导出阻塞等待,或者是大量的任务线程去编一些任务执行的业务逻辑。对开发成本来说比较高。
CompletableFuture就是帮你处理这些任务之间的逻辑关系,编排好任务的执行方式后,任务会按照规划好的方式一步一步执行,不需要让业务线程去频繁的等待。
CompletableFuture应用
CompletableFuture应用还是需要一点点的成本的。
首先对CompletableFuture提供的函数式编程中三个函数有一个掌握。
Supplier<U> // 生产者,没有入参,有返回结果
Consumer<T> // 消费者,有入参,但是没有返回结果
Function<T,U>// 函数,有入参,又有返回结果
supplyAsync
CompletableFuture如果不提供线程池的话,默认使用的ForkJoinPool,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。
public static void main(String[] args) {// 生产者,可以指定返回结果CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务开始执行");System.out.println("异步任务执行结束");return "返回结果";});String result1 = firstTask.join();String result2 = null;try {result2 = firstTask.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(result1 + "," + result2);
}
runAsync
当前方式既不会接收参数,也不会返回任何结果,非常基础的任务编排方式。
public static void main(String[] args) throws IOException {CompletableFuture.runAsync(()->{System.out.println("任务go");System.out.println("任务done");});System.in.read();
}
thenApply、thenApplyAsync
有任务A,还有任务B。
任务B需要在任务A执行完毕后再执行。
而且任务B需要任务A的返回结果。
任务B自身也有返回结果。
thenApply可以拼接异步任务,前置任务处理完之后,将返回结果交给后置任务,然后后置任务再执行。
thenApply提供了带有Async的方法,可以指定每个任务使用的具体线程池。
public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(10);/*CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {String id = UUID.randomUUID().toString();System.out.println("执行任务A:" + id);return id;});CompletableFuture<String> taskB = taskA.thenApply(result -> {System.out.println("任务B获取到任务A结果:" + result);result = result.replace("-", "");return result;});System.out.println("main线程拿到结果:" + taskB.join());*/CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {String id = UUID.randomUUID().toString();System.out.println("执行任务A:" + id + "," + Thread.currentThread().getName());return id;}).thenApplyAsync(result -> {System.out.println("任务B获取到任务A结果:" + result + "," + Thread.currentThread().getName());result = result.replace("-", "");return result;}, executor);System.out.println("main线程拿到结果:" + taskB.join());
}
thenAccept、thenAcceptAsync
套路和thenApply一样,都是任务A和任务B的拼接。
前置任务需要有返回结果,后置任务会接收前置任务的结果,返回后置任务没有返回值。
public static void main(String[] args) throws IOException {CompletableFuture.supplyAsync(() -> {System.out.println("任务A");return "abcdefg";}).thenAccept(result -> {System.out.println("任务b,拿到结果处理:" + result);});System.in.read();
}
thenRun、thenRunAsync
套路和thenApply,thenAccept一样,都是任务A和任务B的拼接。
前置任务没有返回结果,后置任务不接收前置任务结果,后置任务也会有返回结果。
public static void main(String[] args) throws IOException {CompletableFuture.runAsync(() -> {System.out.println("任务A!!");}).thenRun(() -> {System.out.println("任务B!!");});System.in.read();
}
thenCombine、thenAcceptBoth、runAfterBoth
比如有任务A,任务B,任务C。任务A和任务B并行执行,等到任务A和任务B全部执行完毕后,再执行任务C。
A+B ------ C
基于前面thenApply,thenAccept,thenRun知道了一般情况三种任务的概念。
thenCombine以及thenAcceptBoth还有runAfterBoth的区别是一样的。
public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 78;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 66;}), (resultA, resultB) -> {System.out.println("任务C");int resultC = resultA + resultB;return resultC;});System.out.println(taskC.join());System.in.read();
}
applyToEither、acceptEither、runAfterEither
比如有任务A,任务B,任务C。任务A和任务B并行执行,只要任务A或者任务B执行完毕,开始执行任务C。
A or B ----- C
applyToEither,acceptEither,runAfterEither三个方法拼接任务的方式都是一样的。
区别依然是,可以接收结果并且返回结果,可以接收结果没有返回结果,不接收结果也没返回结果。
public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");return 78;}).applyToEither(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");return 66;}), resultFirst -> {System.out.println("任务C");return resultFirst;});System.out.println(taskC.join());System.in.read();
}
exceptionally、henCompose、handle
exceptionally
这个也是拼接任务的方式,但是只有前面业务执行时出现异常了,才会执行当前方法来处理。
只有异常出现时,CompletableFuture的编排任务没有处理完时,才会触发。
thenCompose,handle
这两个也是异常处理的套路,可以根据方法描述发现,他的功能方向比exceptionally要更加丰富。
thenCompose可以拿到返回结果同时也可以拿到出现的异常信息,但是thenCompose本身是Consumer不能返回结果。无法帮你捕获异常,但是可以拿到异常返回的结果。
handle可以拿到返回结果同时也可以拿到出现的异常信息,并且也可以指定返回托底数据。可以捕获异常的,异常不会抛出去。
public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");// int i = 1 / 0;return 78;}).applyToEither(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");return 66;}), resultFirst -> {System.out.println("任务A");return resultFirst;}).handle((r, ex) -> {System.out.println("handle:" + r);System.out.println("handle:" + ex);return -1;});/*.exceptionally(ex -> {System.out.println("exceptionally:" + ex);return -1;})*//*.whenComplete((r, ex) -> {System.out.println("whenComplete:" + r);System.out.println("whenComplete:" + ex);});*/System.out.println(taskC.join());System.in.read();
}
allOf,anyOf
allOf的方式是让内部编写多个CompletableFuture的任务,多个任务都执行完后,才会继续执行你后续拼接的任务。
allOf返回的CompletableFuture是void,没有返回结果。
public static void main(String[] args) throws IOException {CompletableFuture.allOf(CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务A");}),CompletableFuture.runAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务B");}),CompletableFuture.runAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务C");})).thenRun(() -> {System.out.println("任务D");});System.in.read();
}
anyOf是基于多个CompletableFuture的任务,只要有一个任务执行完毕就继续执行后续,最先执行完的任务做作为返回结果的入参。
public static void main(String[] args) throws IOException {CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务A");return "A";}),CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务B");return "B";}),CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务C");return "C";})).thenAccept(r -> {System.out.println("任务D执行," + r + "先执行完毕的");});System.in.read();
}
CompletableFuture源码分析
CompletableFuture的源码内容特别多。不需要把所有源码都看了,更多的是要掌握整个CompletableFuture的源码执行流程,以及任务的执行时机。
从CompletableFuture中比较简单的方法作为分析的入口,从而掌握整体执行的流程。
当前任务执行方式
将任务和CompletableFuture封装到一起,再执行封住好的具体对象的run方法即可。
// 提交任务到CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable) {// asyncPool:执行任务的线程池// runnable:具体任务。return asyncRunStage(asyncPool, runnable);
}// 内部执行的方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {// 对任务做非空校验if (f == null) throw new NullPointerException();// 直接构建了CompletableFuture的对象,作为最后的返回结果CompletableFuture<Void> d = new CompletableFuture<Void>();// 将任务和CompletableFuture对象封装为了AsyncRun的对象// 将封装好的任务交给了线程池去执行e.execute(new AsyncRun(d, f));// 返回构建好的CompletableFuturereturn d;
}// 封装任务的AsyncRun类信息
static final class AsyncRun extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {// 声明存储CompletableFuture对象以及任务的成员变量CompletableFuture<Void> dep;Runnable fn;// 将传入的属性赋值给成员变量AsyncRun(CompletableFuture<Void> dep, Runnable fn) {this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}public final boolean exec() { run(); return true; }// 当前对象作为任务提交给线程池之后,必然会执行当前方法public void run() {// 声明局部变量CompletableFuture<Void> d; Runnable f;// 将成员变量赋值给局部变量,并且做非空判断if ((d = dep) != null && (f = fn) != null) {// help GC,将成员变量置位null,只要当前任务结束后,成员变量也拿不到引用。dep = null; fn = null;// 先确认任务没有执行。if (d.result == null) {try {// 直接执行任务f.run();// 当前方法是针对Runnable任务的,不能将结果置位null // 要给没有返回结果的Runnable做一个返回结果d.completeNull();} catch (Throwable ex) {// 异常结束!d.completeThrowable(ex);}}d.postComplete();}}
}
任务编排的存储&执行方式
首先如果要在前继任务处理后,执行后置任务的话。
有两种情况:
- 前继任务如果没有执行完毕,后置任务需要先放在stack栈结构中存储。
- 前继任务已经执行完毕了,后置任务就应该直接执行,不需要在往stack中存储了。
如果单独采用thenRun在一个任务后面指定多个后继任务,CompletableFuture无法保证具体的执行顺序,而影响执行顺序的是前继任务的执行时间,以及后置任务编排的时机。
任务编排流程
// 编排任务,前继任务搞定,后继任务再执行
public CompletableFuture<Void> thenRun(Runnable action) {// 执行了内部的uniRunStage方法, // null:线程池,现在没给。// action:具体要执行的任务return uniRunStage(null, action);
}// 内部编排任务方法
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {// 后继任务不能为null,健壮性判断if (f == null) throw new NullPointerException();// 创建CompletableFuture对象d,与后继任务f绑定CompletableFuture<Void> d = new CompletableFuture<Void>();// 如果线程池不为null,代表异步执行,将任务压栈// 如果线程池是null,先基于uniRun尝试下,看任务能否执行if (e != null || !d.uniRun(this, f, null)) {// 如果传了线程池,这边需要走一下具体逻辑 // e:线程池// d:后继任务的CompletableFuture// this:前继任务的CompletableFuture// f:后继任务UniRun<T> c = new UniRun<T>(e, d, this, f);// 将封装好的任务,push到stack栈结构// 只要前继任务没结束,这边就可以正常的将任务推到栈结构中// 放入栈中可能会失败push(c);// 无论压栈成功与否,都要尝试执行以下。c.tryFire(SYNC);}// 无论任务执行完毕与否,都要返回后继任务的CompletableFuturereturn d;
}
查看后置任务执行时机
任务在编排到前继任务时,因为前继任务已经结束了,这边后置任务会主动的执行。
// 后置任务无论压栈成功与否,都需要执行tryFire方法
static final class UniRun<T> extends UniCompletion<T,Void> {Runnable fn;// executor:线程池// dep:后置任务的CompletableFuture// src:前继任务的CompletableFuture// fn:具体的任务UniRun(Executor executor, CompletableFuture<Void> dep,CompletableFuture<T> src, Runnable fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<Void> tryFire(int mode) {// 声明局部变量CompletableFuture<Void> d; CompletableFuture<T> a;// 赋值局部变量// (d = dep) == null:赋值加健壮性校验if ((d = dep) == null ||// 调用uniRun。// a:前继任务的CompletableFuture// fn:后置任务// 第三个参数:传入的是this,是UniRun对象!d.uniRun(a = src, fn, mode > 0 ? null : this))// 进到这,说明前继任务没结束,等!return null;dep = null; src = null; fn = null;return d.postFire(a, mode);}
}// 是否要主动执行任务
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {// 方法要么正常结束,要么异常结束Object r; Throwable x;// a == null:健壮性校验// (r = a.result) == null:判断前继任务结束了么? // f == null:健壮性校验if (a == null || (r = a.result) == null || f == null)// 到这代表任务没结束。return false;// 后置任务执行了没? == null,代表没执行if (result == null) {// 如果前继任务的结果是异常结束。如果前继异常结束,直接告辞,封装异常结果if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)completeThrowable(x, r);else// 到这,前继任务正常结束,后置任务正常执行try {// 如果基于tryFire(SYNC)进来,这里的C不为null,执行c.claim // 如果是因为没有传递executor,c就是null,不会执行c.claimif (c != null && !c.claim())// 如果返回false,任务异步执行了,直接return falsereturn false;// 如果claim没有基于线程池运行任务,那这里就是同步执行// 直接f.run了。f.run();// 封装Null结果completeNull();} catch (Throwable ex) {// 封装异常结果completeThrowable(ex);}}return true;
}// 异步的线程池处理任务
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// 只要有线程池对象,不为nullif (e == null)return true;executor = null; // disable// 基于线程池的execute去执行任务e.execute(this);}return false;
}
前继任务执行完毕后,基于嵌套的方式执行后置。
// A:嵌套了B+C, B:嵌套了D+E
// 前继任务搞定,遍历stack执行后置任务
// A任务处理完,解决嵌套的B和C
final void postComplete() {// f:前继任务的CompletableFuture// h:存储后置任务的栈结构CompletableFuture<?> f = this; Completion h;// (h = f.stack) != null:赋值加健壮性判断,要确保栈中有数据while ((h = f.stack) != null ||// 循环一次后,对后续节点的赋值以及健壮性判断,要确保栈中有数据(f != this && (h = (f = this).stack) != null)) {// t:当前栈中任务的后续任务CompletableFuture<?> d; Completion t;// 拿到之前的栈顶h后,将栈顶换数据if (f.casStack(h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}h.next = null; // detach}// 执行tryFire方法,f = (d = h.tryFire(NESTED)) == null ? this : d;}}
}// 回来了 NESTED == -1
final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a;if ((d = dep) == null ||!d.uniHandle(a = src, fn, mode > 0 ? null : this))return null;dep = null; src = null; fn = null;// 内部会执行postComplete,运行B内部嵌套的D和Ereturn d.postFire(a, mode);
}