CompletableFuture的基本使用和原理

news/2024/4/20 6:09:38/文章来源:https://blog.csdn.net/qq_35448165/article/details/130255243

CompletableFuture

CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

CompletableFuture实现了Future和CompletionStage两个接口在这里插入图片描述

  • 通过Future同步等待执行结果
  • CompletionStage,增强异步回调的功能。

将CompletableFuture当作简单的Future来使用

可以用一个无参数构造函数创建这个类的实例来表示Future的结果,将它分发给使用者,并在将来的某个时候使用complete方法完成它。使用者可以使用get方法阻塞当前线程,直到获取返回结果。

public Future<String> calculateAsync() throws InterruptedException {CompletableFuture<String> completableFuture = new CompletableFuture<>();Executors.newCachedThreadPool().submit(() -> {Thread.sleep(500);completableFuture.complete("Hello");return null;});return completableFuture;
}

CompletableFuture构建方法

构建一个CompletableFuture有以下四种办法

  • supplyAsync(runnable) 异步执行一个任务,提供返回值
  • supplyAsync(runnable,Executor executor) 提供返回值
  • runAsync(runnable,Executor executor) -> 通过自定义线程池异步执行一个任务,没有返回值
  • runAsync(runnable) -> 异步执行一个任务, 默认用ForkJoinPool.commonPool(), 没有返回值

注意在没有返回值的情形下,CompletableFuture也还是提供了get方法来阻塞获取执行结果,只是最后返回的结果为null

CompletionStage

CompletionStage定义了很多方法,大致可以分为以下几类

纯消费类型的方法

纯消费类型的方法,指依赖上一个异步任务的结果作为当前函数的参数进行下一步计算,它的特点是不返回新的计算值,这类的方法都包含 Accept 这个关键字
在CompletionStage中包含9个Accept关键字的方法,这9个方法又可以分为三类:

  • 依赖单个CompletionStage任务完成,
  • 依赖两个CompletionStage任务都完成
  • 依赖两个CompletionStage中的任何一个完成
//当前线程同步执行
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
//使用ForkJoinPool.commonPool线程池执行action
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
//使用自定义线程池执行action
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T>
action,Executor executor);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U>
other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<?
extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<?
extends U> other,BiConsumer<? super T, ? super U> action,Executor executor);
public CompletionStage<Void> acceptEither(CompletionStage<? extends T>
other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T>
other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T>
other,Consumer<? super T> action,Executor executor);

有返回值类型的方法

有返回值类型的方法,就是用上一个异步任务的执行结果进行下一步计算,并且会产生一个新的有返回值的CompletionStage对象。

在CompletionStage中,定义了9个带有返回结果的方法,也可以根据依赖几个CompletionStage任务的完成来分成三类

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U>
fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U>
fn,Executor executor);
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U>
other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends
U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends
U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T>
other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends
T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends
T> other,Function<? super T, U> fn,Executor executor);

不消费也不返回的方法

该方法的执行,带run关键字,下一步的执行不依赖上一步的执行结果,也不返回结果,只是有执行的先后顺序

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor
executor);
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable
action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?>
other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?>
other,Runnable action,Executor executor);
public CompletionStage<Void> runAfterEither(CompletionStage<?>
other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?>
other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?>
other,Runnable action,Executor executor);

多任务组合

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends
CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends
CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends
CompletionStage<U>> fn,Executor executor)

并行执行

  • allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
  • anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture

结果/异常处理

  • whenComplete
    whenComplete表示当任务执行完成后,会触发的方法,它的特点是,不论前置的
    CompletionStage任务是正常执行结束还是出现异常,都能够触发特定的 action 方法

  • handle
    handle表示前置任务执行完成后,不管前置任务执行状态是正常还是异常,都会执行handle中的
    fn 函数,它和whenComplete的作用几乎一致,不同点在于,handle是一个有返回值类型的方
    法。

  • exceptionally
    exceptionally接受一个 fn 函数,当上一个CompletionStage出现异常时,会把该异常作为参数传
    递到 fn 函数

       CompletableFuture.runAsync(()-> {
//            int i=1/0;System.out.println("执行某些操作");}).whenComplete((r, e) -> {if (e != null) {System.out.println("执行过程出现异常...");} else {System.out.println("任务执行完成");}});}

thenCompose和thenApply的异同

thenApply和thenCompose都是对一个CompletableFuture返回的结果进行后续操作,返回一个新的CompletableFuture。

对于thenApplyfn函数是一个对一个已完成的stage或者说CompletableFuture的返回值进行计算、操作;
对于thenComposefn函数是对另一个CompletableFuture进行计算、操作

        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> 100).thenApply(num -> num + " to String");CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> 100).thenCompose(num -> CompletableFuture.supplyAsync(() -> num + " to String"));System.out.println(f1.join()); // 100 to StringSystem.out.println(f2.join()); // 100 to String

