八、异步编程

news/2024/5/15 5:56:00/文章来源:https://blog.csdn.net/weixin_36091079/article/details/129267428

文章目录

  • 异步编程
  • FutureTask应用&源码分析
    • FutureTask介绍
    • FutureTask应用
    • FutureTask源码分析
      • FutureTask中的核心属性
      • FutureTask的run方法
      • FutureTask的set&setException方法
      • FutureTask的cancel方法
      • FutureTask的get方法
      • FutureTask的finishCompletion方法
  • CompletableFuture应用&源码分析
    • CompletableFuture介绍
    • CompletableFuture应用
      • supplyAsync
      • runAsync
      • thenApply、thenApplyAsync
      • thenAccept、thenAcceptAsync
      • thenRun、thenRunAsync
      • thenCombine、thenAcceptBoth、runAfterBoth
      • applyToEither、acceptEither、runAfterEither
      • exceptionally、henCompose、handle
      • allOf,anyOf
    • CompletableFuture源码分析
      • 当前任务执行方式
      • 任务编排的存储&执行方式
      • 任务编排流程
      • 查看后置任务执行时机
    • CompletableFuture执行流程图

异步编程

FutureTask应用&源码分析

FutureTask介绍

FutureTask是一个可以取消异步任务的类。FutureTask对Future做的一个基本实现。可以调用方法区开始和取消一个任务。
一般是配合Callable去使用。
异步任务启动之后,可以获取一个绑定当前异步任务的FutureTask。
可以基于FutureTask的方法去取消任务,查看任务是否结果,以及获取任务的返回结果。
FutureTask内部的整体结构中,实现了RunnableFuture的接口,这个接口又继承了Runnable, Future这个两个接口。所以FutureTask也可以作为任务直接交给线程池去处理。

FutureTask应用

大方向是FutureTask对任务的控制:

  • 任务执行过程中状态的控制
  • 任务执行完毕后,返回结果的获取
    FutureTask的任务在执行run方法后,是无法被再次运行,需要使用runAndReset方法才可以。
    public static void main(String[] args) throws InterruptedException {FutureTask<String> futureTask = new FutureTask<>(() -> {System.out.println("任务开始执行......");Thread.sleep(2000);System.out.println("任务执行完毕......");return "OK!";});// 构建线程池ExecutorService threadPool = Executors.newFixedThreadPool(10);// 构建线程池threadPool.execute(futureTask);// futureTask提供了run方法,一般不会自己去调用run方法,让线程池去执行任务,由线程池去执行run方法// run方法在执行时,是有任务状态的。任务已经执行了,再次调用run方法无效的。// 如果希望任务可以反复被执行,需要去调用runAndReset方法futureTask.run();// 对返回结果的获取,类似阻塞队列的poll方法// 如果在指定时间内,没有拿到方法的返回结果,直接扔TimeoutException
//        try {
//            String s = futureTask.get(3000, TimeUnit.MICROSECONDS);
//            System.out.println("返回结果:" + s);
//        } catch (Exception e) {
//            System.out.println("异常返回:" + e.getMessage());
//            e.printStackTrace();
//        }// 对返回结果的获取,类似阻塞队列的take方法,死等结果
//        try {
//            String s = futureTask.get();
//            System.out.println("任务结果:" + s);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        } catch (ExecutionException e) {
//            e.printStackTrace();
//        }// 对任务状态的控制System.out.println("任务结束了么?:" + futureTask.isDone());Thread.sleep(1000);System.out.println("任务结束了么?:" + futureTask.isDone());Thread.sleep(1000);System.out.println("任务结束了么?:" + futureTask.isDone());}

FutureTask源码分析

看FutureTask的源码,要从几个方向去看:

  • 先查看FutureTask中提供的一些状态
  • 再查看任务的执行过程

FutureTask中的核心属性

清楚任务的流转流转状态是怎样的,其次对于核心属性要追到是干嘛的。

/**
FutureTask的核心属性
FutureTask任务的状态流转
* NEW -> COMPLETING -> NORMAL 任务正常执行,并且返回结果也正常返回 
* NEW -> COMPLETING -> EXCEPTIONAL 任务正常执行,但是结果是异常 
* NEW -> CANCELLED 任务被取消
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// 记录任务的状态
private volatile int state;
// 任务被构建之后的初始状态
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;/** 需要执行任务,会被赋值到这个属性 */
private Callable<V> callable;
/** 任务的任务结果要存储在这几个属性中 */
private Object outcome;
/** 执行任务的线程 */
private volatile Thread runner;
/** 等待返回结果的线程Node对象, */
private volatile WaitNode waiters;static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }
}

FutureTask的run方法

任务执行前的一些判断,以及调用任务封装结果的方式,还有最后的一些后续处理。

// 当线程池执行FutureTask任务时,会调用的方法
public void run() {// 如果当前任务状态不是NEW,直接return告辞if (state != NEW ||// 如果状态正确是NEW,这边需要基于CAS将runner属性设置为当前线程// 如果CAS失败,直接return告辞!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {// 将要执行的任务拿到Callable<V> c = callable;// 健壮性判断,保证任务不是null// 再次判断任务的状态是NEW(DCL)if (c != null && state == NEW) {// 执行任务// result:任务的返回结果// ran:如果为true,任务正常结束。 如果为false,任务异常结束。V result;boolean ran;try {// 执行任务result = c.call();// 正常结果,ran设置为trueran = true;} catch (Throwable ex) {// 如果任务执行期间出了异常 // 返回结果置位nullresult = null;// ran设置为falseran = false;// 封装异常结果setException(ex);}if (ran)// 封装正常结果set(result);}} finally {// 将执行任务的线程置位nullrunner = null;// 拿到任务的状态int s = state;// 如果状态大于等于INTERRUPTINGif (s >= INTERRUPTING)// 进来代表任务中断,做一些后续处理handlePossibleCancellationInterrupt(s);}
}

FutureTask的set&setException方法

任务执行完毕后,修改任务的状态以及封装任务的结果。

// 没有异常的时候,正常返回结果
protected void set(V v) {// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将返回结果赋值给 outcome 属性outcome = v;// 将任务状态变为NORMAL,正常结束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}// 任务执行期间出现了异常,这边要封装结果
protected void setException(Throwable t) {// 因为任务执行完毕,需要将任务的状态从NEW,修改为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将异常信息封装到 outcome 属性outcome = t;// 将任务状态变为EXCEPTIONAL,异常结束UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}
}

FutureTask的cancel方法

任务取消的一个方式

  • 任务直接从NEW状态转换为CANCEL
  • 任务从NEW状态变成INTERRUPTING,然后再转换为INTERRUPTED
// 取消任务操作
public boolean cancel(boolean mayInterruptIfRunning) {// 查看任务的状态是否是NEW,如果NEW状态,就基于传入的参数mayInterruptIfRunning // 决定任务是直接从NEW转换为CANCEL,还是从NEW转换为INTERRUPTINGif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // in case call to interrupt throws exception// 如果mayInterruptIfRunning为true // 就需要中断线程if (mayInterruptIfRunning) {try {// 拿到任务线程Thread t = runner;if (t != null)// 如果线程不为null,直接interruptt.interrupt();} finally { // final state// 将任务状态设置为INTERRUPTEDUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 任务结束后的一些处理~~ 一会看~~finishCompletion();}return true;
}

FutureTask的get方法

这个是线程获取FutureTask任务执行结果的方法

// 拿任务结果
public V get() throws InterruptedException, ExecutionException {// 获取任务的状态int s = state;// 要么是NEW,任务还没执行完// 要么COMPLETING,任务执行完了,结果还没封装好。if (s <= COMPLETING)// 让当前线程阻塞,等待结果s = awaitDone(false, 0L);// 最终想要获取结果,需要执行report方法return report(s);
}// 线程等待FutureTask结果的过程
private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 针对get方法传入了等待时长时,需要计算等到什么时间点final long deadline = timed ? System.nanoTime() + nanos : 0L;// 声明好需要的Node,queued:放到链表中了么?WaitNode q = null;boolean queued = false;for (;;) {// 查看线程是否中断,如果中断,从等待链表中移除,甩个异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 拿到状态int s = state;// 到这,说明任务结束了。if (s > COMPLETING) {if (q != null)// 如果之前封装了WaitNode,现在要清空q.thread = null;return s;}// 如果任务状态是COMPLETING,这就不需要去阻塞线程,让步一下,等待一小会,结果就有了else if (s == COMPLETING) // cannot time out yetThread.yield();// 如果还没初始化WaitNode,初始化else if (q == null)q = new WaitNode();// 没放队列的话,直接放到waiters的前面else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 准备挂起线程,如果timed为true,挂起一段时间else if (timed) {// 计算出最多可以等待多久nanos = deadline - System.nanoTime();// 如果等待的时间没了if (nanos <= 0L) {// 移除当前的Node,返回任务状态removeWaiter(q);return state;}// 等一会LockSupport.parkNanos(this, nanos);}else// 死等LockSupport.park(this);}
}// get的线程已经可以阻塞结束了,基于状态查看能否拿到返回结果
private V report(int s) throws ExecutionException {// 拿到outcome 返回结果Object x = outcome;// 如果任务状态是NORMAL,任务正常结束,返回结果if (s == NORMAL)return (V)x;// 如果任务状态大于等于取消if (s >= CANCELLED)// 直接抛出异常throw new CancellationException();// 到这就是异常结束throw new ExecutionException((Throwable)x);
}