上面thenApplythenCompose都是将一个CompletableFuture<Integer>转换为CompletableFuture<String>。不同的是,thenApply中的传入函数的返回值是String,而thenCompose的传入函数的返回值是CompletableFuture<String>。就好像stream中学到的mapflatMap。回想我们做过的二维数组转一维数组,使用stream().flatMap映射时,我们是把流中的每个数据(数组)又展开为了流。

CompletableFuture原理介绍

以下述代码为例,简单了解下CompletableFuture的实现原理

 public static void main(String[] args) throws InterruptedException, ExecutionException {CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello";}).thenAccept(e -> {System.out.println("执行结果为" + e);});f.get();}

先看一下CompletableFuture里定义了哪些重要的变量

 //CompletableFuture的结果值或者是一个异常的包装对象AltResultvolatile Object result;       // 依赖操作栈的栈顶volatile Completion stack;    // Top of Treiber stack of dependent actions

然后看下我的例子里调用的supplyAsync方法

supplyAsync

会将我们的Supplier参数封装成AsyncSupply对象,然后交给线程池执行,
AsyncSupply有两个参数,一个是源码里创建的CompletableFuture对象,一个是用户定义的Supplier参数

 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {//asyncPool是一个全局的ForkJoinPool.commonPool线程池return asyncSupplyStage(asyncPool, supplier);}static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();//创建一个新的CompletableFuture并返回(1)CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}
 public void run() {CompletableFuture<T> d; Supplier<T> f;//如果dep和fn不为空if ((d = dep) != null && (f = fn) != null) {dep = null; fn = null;//如果CompletableFuture的result为空(表示当前任务还没执行完),则等待直接完成后执行postCompleteif (d.result == null) {try {//通过get()方法获取返回结果并设置给resultd.completeValue(f.get());} catch (Throwable ex) {d.completeThrowable(ex);}}//在执行完自己的方法获取到返回值之后,会执行所有依赖此任务的其他任务,这些任务存储在一个无锁并发栈里d.postComplete();}}

thenAccept

我们先来看下thenAccept的实现

 private CompletableFuture<Void> uniAcceptStage(Executor e,Consumer<? super T> f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();      //这里的this就是前面supplyAsync方法里创建的CompletableFuture//如果为异步任务,则将任务压栈后直接返回,因为源任务结束后会触发异步线程执行对应逻辑//如果为同步任务(e==null)会调用d.uniAccept方法 这个方法的逻辑:如果源任务完成,则直接调用f并返回true,否则进入下面的if代码块if (e != null || !d.uniAccept(this, f, null)) {//封装一个UniAccept对象,并压入到栈中UniAccept<T> c = new UniAccept<T>(e, d, this, f);push(c);c.tryFire(SYNC);}return d;}/** Pushes the given completion (if it exists) unless done. */final void push(UniCompletion<?,?> c) {if (c != null) {while (result == null && !tryPushStack(c))lazySetNext(c, null); // clear on failure}}final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d; CompletableFuture<T> a;if ((d = dep) == null ||//如果是异步调用(mode>0),传入null。否则传入this!d.uniAccept(a = src, fn, mode > 0 ? null : this))return null;dep = null; src = null; fn = null;return d.postFire(a, mode);
}final <S> boolean uniAccept(CompletableFuture<S> a,Consumer<? super S> f, UniAccept<S> c) {Object r; Throwable x;//判断当前CompletableFuture是否已完成,如果没有完成则返回falseif (a == null || (r = a.result) == null || f == null)return false;tryComplete: if (result == null) {//判断任务执行结果是否为异常类型if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {completeThrowable(x, r);break tryComplete;}r = null;}try {//判断当前任务是否可以执行(d.uniAccept(this, f, null)传入的c为null)if (c != null && !c.claim())return false;@SuppressWarnings("unchecked") S s = (S) r;//获取CompletableFuture执行的任务结果并执行consumerf.accept(s);completeNull();} catch (Throwable ex) {completeThrowable(ex);}}return true;}

postComplete

再回过头看下在一个任务执行完成后调用的postComplete 方法

 /*** Pops and tries to trigger all reachable dependents.  Call only* when known to be done.*/final void postComplete() {//无锁并发栈,(Completion有一个next指针), 保存的是依赖当前的CompletableFuture的一串任务CompletableFuture<?> f = this; Completion h;//判断stack是否为空while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;//非空则通过CAS出栈if (f.casStack(h, t = h.next)) {if (t != null) {//如果f不是this,将刚出栈的h压入this的栈顶if (f != this) {//通过CAS入栈pushStack(h);continue;}// 如果是当前CompletableFuture, 解除头节点与栈的联系, help GCh.next = null;   }f = (d = h.tryFire(NESTED)) == null ? this : d;}}}final void pushStack(Completion c) {do {} while (!tryPushStack(c));}

CompletableFuture实现链式调用的核心原理就是通过一个无锁并发栈(Treiber Stack)来存储任务。

依赖任务执行的时候先判断源任务是否完成,如果完成,直接在对应线程执行以来任务(如果是同步,则在当前线程处理,否则在异步线程处理)
如果任务没有完成,直接返回,因为等任务完成之后会通过postComplete去触发调用依赖任务。

借用下在别人的博客看到的原理图:

public static void main(String[] args) {CompletableFuture<String> baseFuture = CompletableFuture.completedFuture("Base Future");log.info(baseFuture.thenApply((r) -> r + " Then Apply").join());baseFuture.thenAccept((r) -> log.info(r)).thenAccept((Void) -> log.info("Void"));
}

在这里插入图片描述

实战例子: 烧水泡茶
CompletableFuture使用详解

全网最详细CompletableFuture使用教程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_102273.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

web事件循环

事件循环的应用&#xff1a;计时器 promise ajax node 单线程是异步产生的原因&#xff0c;事件循环时异步的实现方式 1.浏览器进程模型 进程&#xff1a;程序运行需要自己专属的内存空间&#xff0c;可以把这块内存空间简单的理解为进程。 每个应用至少又一个进程&#xff…

模板方法设计模式解读

目录 豆浆制作问题 模板方法模式基本介绍 基本介绍 模板方法模式的原理类图 模板方法模式解决豆浆制作问题 应用实例要求 思路分析和图解(类图) 模板方法模式的钩子方法 模板方法模式的注意事项和细节 豆浆制作问题 编写制作豆浆的程序&#xff0c;说明如下: 1) 制作豆…

【LeetCode】剑指 Offer 67. 把字符串转换成整数 p318 -- Java Version

题目链接&#xff1a;https://leetcode.cn/problems/ba-zi-fu-chuan-zhuan-huan-cheng-zheng-shu-lcof/ 1. 题目介绍&#xff08;67. 把字符串转换成整数&#xff09; 写一个函数 StrToInt&#xff0c;实现把字符串转换成整数这个功能。不能使用 atoi 或者其他类似的库函数。 …

研究生考试 之 计算机网络第七版(谢希仁) 第一章 课后答案

研究生考试 之 计算机网络第七版(谢希仁) 第一章 课后答案 目录 研究生考试 之 计算机网络第七版(谢希仁) 第一章 课后答案 一、简单介绍 二、计算机网络第七版(谢希仁) 第一章 课后答案 1、 计算机网络向用户可以提供哪些服务&#xff1f; 2、 试简述分组交换的要点。 3…

Kali下部署-Nessus漏扫工具

Nessus 是全世界最多人使用的系统漏洞扫描与分析软件。总共有超过75,000个机构使用Nessus 作为扫描该机构电脑系统的软件。 特点&#xff1a; 1、提供完整的电脑漏洞扫描服务&#xff0c;并随时更新漏洞库。 2、可以在本机或者是远端上进行遥控&#xff0c;进行系统的漏洞扫…

常见的四种排名函数的用法(sql)

四个排名函数&#xff1a; 1.row_number 2.rank 3.dense_rank 4.ntile 1. ROW_NUMBER&#xff08;排名场景推荐&#xff09; 1.1 介绍 在 SQL 中&#xff0c;ROW_NUMBER() 是一个窗口函数&#xff0c;它为结果集中的每一行分配一个唯一的序号。该函数的语法如下&#xff1a; …

JavaSE-part1

文章目录 Day01 面向对象特性1.java继承注意点2.多态2.1多态概述2.2多态中成员的特点:star::star:2.3多态的转型:star::star: 3.Super4.方法重写:star::star:5.Object类:star::star: Day02 面向对象特性1.代码块:star:(主要是初始化变量&#xff0c;先于构造器)2.单例设计模式:…

【移动端网页布局】移动端网页布局基础概念 ⑦ ( 在 PhotoShop 中使用 Cutterman 切二倍图 | 使用二倍图作为背景图像 )

文章目录 一、在 PhotoShop 中使用 Cutterman 切二倍图二、使用二倍图作为背景图像 一、在 PhotoShop 中使用 Cutterman 切二倍图 参考 【CSS】PhotoShop 切图 ③ ( PhotoShop 切图插件 - Cutterman | 下载、安装、启动、注册、登录 Cutterman - 切图神奇 插件 | 使用插件进行切…

3自由度并联绘图机器人实现写字功能(一)

1. 功能说明 本文示例将实现R305样机3自由度并联绘图机器人写字的功能。 2. 电子硬件 在这个示例中&#xff0c;采用了以下硬件&#xff0c;请大家参考&#xff1a; 主控板 Basra主控板&#xff08;兼容Arduino Uno&#xff09; 扩展板Bigfish2.1扩展板电池7.4V锂电池 3. 功能…

远程访问及控制ssh

SSH远程管理 OpenSSH服务器 SSH(Secure Shell) 协议 是一种安全通道协议。主要用来实现字符界面的远程登录、远程复制等功能。对通信数据进行了加密处理&#xff0c;用于远程管理其中包括用户登录时输入的用户口令。因此SSH协议具有很好的安全性------------&#xff08;同样…

d2l Transformer

终于到变形金刚了&#xff0c;他的主要特征在于多头自注意力的使用&#xff0c;以及摒弃了rnn的操作。 目录 1.原理 2.多头注意力 3.逐位前馈网络FFN 4.层归一化 5.残差连接 6.Encoder 7.Decoder 8.训练 9.预测 1.原理 主要贡献&#xff1a;1.纯使用attention的Enco…

Android程序员向音视频进阶,有前景吗

随着移动互联网的普及和发展&#xff0c;Android开发成为了很多人的就业选择&#xff0c;希望在这个行业能获得自己的一席之地。然而&#xff0c;随着时间的推移&#xff0c;越来越多的人进入到了Android开发行业&#xff0c;就导致目前Android开发的工作越来越难找&#xff0c…

EFI Driver Model(下)-USB 驱动设计

1、USB简介 通用串行总线&#xff08;英语&#xff1a;Universal Serial Bus&#xff0c;缩写&#xff1a;USB&#xff09;是一种串口总线标准&#xff0c;也是一种输入输出接口的技术规范&#xff0c;被广泛地应用于个人电脑和移动设备等信息通讯产品&#xff0c;并扩展至摄影…

我看谁没看过

vue在新窗口打开页面方法 const { href } this.$router.resolve({path: "/officePlatform/addPrompt"});window.open(href, "_blank"); 添加圆形标志 h3::before {content: "";display: inline-block;width: 13px;height: 13px;background: va…

NFT介绍及监管规则

什么是NFT NFT是Non-Fungible Token&#xff08;非同质化代币&#xff09;的缩写。 NFT是“Non-Fungible Token”的缩写&#xff0c;即非同质化代币。不同于FT&#xff08;Fungible Token&#xff0c;同质化代币&#xff09;&#xff0c;每一个NFT都是独一无二且不可相互替代的…

第二章 Maven 核心程序解压和配置

第一节 Maven核心程序解压与配置 1、Maven 官网地址 首页&#xff1a; Maven – Welcome to Apache Maven(opens new window) 下载页面&#xff1a; Maven – Download Apache Maven(opens new window) 下载链接&#xff1a; 具体下载地址&#xff1a;https://dlcdn.apac…

【云原生】Java 应用程序在 Kubernetes 上棘手的内存管理

文章目录 引言JVM 内存模型简介非 Heap 内存Heap 堆内存Kubernetes 内存管理JVM 和 Kubernetes场景 1 — Java Out Of Memory 错误场景 2 — Pod 超出内存 limit 限制场景 3 — Pod 超出节点的可用内存场景 4 — 参数配置良好&#xff0c;应用程序运行良好 结语 引言 如何结合…

三月、四月总计面试碰壁15次,作为一个27岁的测试工程师.....

3年测试经验原来什么都不是&#xff0c;只是给你的简历上画了一笔&#xff0c;一直觉得经验多&#xff0c;无论在哪都能找到满意的工作&#xff0c;但是现实却是给我打了一个大巴掌&#xff01;事后也不会给糖的那种... 先说一下自己的个人情况&#xff0c;普通二本计算机专业…

JVM调优最佳参数

项目背景 C端的项目&#xff0c;用户量比较多&#xff0c;请求比较多。 启动参数表 Xmx指定应用程序可用的最大堆大小。 Xms指定应用程序可用的最小堆大小。 &#xff08;一般情况下&#xff0c;需要设置Xmx和Xms为相等的值&#xff0c;且为一个固定的值&#xff09; 如果该值…

图像处理:均值滤波算法

目录 前言 概念介绍 基本原理 Opencv实现中值滤波 Python手写实现均值滤波 参考文章 前言 在此之前&#xff0c;我曾在此篇中推导过图像处理&#xff1a;推导五种滤波算法&#xff08;均值、中值、高斯、双边、引导&#xff09;。这在此基础上&#xff0c;我想更深入地研…