FutureTask的finishCompletion方法

只要任务结束了,无论是正常返回,异常返回,还是任务被取消都会执行这个方法。
而这个方法其实就是唤醒那些执行get方法等待任务结果的线程。

// 任务结束后触发
private void finishCompletion() {// assert state > COMPLETING;// 在任务结束后,需要唤醒for (WaitNode q; (q = waiters) != null;) {// 第一步直接以CAS的方式将WaitNode置为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {// 拿到了Node中的线程Thread t = q.thread;// 如果线程不为nullif (t != null) {// 第一步先置位nullq.thread = null;// 直接唤醒这个线程LockSupport.unpark(t);}// 拿到当前Node的nextWaitNode next = q.next;// next为null,代表已经将全部节点唤醒了吗,跳出循环if (next == null)break;// 将next置位nullq.next = null; // unlink to help gc// q的引用指向nextq = next;}break;}}// 任务结束后,可以基于这个扩展方法,记录一些信息done();// 任务执行完,把callable具体任务置位nullcallable = null;        // to reduce footprint
}

CompletableFuture应用&源码分析

CompletableFuture介绍

平时多线程开发一般就是使用Runnable,Callable,Thread,FutureTask,ThreadPoolExecutor 这些内容和并发编程息息相关。相对来对来说成本都不高,多多使用是可以熟悉这些内容。这些内容组合在一起去解决一些并发编程的问题时,很多时候没有办法很方便的去完成异步编程的操作。
Thread + Runnable:执行异步任务,但是没有返回结果。
Thread + Callable + FutureTask:完整一个可以有返回结果的异步任务。

  • 获取返回结果,如果基于get方法获取,线程需要挂起在WaitNode里。
  • 获取返回结果,也可以基于isDone判断任务的状态,但是这里需要不断轮询

上述的方式都是有一定的局限性的。
比如说任务A,任务B,还有任务C。其中任务B还有任务C执行的前提是任务A先完成,再执行任务B和任务C。
如果任务的执行方式逻辑比较复杂,可能需要业务线程导出阻塞等待,或者是大量的任务线程去编一些任务执行的业务逻辑。对开发成本来说比较高。
CompletableFuture就是帮你处理这些任务之间的逻辑关系,编排好任务的执行方式后,任务会按照规划好的方式一步一步执行,不需要让业务线程去频繁的等待。

CompletableFuture应用

CompletableFuture应用还是需要一点点的成本的。
首先对CompletableFuture提供的函数式编程中三个函数有一个掌握。

Supplier<U> // 生产者,没有入参,有返回结果 
Consumer<T> // 消费者,有入参,但是没有返回结果 
Function<T,U>// 函数,有入参,又有返回结果

supplyAsync

CompletableFuture如果不提供线程池的话,默认使用的ForkJoinPool,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。

public static void main(String[] args) {// 生产者,可以指定返回结果CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务开始执行");System.out.println("异步任务执行结束");return "返回结果";});String result1 = firstTask.join();String result2 = null;try {result2 = firstTask.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(result1 + "," + result2);
}

runAsync

当前方式既不会接收参数,也不会返回任何结果,非常基础的任务编排方式。

public static void main(String[] args) throws IOException {CompletableFuture.runAsync(()->{System.out.println("任务go");System.out.println("任务done");});System.in.read();
}

thenApply、thenApplyAsync

有任务A,还有任务B。
任务B需要在任务A执行完毕后再执行。
而且任务B需要任务A的返回结果。
任务B自身也有返回结果。
thenApply可以拼接异步任务,前置任务处理完之后,将返回结果交给后置任务,然后后置任务再执行。
thenApply提供了带有Async的方法,可以指定每个任务使用的具体线程池。

public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(10);/*CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {String id = UUID.randomUUID().toString();System.out.println("执行任务A:" + id);return id;});CompletableFuture<String> taskB = taskA.thenApply(result -> {System.out.println("任务B获取到任务A结果:" + result);result = result.replace("-", "");return result;});System.out.println("main线程拿到结果:" + taskB.join());*/CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {String id = UUID.randomUUID().toString();System.out.println("执行任务A:" + id + "," + Thread.currentThread().getName());return id;}).thenApplyAsync(result -> {System.out.println("任务B获取到任务A结果:" + result + "," + Thread.currentThread().getName());result = result.replace("-", "");return result;}, executor);System.out.println("main线程拿到结果:" + taskB.join());
}

thenAccept、thenAcceptAsync

套路和thenApply一样,都是任务A和任务B的拼接。
前置任务需要有返回结果,后置任务会接收前置任务的结果,返回后置任务没有返回值。

public static void main(String[] args) throws IOException {CompletableFuture.supplyAsync(() -> {System.out.println("任务A");return "abcdefg";}).thenAccept(result -> {System.out.println("任务b,拿到结果处理:" + result);});System.in.read();
}

thenRun、thenRunAsync

套路和thenApply,thenAccept一样,都是任务A和任务B的拼接。
前置任务没有返回结果,后置任务不接收前置任务结果,后置任务也会有返回结果。

public static void main(String[] args) throws IOException {CompletableFuture.runAsync(() -> {System.out.println("任务A!!");}).thenRun(() -> {System.out.println("任务B!!");});System.in.read();
}

thenCombine、thenAcceptBoth、runAfterBoth

比如有任务A,任务B,任务C。任务A和任务B并行执行,等到任务A和任务B全部执行完毕后,再执行任务C。
A+B ------ C
基于前面thenApply,thenAccept,thenRun知道了一般情况三种任务的概念。
thenCombine以及thenAcceptBoth还有runAfterBoth的区别是一样的。

public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 78;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 66;}), (resultA, resultB) -> {System.out.println("任务C");int resultC = resultA + resultB;return resultC;});System.out.println(taskC.join());System.in.read();
}

applyToEither、acceptEither、runAfterEither

比如有任务A,任务B,任务C。任务A和任务B并行执行,只要任务A或者任务B执行完毕,开始执行任务C。
A or B ----- C
applyToEither,acceptEither,runAfterEither三个方法拼接任务的方式都是一样的。
区别依然是,可以接收结果并且返回结果,可以接收结果没有返回结果,不接收结果也没返回结果。

public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");return 78;}).applyToEither(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");return 66;}), resultFirst -> {System.out.println("任务C");return resultFirst;});System.out.println(taskC.join());System.in.read();
}

exceptionally、henCompose、handle

exceptionally
这个也是拼接任务的方式,但是只有前面业务执行时出现异常了,才会执行当前方法来处理。
只有异常出现时,CompletableFuture的编排任务没有处理完时,才会触发。
thenCompose,handle
这两个也是异常处理的套路,可以根据方法描述发现,他的功能方向比exceptionally要更加丰富。
thenCompose可以拿到返回结果同时也可以拿到出现的异常信息,但是thenCompose本身是Consumer不能返回结果。无法帮你捕获异常,但是可以拿到异常返回的结果。
handle可以拿到返回结果同时也可以拿到出现的异常信息,并且也可以指定返回托底数据。可以捕获异常的,异常不会抛出去。

public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");// int i = 1 / 0;return 78;}).applyToEither(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");return 66;}), resultFirst -> {System.out.println("任务A");return resultFirst;}).handle((r, ex) -> {System.out.println("handle:" + r);System.out.println("handle:" + ex);return -1;});/*.exceptionally(ex -> {System.out.println("exceptionally:" + ex);return -1;})*//*.whenComplete((r, ex) -> {System.out.println("whenComplete:" + r);System.out.println("whenComplete:" + ex);});*/System.out.println(taskC.join());System.in.read();
}

allOf,anyOf

allOf的方式是让内部编写多个CompletableFuture的任务,多个任务都执行完后,才会继续执行你后续拼接的任务。
allOf返回的CompletableFuture是void,没有返回结果。

public static void main(String[] args) throws IOException {CompletableFuture.allOf(CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务A");}),CompletableFuture.runAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务B");}),CompletableFuture.runAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务C");})).thenRun(() -> {System.out.println("任务D");});System.in.read();
}

anyOf是基于多个CompletableFuture的任务,只要有一个任务执行完毕就继续执行后续,最先执行完的任务做作为返回结果的入参。

public static void main(String[] args) throws IOException {CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务A");return "A";}),CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务B");return "B";}),CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务C");return "C";})).thenAccept(r -> {System.out.println("任务D执行," + r + "先执行完毕的");});System.in.read();
}

CompletableFuture源码分析

CompletableFuture的源码内容特别多。不需要把所有源码都看了,更多的是要掌握整个CompletableFuture的源码执行流程,以及任务的执行时机。
从CompletableFuture中比较简单的方法作为分析的入口,从而掌握整体执行的流程。

当前任务执行方式

将任务和CompletableFuture封装到一起,再执行封住好的具体对象的run方法即可。

 // 提交任务到CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable) {// asyncPool:执行任务的线程池// runnable:具体任务。return asyncRunStage(asyncPool, runnable);
}// 内部执行的方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {// 对任务做非空校验if (f == null) throw new NullPointerException();// 直接构建了CompletableFuture的对象,作为最后的返回结果CompletableFuture<Void> d = new CompletableFuture<Void>();// 将任务和CompletableFuture对象封装为了AsyncRun的对象// 将封装好的任务交给了线程池去执行e.execute(new AsyncRun(d, f));// 返回构建好的CompletableFuturereturn d;
}// 封装任务的AsyncRun类信息
static final class AsyncRun extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {// 声明存储CompletableFuture对象以及任务的成员变量CompletableFuture<Void> dep;Runnable fn;// 将传入的属性赋值给成员变量AsyncRun(CompletableFuture<Void> dep, Runnable fn) {this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}public final boolean exec() { run(); return true; }// 当前对象作为任务提交给线程池之后,必然会执行当前方法public void run() {// 声明局部变量CompletableFuture<Void> d; Runnable f;// 将成员变量赋值给局部变量,并且做非空判断if ((d = dep) != null && (f = fn) != null) {// help GC,将成员变量置位null,只要当前任务结束后,成员变量也拿不到引用。dep = null; fn = null;// 先确认任务没有执行。if (d.result == null) {try {// 直接执行任务f.run();// 当前方法是针对Runnable任务的,不能将结果置位null // 要给没有返回结果的Runnable做一个返回结果d.completeNull();} catch (Throwable ex) {// 异常结束!d.completeThrowable(ex);}}d.postComplete();}}
}

任务编排的存储&执行方式

首先如果要在前继任务处理后,执行后置任务的话。
有两种情况:

  • 前继任务如果没有执行完毕,后置任务需要先放在stack栈结构中存储。
  • 前继任务已经执行完毕了,后置任务就应该直接执行,不需要在往stack中存储了。

如果单独采用thenRun在一个任务后面指定多个后继任务,CompletableFuture无法保证具体的执行顺序,而影响执行顺序的是前继任务的执行时间,以及后置任务编排的时机。

任务编排流程

// 编排任务,前继任务搞定,后继任务再执行
public CompletableFuture<Void> thenRun(Runnable action) {// 执行了内部的uniRunStage方法, // null:线程池,现在没给。// action:具体要执行的任务return uniRunStage(null, action);
}// 内部编排任务方法
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {// 后继任务不能为null,健壮性判断if (f == null) throw new NullPointerException();// 创建CompletableFuture对象d,与后继任务f绑定CompletableFuture<Void> d = new CompletableFuture<Void>();// 如果线程池不为null,代表异步执行,将任务压栈// 如果线程池是null,先基于uniRun尝试下,看任务能否执行if (e != null || !d.uniRun(this, f, null)) {// 如果传了线程池,这边需要走一下具体逻辑 // e:线程池// d:后继任务的CompletableFuture// this:前继任务的CompletableFuture// f:后继任务UniRun<T> c = new UniRun<T>(e, d, this, f);// 将封装好的任务,push到stack栈结构// 只要前继任务没结束,这边就可以正常的将任务推到栈结构中// 放入栈中可能会失败push(c);// 无论压栈成功与否,都要尝试执行以下。c.tryFire(SYNC);}// 无论任务执行完毕与否,都要返回后继任务的CompletableFuturereturn d;
}

查看后置任务执行时机

任务在编排到前继任务时,因为前继任务已经结束了,这边后置任务会主动的执行。

// 后置任务无论压栈成功与否,都需要执行tryFire方法
static final class UniRun<T> extends UniCompletion<T,Void> {Runnable fn;// executor:线程池// dep:后置任务的CompletableFuture// src:前继任务的CompletableFuture// fn:具体的任务UniRun(Executor executor, CompletableFuture<Void> dep,CompletableFuture<T> src, Runnable fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<Void> tryFire(int mode) {// 声明局部变量CompletableFuture<Void> d; CompletableFuture<T> a;// 赋值局部变量// (d = dep) == null:赋值加健壮性校验if ((d = dep) == null ||// 调用uniRun。// a:前继任务的CompletableFuture// fn:后置任务// 第三个参数:传入的是this,是UniRun对象!d.uniRun(a = src, fn, mode > 0 ? null : this))// 进到这,说明前继任务没结束,等!return null;dep = null; src = null; fn = null;return d.postFire(a, mode);}
}// 是否要主动执行任务
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {// 方法要么正常结束,要么异常结束Object r; Throwable x;// a == null:健壮性校验// (r = a.result) == null:判断前继任务结束了么? // f == null:健壮性校验if (a == null || (r = a.result) == null || f == null)// 到这代表任务没结束。return false;// 后置任务执行了没? == null,代表没执行if (result == null) {// 如果前继任务的结果是异常结束。如果前继异常结束,直接告辞,封装异常结果if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)completeThrowable(x, r);else// 到这,前继任务正常结束,后置任务正常执行try {// 如果基于tryFire(SYNC)进来,这里的C不为null,执行c.claim // 如果是因为没有传递executor,c就是null,不会执行c.claimif (c != null && !c.claim())// 如果返回false,任务异步执行了,直接return falsereturn false;// 如果claim没有基于线程池运行任务,那这里就是同步执行// 直接f.run了。f.run();// 封装Null结果completeNull();} catch (Throwable ex) {// 封装异常结果completeThrowable(ex);}}return true;
}// 异步的线程池处理任务
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// 只要有线程池对象,不为nullif (e == null)return true;executor = null; // disable// 基于线程池的execute去执行任务e.execute(this);}return false;
}

前继任务执行完毕后,基于嵌套的方式执行后置。

// A:嵌套了B+C, B:嵌套了D+E
// 前继任务搞定,遍历stack执行后置任务
// A任务处理完,解决嵌套的B和C
final void postComplete() {// f:前继任务的CompletableFuture// h:存储后置任务的栈结构CompletableFuture<?> f = this; Completion h;// (h = f.stack) != null:赋值加健壮性判断,要确保栈中有数据while ((h = f.stack) != null ||// 循环一次后,对后续节点的赋值以及健壮性判断,要确保栈中有数据(f != this && (h = (f = this).stack) != null)) {// t:当前栈中任务的后续任务CompletableFuture<?> d; Completion t;// 拿到之前的栈顶h后,将栈顶换数据if (f.casStack(h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}h.next = null;    // detach}// 执行tryFire方法,f = (d = h.tryFire(NESTED)) == null ? this : d;}}
}// 回来了 NESTED == -1
final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a;if ((d = dep) == null ||!d.uniHandle(a = src, fn, mode > 0 ? null : this))return null;dep = null; src = null; fn = null;// 内部会执行postComplete,运行B内部嵌套的D和Ereturn d.postFire(a, mode);
}

CompletableFuture执行流程图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_75871.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于部标JT808的车载视频监控需求与EasyCVR视频融合平台解决方案设计

一、方案背景 众所周知&#xff0c;在TSINGSEE青犀视频解决方案中&#xff0c;EasyCVR视频智能融合共享平台主要作为视频汇聚平台使用&#xff0c;不仅能兼容安防标准协议RTSP/Onvif、国标GB28181&#xff0c;互联网直播协议RTMP&#xff0c;私有协议海康SDK、大华SDK&#xf…

虚拟局域网VLAN的实现机制

虚拟局域网VLAN的实现机制1.IEEE 802.1Q帧2.交换的端口类型AccessTrunkHybrid&#xff08;华为特有&#xff09;1.IEEE 802.1Q帧 IEEE802.1Q帧&#xff08;也称Dot One Q帧&#xff09;对以太网的MAC帧格式进行了扩展&#xff0c;插入了4字节的VLAN标记。 2.交换的端口类型 A…

Facebook广告成本过高?尝试这些成本控制技巧

在当今的数字营销领域中&#xff0c;Facebook广告已经成为许多企业的首选。但是&#xff0c;随着竞争的加剧&#xff0c;Facebook广告的成本也在不断攀升。如果您发现自己的Facebook广告成本过高&#xff0c;不要担心&#xff0c;下面将介绍一些成本控制技巧。一.利用Facebook的…

第四阶段05- 关于响应结果JsonResult对象,枚举,Spring MVC的统一处理异常机制

23. 关于响应结果 目前&#xff0c;当成功的添加相册后&#xff0c;服务器端响应的结果是&#xff1a; 添加相册成功&#xff01;如果相册名称已经被占用&#xff0c;服务器端响应的结果是&#xff1a; 添加相册失败&#xff0c;相册名称已经被占用&#xff01;以上的响应结…

机器学习100天(三十二):032 KD树的构造和搜索

机器学习100天,今天讲的是:KD树的构造和搜索! 《机器学习100天》完整目录:目录 在 K 近邻算法中,我们计算测试样本与所有训练样本的距离,类似于穷举法。如果数据量少的时候,算法运行时间没有大的影响,但是如果数据量很大,那么算法运行的时间就会很长。这在实际的应用…

4.排序算法之一:冒泡排序

排序算法稳定性假定在待排序的记录序列中&#xff0c;存在多个具有相同的关键字的记录&#xff0c;若经过排序&#xff0c;这些记录的相对次序保持不变&#xff0c;即在原序列中&#xff0c;r[i]r[j]&#xff0c;且r[i]在r[j]之前&#xff0c;而在排序后的序列中&#xff0c;r[…

柔性电路板的优点、分类和发展方向

柔性电路板是pcb电路板的一种&#xff0c;又称为软板、柔性印刷电路板&#xff0c;主要是由柔性基材制作而成的一种具有高可靠性、高可挠性的印刷电路板&#xff0c;具有厚度薄、可弯曲、配线密度高、重量轻、灵活度高等特点&#xff0c;主要用在手机、电脑、数码相机、家用电器…

二叉树——二叉树的最近公共祖先

二叉树的最近公共祖先 给定一个二叉树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个节点 p、q&#xff0c;最近公共祖先表示为一个节点 x&#xff0c;满足 x 是 p、q 的祖先且 x 的深度尽可能大&#xff08;一…

Guitar Pro8免费吉他曲谱mySongBook

每周都会发布新的谱子&#xff0c;目前已有有数千首歌曲可供选择&#xff0c;在谱库中&#xff0c;您能找到 Guns N Roses&#xff0c;Miles Davis&#xff0c;Ed Sheeran 等人的经典曲目。开头我们先做一个小实验&#xff1a;现在打开你电脑里存放曲谱的文件夹&#xff0c;里面…

[busybox] busybox生成一个最精简rootfs(上)

这篇文章是承接着[rootfs]用busybox做一个rootfs(根文件系统)来的&#xff0c;再回看这篇我很久之前写的文章的时候&#xff0c;有一个问题出现在我的脑海中&#xff0c;创建了这个文件那个文件&#xff0c;但确实是每个文件都是必需的吗&#xff1f; 这篇文章我们就来讨论下这…

【用Group整理目录结构 Objective-C语言】

一、接下来,我们看另外一个知识点,怎么用Group把这一堆乱七八糟的文件给它整理一下,也算是封装一下吧, 1.这一堆杂乱无章的文件: 那么,哪些类是属于模型呢,哪些类是属于视图呢,哪些类是属于控制器呢, 我们接下来通过Group的方式,来给它们分一下类, 这样看起来就好…

蓝海彤翔执行副总裁张加廷接受【联播苏州】独家专访

今年春节档&#xff0c;科幻类电影《流浪地球2》票房口碑双丰收&#xff0c;截至目前&#xff0c;累计票房已破 38 亿&#xff0c;淘票票评分 9.6 &#xff0c;影片的特效质感可以媲美国际顶尖水平。其中&#xff0c;蓝海彤翔为影片的后期制作提供了出色的渲染服务。2月21日&am…

招投标管理系统-适合于招标代理、政府采购、企业采购、工程交易等业务的企业

招投标管理系统-适合于招标代理、政府采购、企业采购、工程交易等业务的企业 招投标管理系统是一个用于内部业务项目管理的应用平台。以项目为主线&#xff0c;从项目立项&#xff0c;资格预审&#xff0c;标书编制审核&#xff0c;招标公告&#xff0c;项目开标&#xff0c;项…

[chapter 11][NR Physical Layer][Layer Mapping]

前言&#xff1a;这里参考Curious Being系列 &#xff0c;简单介绍一下NR 5G 物理层核心技术层映射.我们主要讲了一下what is layer Mapping, why need layer Mapping, how layer Mapping 参考文档&#xff1a;3GPP 38.211- 6.3.1.3 Layer mapping《5G NR Physical Layer | Cha…

js几种对象创建方式

适用于不确定对象内部数据方式一&#xff1a;var p new Object(); p.name TOM; p.age 12 p.setName function(name) {this.name name; }// 测试 p.setName(jack) console.log(p.name,p.age)方式二&#xff1a; 对象字面量模式套路&#xff1a;使用{}创建对象&#xff0c;同…

发现新大陆——原来软件开发根本不需要会编码(看我10分钟应用上线)

目录 一、前言 二、官网基础功能及搭建 三、体验过程 01、连接数据源 02、设计表单 03、流程设计 04、图表呈现 05、组织架构设置 五、效率评价 六、小结 一、前言 众所周知&#xff0c;每家公司在发展过程中都需要构建大量的内部系统&#xff0c; 如运营使用的用户…

cnpm adduser 报错 409 Conflict

今天遇到一个问题&#xff0c;cnpm adduser 一直失败&#xff0c;返回 409 Conflict。 我们先来看下报错信息 409 Conflict - PUT http://registry.cnpm.xxxx.com.cn/-/user/org.couchdb.user:mingyu6 - conflict第一步 分析 http 错误码 409 Conflict&#xff1a;请求与服务…

数据结构初阶 -- 顺序表

数据结构初阶 链表的讲解 目录 一. 线性表 1.1 定义 1.2 特点 二. 顺序表 2.1 定义 2.2 代码 2.3 功能需求 2.4 静态顺序表的特点以及缺点 2.5 动态的顺序表 2.6 动态顺序表接口的实现 三. 代码 头文件 主文件 一. 线性表 1.1 定义 线性表&#xff08;linear li…

代码随想录算法训练营第九届期第十四天 | 二叉树理论基础 、递归遍历 、迭代遍历 、统一迭代

打卡第十四天&#xff0c;今天学习二叉树。 今日任务 理论基础递归遍历迭代遍历统一迭代 理论基础 二叉树是一种基础数据结构 二叉树的种类 满二叉树&#xff1a;只有度为0和为2的结点&#xff0c;而且度为0 的结点都在最后一层。完全二叉树&#xff1a;结点按顺序从上到下&…

电脑没有回收站找回删除文件的2种方法

最近后台收到了这样的咨询&#xff1a;”在网吧上网&#xff0c;删除东西的时候不小心把我的文件给删除了&#xff0c;但是桌面上没有回收站&#xff0c;怎么才能找回我的文件&#xff1f;“——针对“电脑没有回收站删除的东西怎么恢复”这种疑问&#xff1f;不妨看看下面数据…