Java8 CompletableFuture异步编程-入门篇

news/2024/7/27 10:57:01/文章来源:https://blog.csdn.net/qq_35716689/article/details/136522991

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

目录

前言

1、Future vs CompletableFuture

1.1 准备工作

1.2 Future 的局限性

PS: 后续的演示文件,就不一一展示,我们可以自己定义一些内容,用于处理即可

1.3 CompletableFuture 的优势

2、创建异步任务

2.1 runAsync

2.2 supplyAsync

2.3 异步任务中的线程池

2.4 异步编程思想

3、异步任务回调

3.1 thenApply

3.2 thenAccept

3.3 thenRun

3.4 更进一步提升并行化

4、异步任务编排

4.1 编排2个依赖关系的异步任务 thenCompose()

4.2 编排2个非依赖关系的异步任务 thenCombine()

4.3 合并多个异步任务 allOf / anyOf

5、异步任务的异常处理

5.1 exceptionally()

5.2 handle()

总结


前言

JDK 5引入了Future模式。Future接口是Java多线程Future模式的实现,在java.util.concurrent包中,可以来进行异步计算。

Future模式是多线程设计常用的一种设计模式。Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。

Java 8新增的CompletableFuture类正是吸收了所有Google Guava中ListenableFuture和SettableFuture的特征,还提供了其它强大的功能,让Java拥有了完整的非阻塞编程模型:Future、Promise 和 Callback(在Java8之前,只有无Callback 的Future)。

CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

1、Future vs CompletableFuture

1.1 准备工作

为了便于后续更好地调试,我们需要定义一个工具类辅助我们对知识的理解。

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
​
public class CommonUtils {
​public static String readFile(String pathToFile) {try {return Files.readString(Paths.get(pathToFile));} catch(Exception e) {e.printStackTrace();return "";}}
​//当前线程休眠-毫秒public static void sleepMillis(long millis) {try {TimeUnit.MILLISECONDS.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}
​//当前线程休眠-秒public static void sleepSecond(int seconds) {try {TimeUnit.SECONDS.sleep(seconds);} catch (InterruptedException e) {e.printStackTrace();}}
​//打印日志public static void printThreadLog(String message) {//时间戳|线程id|线程名|日志信息String result = new StringJoiner(" | ").add(String.valueOf(System.currentTimeMillis())).add(String.format("%2d",Thread.currentThread().getId())).add(String.valueOf(Thread.currentThread().getName())).add(message).toString();System.out.println(result);}
}
​

1.2 Future 的局限性

需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中 news.txt

oh my god!completablefuture真tmd好用呀
PS: 后续的演示文件,就不一一展示,我们可以自己定义一些内容,用于处理即可
public class FutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {
​ExecutorService executor = Executors.newFixedThreadPool(5);// step 1: 读取敏感词汇Future<String[]> filterWordFuture = executor.submit(() -> {String str = CommonUtils.readFile("filter_words.txt");String[] filterWords = str.split(",");return filterWords;});
​// step 2: 读取新闻稿文件内容Future<String> newsFuture = executor.submit(() -> {return CommonUtils.readFile("news.txt");});
​// step 3: 替换操作(当敏感词汇很多,文件很多,替换也会是个耗时的任务)Future<String> replaceFuture = executor.submit(() -> {String[] words = filterWordFuture.get();String news = newsFuture.get();
​// 替换敏感词汇for (String word : words) {if (news.indexOf(word) >= 0) {news = news.replace(word, "**");}}return news;});
​String filteredNews = replaceFuture.get();System.out.println("过滤后的新闻稿:" + filteredNews);
​executor.shutdown();}
}
​

通过上面的代码,我们会发现,Future相比于所有任务都直接在主线程处理,有很多优势,但同时也存在不足,至少表现如下:

  • 在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告知你它什么时候完成,你如果想要得到结果,必须通过一个get()方法,该方法会阻塞直到结果可用为止。 它不具备将回调函数附加到Future后并在Future的结果可用时自动调用回调的能力。
  • 无法解决任务相互依赖的问题。filterWordFuture和newsFuture的结果不能自动发送给replaceFuture,需要在replaceFuture中手动获取,所以使用Future不能轻而易举地创建异步工作流。
  • 不能将多个Future合并在一起。假设你有多种不同的Future,你想在它们全部并行完成后然后再运行某个函数,Future很难独立完成这一需要。
  • 没有异常处理。Future提供的方法中没有专门的API应对异常处理,还是需要开发者自己手动异常处理。

1.3 CompletableFuture 的优势

CompletableFuture 实现了FutureCompletionStage接口

CompletableFuture 相对于 Future 具有以下优势:

  • 为快速创建、链接依赖和组合多个Future提供了大量的便利方法。
  • 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。
  • 无缝衔接和亲和 lambda 表达式 和 Stream - API 。
  • 我见过的真正意义上的异步编程,把异步编程和函数式编程、响应式编程多种高阶编程思维集于一身,设计上更优雅。

2、创建异步任务

2.1 runAsync

如果你要异步运行某些耗时的后台任务,并且不想从任务中返回任何内容,则可以使用CompletableFuture.runAsync()方法。它接受一个Runnable接口的实现类对象,方法返回CompletableFuture<Void> 对象

static CompletableFuture<Void> runAsync(Runnable runnable);

演示案例:开启一个不从任务中返回任何内容的CompletableFuture异步任务

public class RunAsyncDemo {public static void main(String[] args) {// runAsync 创建异步任务CommonUtils.printThreadLog("main start");// 使用Runnable匿名内部类CompletableFuture.runAsync(new Runnable() {@Overridepublic void run() {CommonUtils.printThreadLog("读取文件开始");// 使用睡眠来模拟一个长时间的工作任务(例如读取文件,网络请求等)CommonUtils.sleepSecond(3);CommonUtils.printThreadLog("读取文件结束");}});
​CommonUtils.printThreadLog("here are not blocked,main continue");CommonUtils.sleepSecond(4); //  此处休眠为的是等待CompletableFuture背后的线程池执行完成。CommonUtils.printThreadLog("main end");}
}

我们也可以以Lambda表达式的形式传递Runnable接口实现类对象

public class RunAsyncDemo2 {public static void main(String[] args) {// runAsync 创建异步任务CommonUtils.printThreadLog("main start");// 使用Lambda表达式CompletableFuture.runAsync(() -> {CommonUtils.printThreadLog("读取文件开始");CommonUtils.sleepSecond(3);CommonUtils.printThreadLog("读取文件结束");});
​CommonUtils.printThreadLog("here are not blocked,main continue");CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}
​

需求:使用CompletableFuture开启异步任务读取 news.txt 文件中的新闻稿,并打印输出。

public class RunAsyncDemo3 {public static void main(String[] args) {// 需求:使用多线程异步读取 words.txt 中的敏感词汇,并打印输出。CommonUtils.printThreadLog("main start");
​CompletableFuture.runAsync(()->{String news = CommonUtils.readFile("news.txt");CommonUtils.printThreadLog(news);});
​CommonUtils.printThreadLog("here are not blocked,main continue");CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}

在后续的章节中,我们会经常使用Lambda表达式。

2.2 supplyAsync

CompletableFuture.runAsync() 开启不带返回结果异步任务。但是,如果您想从后台的异步任务中返回一个结果怎么办?此时,CompletableFuture.supplyAsync()是你最好的选择了。

static CompletableFuture<U> supplyAsync(Supplier<U> supplier)

它入参一个 Supplier 供给者,用于供给带返回值的异步任务 并返回CompletableFuture<U>,其中U是供给者给程序供给值的类型。

需求:开启异步任务读取 news.txt 文件中的新闻稿,返回文件中内容并在主线程打印输出

public class SupplyAsyncDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CommonUtils.printThreadLog("main start");
​CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {String news = CommonUtils.readFile("news.txt");return news;}});
​CommonUtils.printThreadLog("here are not blocked,main continue");// 阻塞并等待newsFuture完成String news = newsFuture.get();CommonUtils.printThreadLog("news = " + news);CommonUtils.printThreadLog("main end");}
}

如果想要获取newsFuture结果,可以调用completableFuture.get()方法,get()方法将阻塞,直到newsFuture完成。

我们依然可以使用Java 8的Lambda表达式使上面的代码更简洁。

CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {String news = CommonUtils.readFile("news.txt");return news;
});

2.3 异步任务中的线程池

大家已经知道,runAsync()和supplyAsync()方法都是开启单独的线程中执行异步任务。但是,我们从未创建线程对吗? 不是吗!

CompletableFuture 会从全局的ForkJoinPool.commonPool() 线程池获取线程来执行这些任务

当然,你也可以创建一个线程池,并将其传递给runAsync()和supplyAsync()方法,以使它们在从您指定的线程池获得的线程中执行任务。

CompletableFuture API中的所有方法都有两种变体,一种是接受传入的Executor参数作为指定的线程池,而另一种则使用默认的线程池 (ForkJoinPool.commonPool()) 。

// runAsync() 的重载方法 
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
// supplyAsync() 的重载方法 
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

需求:指定线程池,开启异步任务读取 news.txt 中的新闻稿,返回文件中内容并在主线程打印输出

ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("异步读取文件开始");String news = CommonUtils.readFile("news.txt");CommonUtils.printThreadLog("异步读取文件完成");return news;
},executor);

最佳实践:创建属于自己的业务线程池

如果所有CompletableFuture共享一个线程池,那么一旦有异步任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。

所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

2.4 异步编程思想

综合上述,看到了吧,我们没有显式地创建线程,更没有涉及线程通信的概念,整个过程根本就没涉及线程知识吧,以上专业的说法是:线程的创建和线程负责的任务进行解耦,它给我们带来的好处线程的创建和启动全部交给线程池负责,具体任务的编写就交给程序员,专人专事

异步编程是可以让程序并行( 也可能是并发 )运行的一种手段,其可以让程序中的一个工作单元作为异步任务与主线程分开独立运行,并且在异步任务运行结束后,会通知主线程它的运行结果或者失败原因,毫无疑问,一个异步任务其实就是开启一个线程来完成的,使用异步编程可以提高应用程序的性能和响应能力等。

作为开发者,只需要有一个意识:

开发者只需要把耗时的操作交给CompletableFuture开启一个异步任务,然后继续关注主线程业务,当异步任务运行完成时会通知主线程它的运行结果。我们把具备了这种编程思想的开发称为异步编程思想

3、异步任务回调

CompletableFuture.get()方法是阻塞的。调用时它会阻塞等待 直到这个Future完成,并在完成后返回结果。 但是,很多时候这不是我们想要的。

对于构建异步系统,我们应该能够将回调附加到CompletableFuture上,当这个Future完成时,该回调应自动被调用。 这样,我们就不必等待结果了,然后在Future的回调函数内编写完成Future之后需要执行的逻辑。 您可以使用thenApply(),thenAccept()和thenRun()方法,它们可以把回调函数附加到CompletableFuture

3.1 thenApply

使用 thenApply() 方法可以处理和转换CompletableFuture的结果。 它以Function<T,R>作为参数。 Function<T,R>是一个函数式接口,表示一个转换操作,它接受类型T的参数并产生类型R的结果

CompletableFuture<R> thenApply(Function<T,R> fn)

需求:异步读取 filter_words.txt 文件中的内容,读取完成后,把内容转换成数组( 敏感词数组 ),异步任务返回敏感词数组

public class ThenApplyDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {  CommonUtils.printThreadLog("main start");CompletableFuture<String> readFileFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;});CompletableFuture<String[]> filterWordsFuture = readFileFuture.thenApply((content) -> {CommonUtils.printThreadLog("文件内容转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;});CommonUtils.printThreadLog("main continue");String[] filterWords = filterWordsFuture.get();CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));CommonUtils.printThreadLog("main end");}
}

你还可以通过附加一系列thenApply()回调方法,在CompletableFuture上编写一系列转换序列。一个thenApply()方法的结果可以传递给序列中的下一个,如果你对链式操作很了解,你会发现结果可以在链式操作上传递。

CompletableFuture<String[]> filterWordsFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;
}).thenApply((content) -> {CommonUtils.printThreadLog("转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;
});

3.2 thenAccept

如果你不想从回调函数返回结果,而只想在Future完成后运行一些代码,则可以使用thenAccept()

这些方法是入参一个 Consumer,它可以对异步任务的执行结果进行消费使用,方法返回CompletableFuture。

CompletableFuture<Void>	thenAccept(Consumer<T> action)

通常用作回调链中的最后一个回调。

需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组,然后打印敏感词数组

public class ThenAcceptDemo {public static void main(String[] args) {CommonUtils.printThreadLog("main start");CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;}).thenApply((content) -> {CommonUtils.printThreadLog("转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;}).thenAccept((filterWords) -> {CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));});CommonUtils.printThreadLog("main continue");CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}

3.3 thenRun

前面我们已经知道,通过thenApply( Function<T,R> ) 对链式操作中的上一个异步任务的结果进行转换,返回一个新的结果;

通过thenAccept( Consumer ) 对链式操作中上一个异步任务的结果进行消费使用,不返回新结果;

如果我们只是想从CompletableFuture的链式操作得到一个完成的通知,甚至都不使用上一步链式操作的结果,那么 CompletableFuture.thenRun() 会是你最佳的选择,它需要一个Runnable并返回CompletableFuture<Void>。

CompletableFuture<Void> thenRun(Runnable action);

演示案例:我们仅仅想知道 filter_words.txt 的文件是否读取完成

public class ThenRunDemo {public static void main(String[] args) {CommonUtils.printThreadLog("main start");CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;}).thenRun(() -> {CommonUtils.printThreadLog("读取filter_words文件读取完成");});CommonUtils.printThreadLog("main continue");CommonUtils.sleepSecond(4);CommonUtils.printThreadLog("main end");}
}

3.4 更进一步提升并行化

CompletableFuture 提供的所有回调方法都有两个异步变体

CompletableFuture<U> thenApply(Function<T,U> fn)
// 回调方法的异步变体(异步回调)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn)
CompletableFuture<U> thenApplyAsync(Function<T,U> fn, Executor executor)

注意:这些带了Async的异步回调 通过在单独的线程中执行回调任务 来帮助您进一步促进并行化计算。

回顾需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组,主线程获取结果打印输出这个数组

public class ThenApplyAsyncDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CommonUtils.printThreadLog("main start");CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {/*CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;*/// 此时,立即返回结果return "尼玛, NB, tmd";}).thenApply((content) -> {/*** 一般而言,thenApply任务的执行和supplyAsync()任务执行可以使用同一线程执行* 如果supplyAsync()任务立即返回结果,则thenApply的任务在主线程中执行*/CommonUtils.printThreadLog("把内容转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;});CommonUtils.printThreadLog("main continue");String[] filterWords = filterWordFuture.get();CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));CommonUtils.printThreadLog("main end");}
}

要更好地控制执行回调任务的线程,可以使用异步回调。如果使用thenApplyAsync()回调,那么它将在从ForkJoinPool.commonPool() 获得的另一个线程中执行

public class ThenApplyAsyncDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {CommonUtils.printThreadLog("main start");CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;}).thenApplyAsync((content) -> {CommonUtils.printThreadLog("把内容转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;});CommonUtils.printThreadLog("main continue");String[] filterWords = filterWordFuture.get();CommonUtils.printThreadLog("filterWords = " + Arrays.toString(filterWords));CommonUtils.printThreadLog("main end");}
}

以上程序一种可能的运行结果(需要多运行几次):

1672885914481 |  1 | main | main start
1672885914511 | 16 | ForkJoinPool.commonPool-worker-1 | 读取filter_words.txt文件
1672885914511 |  1 | main | main continue
1672885914521 | 17 | ForkJoinPool.commonPool-worker-2 | 把内容转换成敏感词数组
1672885914521 |  1 | main | filterWords = [尼玛, NB, tmd]
1672885914521 |  1 | main | main end

此外,如果将Executor传递给thenApplyAsync()回调,则该回调的异步任务将在从Executor的线程池中获取的线程中执行;

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String[]> filterWordFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取filter_words文件");String filterWordsContent = CommonUtils.readFile("filter_words.txt");return filterWordsContent;
}).thenApplyAsync((content) -> {CommonUtils.printThreadLog("把内容转换成敏感词数组");String[] filterWords = content.split(",");return filterWords;
},executor);
executor.shutdown();

其他两个回调的变体版本如下:

// thenAccept和其异步回调
CompletableFuture<Void>	thenAccept(Consumer<T> action)
CompletableFuture<Void>	thenAcceptAsync(Consumer<T> action)
CompletableFuture<Void>	thenAcceptAsync(Consumer<T> action, Executor executor)// thenRun和其异步回调
CompletableFuture<Void>	thenRun(Runnable action)
CompletableFuture<Void>	thenRunAsync(Runnable action)
CompletableFuture<Void>	thenRunAsync(Runnable action, Executor executor)

4、异步任务编排

4.1 编排2个依赖关系的异步任务 thenCompose()

回顾需求:异步读取 filter_words.txt 文件中的内容,读取完成后,转换成敏感词数组让主线程待用。

关于读取和解析内容,假设使用以下的 readFileFuture(String) 和 splitFuture(String) 方法完成。

public static CompletableFuture<String> readFileFuture(String fileName) {return CompletableFuture.supplyAsync(() -> {String filterWordsContent = CommonUtils.readFile(fileName);return filterWordsContent;});
}public static CompletableFuture<String[]> splitFuture(String context) {return CompletableFuture.supplyAsync(() -> {String[] filterWords = context.split(",");return filterWords;});
}

现在,让我们先了解如果使用thenApply() 结果会发生什么

CompletableFuture<CompletableFuture<String[]>> future = readFileFuture("filter_words.txt").thenApply((context) -> {return splitFuture(context);});

回顾在之前的案例中,thenApply(Function<T,R>) 中Function回调会对上一步任务结果转换后得到一个简单值 ,但现在这种情况下,最终结果是嵌套的CompletableFuture,所以这是不符合预期的,那怎么办呢?

我们想要的是:把上一步异步任务的结果,转成一个CompletableFuture对象,这个CompletableFuture对象中包含本次异步任务处理后的结果。也就是说,我们想组合上一步异步任务的结果到下一个新的异步任务中, 结果由这个新的异步任务返回

此时,你需要使用thenCompose()方法代替,我们可以把它理解为 异步任务的组合

CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> func)

所以,thenCompose()用来连接两个有依赖关系的异步任务,结果由第二个任务返回

CompletableFuture<String[]> future = readFileFuture("filter_words.txt").thenCompose((context) -> { return splitFuture(context);});

因此,这里积累了一个经验:

如果我们想连接( 编排 ) 两个依赖关系的异步任务( CompletableFuture 对象 ) ,请使用 thenCompose() 方法

当然,thenCompose 也存在异步回调变体版本:

CompletableFuture<R> thenCompose(Function<T,CompletableFuture<R>> fn)CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn)
CompletableFuture<R> thenComposeAsync(Function<T,CompletableFuture<R>> fn, Executor executor)

4.2 编排2个非依赖关系的异步任务 thenCombine()

我们已经知道,当其中一个Future依赖于另一个Future,使用thenCompose()用于组合两个Future。如果两个Future之间没有依赖关系,你希望两个Future独立运行并在两者都完成之后执行回调操作时,则使用thenCombine();

// T是第一个任务的结果 U是第二个任务的结果 V是经BiFunction应用转换后的结果
CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> func)

需求:替换新闻稿 ( news.txt ) 中敏感词汇 ,把敏感词汇替换成*,敏感词存储在 filter_words.txt 中

public class ThenCombineDemo {public static void main(String[] args) throws Exception {// 读取敏感词汇的文件并解析到数组中CompletableFuture<String[]> future1 = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取敏感词汇并解析");String context = CommonUtils.readFile("filter_words.txt");String[] words = context.split(",");return words;});// 读取news文件内容CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {CommonUtils.printThreadLog("读取news文件内容");String context = CommonUtils.readFile("news.txt");return context;});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (words, context) -> {// 替换操作CommonUtils.printThreadLog("替换操作");for (String word : words) {if(context.indexOf(word) > -1) {context = context.replace(word, "**");}}return context;});String filteredContext = combinedFuture.get();System.out.println("filteredContext = " + filteredContext);}
}

注意:当两个Future都完成时,才将两个异步任务的结果传递给thenCombine()的回调函数做进一步处理。

和以往一样,thenCombine 也存在异步回调变体版本

CompletableFuture<V> thenCombine(CompletableFuture<U> other, BiFunction<T,U,V> func)
CompletableFuture<V> thenCombineAsync(CompletableFuture<U> other, BiFunction<T,U,V> func)
CompletableFuture<V> thenCombineAsync(CompletableFuture<U> other, BiFunction<T,U,V> func,Executor executor)

4.3 合并多个异步任务 allOf / anyOf

我们使用thenCompose()和thenCombine()将两个CompletableFuture组合和合并在一起。

如果要编排任意数量的CompletableFuture怎么办?可以使用以下方法来组合任意数量的CompletableFuture

public static CompletableFuture<Void>	allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

CompletableFuture.allOf()用于以下情形中:有多个需要独立并行运行的Future,并在所有这些Future 都完成后执行一些操作。

需求:统计news1.txt、new2.txt、new3.txt 文件中包含CompletableFuture关键字的文件的个数

public class AllOfDemo {public static CompletableFuture<String> readFileFuture(String fileName) {return CompletableFuture.supplyAsync(() -> {String content = CommonUtils.readFile(fileName);return content;});}public static void main(String[] args) {// step 1: 创建List集合存储文件名List<String> fileList = Arrays.asList("news1.txt", "news2.txt", "news3.txt");// step 2: 根据文件名调用readFileFuture创建多个CompletableFuture,并存入List集合中List<CompletableFuture<String>> readFileFutureList = fileList.stream().map(fileName -> {return readFileFuture(fileName);}).collect(Collectors.toList());// step 3: 把List集合转换成数组待用,以便传入allOf方法中int len = readFileFutureList.size();CompletableFuture[] readFileFutureArr = readFileFutureList.toArray(new CompletableFuture[len]);// step 4: 使用allOf方法合并多个异步任务CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(readFileFutureArr);// step 5: 当多个异步任务都完成后,使用回调操作文件结果,统计符合条件的文件个数CompletableFuture<Long> countFuture = allOfFuture.thenApply(v -> {return readFileFutureList.stream().map(future -> future.join()).filter(content -> content.contains("CompletableFuture")).count();});// step 6: 主线程打印输出文件个数Long count = countFuture.join();System.out.println("count = " + count);}
}

顾名思义,当给定的多个异步任务中的有任意Future一个完成时,需要执行一些操作,可以使用 anyOf 方法

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

anyOf()返回一个新的CompletableFuture,新的CompletableFuture的结果和 cfs中已完成的那个异步任务结果相同。

演示案例:anyOf 执行过程

public class AnyOfDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {Tools.sleepMillis(2);return "Future1的结果";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {Tools.sleepMillis(1);return "Future2的结果";});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {Tools.sleepMillis(3);return "Future3的结果";});CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);// 输出Future2的结果System.out.println(anyOfFuture.get());}
}

在上面的示例中,当三个CompletableFuture中的任意一个完成时,anyOfFuture就完成了。 由于future2的睡眠时间最少,因此它将首先完成,最终结果将是"Future2的结果"。

注意:

  • anyOf() 方法返回类型必须是 CompletableFuture <Object>。
  • anyOf()的问题在于,如果您拥有返回不同类型结果的CompletableFuture,那么您将不知道最终CompletableFuture的类型。

5、异步任务的异常处理

在前面的章节中,我们并没有更多地关心异常处理的问题,其实,CompletableFuture 提供了优化处理异常的方式。

首先,让我们了解异常如何在回调链中传播

public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {int r = 1 / 0;return "result1";}).thenApply(result -> {return result + " result2";}).thenApply(result -> {return result + " result3";}).thenAccept((result)->{System.out.println(result);});}

如果在 supplyAsync 任务中出现异常,后续的 thenApply 和 thenAccept 回调都不会执行,CompletableFuture 将转入异常处理

如果在第一个 thenApply 任务中出现异常,第二个 thenApply 和 最后的 thenAccept 回调不会被执行,CompletableFuture 将转入异常处理,依次类推。

5.1 exceptionally()

exceptionally 用于处理回调链上的异常,回调链上出现的任何异常,回调链不继续向下执行,都在exceptionally中处理异常。

// Throwable表示具体的异常对象e
CompletableFuture<R> exceptionally(Function<Throwable, R> func)
public class ExceptionallyDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {int r = 1 / 0;return "result1";}).thenApply(result -> {String str = null;int len = str.length();return result + " result2";}).thenApply(result -> {return result + " result3";}).exceptionally(ex -> {System.out.println("出现异常:" + ex.getMessage());return "Unknown";});String ret = future.get();Tools.printThreadLog("最终结果:" + ret);}
}

 因为exceptionally只处理一次异常,所以常常用在回调链的末端。

5.2 handle()

CompletableFuture API 还提供了一种更通用的方法 handle() 表示从异常中恢复

handle() 常常被用来恢复回调链中的一次特定的异常,回调链恢复后可以进一步向下传递。

CompletableFuture<R> handle(BiFunction<T, Throwable, R> fn)
public class HandleDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {int r = 1 / 0;return "result";}).handle((ret, ex) -> {if(ex != null) {System.out.println("我们得到异常:" + ex.getMessage());return "Unknown!";}return ret;});String ret = future.get();CommonUtils.printThreadLog(ret);}
}

 如果发生异常,则res参数将为null,否则ex参数将为null。

需求:对回调链中的一次异常进行恢复处理

public class HandleExceptionDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {int r = 1 / 0;return "result1";}).handle((ret, ex) -> {if (ex != null) {System.out.println("我们得到异常:" + ex.getMessage());return "Unknown1";}return ret;}).thenApply(result -> {String str = null;int len = str.length();return result + " result2";}).handle((ret, ex) -> {if (ex != null) {System.out.println("我们得到异常:" + ex.getMessage());return "Unknown2";}return ret;}).thenApply(result -> {return result + " result3";});String ret = future.get();Tools.printThreadLog("最终结果:" + ret);}
}

和以往一样,为了提升并行化,异常处理可以方法单独的线程执行,以下是它们的异步回调版本

CompletableFuture<R> exceptionally(Function<Throwable, R> fn)
CompletableFuture<R> exceptionallyAsync(Function<Throwable, R> fn)  // jdk17+
CompletableFuture<R> exceptionallyAsync(Function<Throwable, R> fn,Executor executor) // jdk17+CompletableFuture<R> handle(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn)
CompletableFuture<R> handleAsync(BiFunction<T,Throwable,R> fn, Executor executor)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_996513.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小程序API能力集成指南——画布API汇总(四)

CanvasContext canvas 组件的绘图上下文。 方法如下&#xff08;3&#xff09;&#xff1a; scale CanvasContext.scale CanvasContext.scale(number scaleWidth, number scaleHeight) 功能描述 在调用后&#xff0c;之后创建的路径其横纵坐标会被缩放。多次调用倍数会相…

什么是VR虚拟现实|虚拟科技博物馆|VR设备购买

虚拟现实&#xff08;Virtual Reality&#xff0c;简称VR&#xff09;是一种通过计算机技术模拟出的一种全新的人机交互方式。它可以通过专门的设备&#xff08;如头戴式显示器&#xff09;将用户带入一个计算机生成的虚拟环境之中&#xff0c;使用户能够与这个虚拟环境进行交互…

基于“xxx” Androidx平台的驱动及系统开发 之 触摸板篇

目录 一、基于全志 A133 Android10平台&#xff0c;适配1366x768 - ilitek2511触摸1、原理图分析2、驱动移植与适配3、补丁和资源文件 二、基于瑞芯微 RK3566 Android11平台&#xff0c;适配GT9XX触摸1、原理图分析2、补丁及资源文件 三、遇到的问题与解决1、基于amlogic Andro…

Sodinokibi勒索病毒最新变种,勒索巨额赎金

前言 Sodinokibi勒索病毒在国内首次被发现于2019年4月份&#xff0c;2019年5月24日首次在意大利被发现&#xff0c;在意大利被发现使用RDP攻击的方式进行传播感染&#xff0c;这款病毒被称为GandCrab勒索病毒的接班人&#xff0c;在短短几个月的时间内&#xff0c;已经在全球大…

leetcode 3068. 最大节点价值之和【树形dp】

原题链接&#xff1a;3068. 最大节点价值之和 题目描述&#xff1a; 给你一棵 n 个节点的 无向 树&#xff0c;节点从 0 到 n - 1 编号。树以长度为 n - 1 下标从 0 开始的二维整数数组 edges 的形式给你&#xff0c;其中 edges[i] [ui, vi] 表示树中节点 ui 和 vi 之间有一…

Sentinel 面试题及答案整理,最新面试题

Sentinel的流量控制规则有哪些&#xff0c;各自的作用是什么&#xff1f; Sentinel的流量控制规则主要包括以下几种&#xff1a; 1、QPS&#xff08;每秒查询量&#xff09;限流&#xff1a; 限制资源每秒的请求次数&#xff0c;适用于控制高频访问。 2、线程数限流&#xf…

Linux系统——SElinux

目录 前言 一、SELinux 的作用及权限管理机制 1.SELinux 的作用 1.1DAC 1.2MAC 1.3DAC 和 MAC 的对比 2.SELinux 基本概念 2.1主体&#xff08;Subject&#xff09; 2.2对象&#xff08;Object&#xff09; 2.3政策和规则&#xff08;Policy & Rule&#xff09;…

数据结构/C++:二叉搜索树

数据结构/C&#xff1a;二叉搜索树 概念模拟实现结构分析插入中序遍历查找删除析构函数拷贝构造赋值重载递归查找递归插入递归删除 总代码展示 概念 二叉搜索树&#xff08;BST - Binary Search Tree&#xff09;是一种特殊的二叉树&#xff0c;每个顶点最多可以有两个子节点。…

scrapy 爬虫:多线程爬取去微博热搜排行榜数据信息,进入详情页面拿取第一条微博信息,保存到本地text文件、保存到excel

如果想要保存到excel中可以看我的这个爬虫 使用Scrapy 框架开启多进程爬取贝壳网数据保存到excel文件中&#xff0c;包括分页数据、详情页数据&#xff0c;新手保护期快来看&#xff01;&#xff01;仅供学习参考&#xff0c;别乱搞_爬取贝壳成交数据c端用户登录-CSDN博客 最终…

如何修炼成“神医”——《OceanBase诊断系列》之一

本系列是基于OcenaBase 开发工程师在工作中的一些诊断经验&#xff0c;也欢迎大家分享相关经验。 1. 关于神医的故事 扁鹊&#xff0c;中国古代第一个被正史记载的医生&#xff0c;他的成才之路非常传奇。年轻时&#xff0c;扁鹊是一家客栈的主管。有一位名叫长桑君的客人来到…

YOLOv8基础必需运用【目标检测、分割、姿势估计、跟踪和分类任务】

文章目录 前言1、环境安装2.1安装torch相关库2.2 获取yolov8最新版本&#xff0c;并安装依赖 3. 如何使用模型用于各种CV任务方式一&#xff1a;命令行形式方式二&#xff1a;python代码形式示例3.1 目标检测任务实现代码运行结果检测视频代码 3.2 分割任务实现代码运行效果分割…

蓝桥杯前端Web赛道-收集帛书碎片

蓝桥杯前端Web赛道-收集帛书碎片 题目链接&#xff1a;1.收集帛书碎片 - 蓝桥云课 (lanqiao.cn) 题目要求&#xff1a; 其实通过这个目标我们可以看出其实就是筛选出数组中多余的元素&#xff0c;那么我们可以通过includes来判断&#xff0c;当前数组是否已经拥有过这个碎片…

Maven入门(作用,安装配置,Idea基础maven,Maven依赖,Maven构建项目)【详解】

目录 一. Maven的作用 1.依赖管理 2.统一项目结构 3.项目构建 二.Maven安装配置 1. Maven的仓库类型 2 加载jar的顺序 3. Maven安装配置 4.安装Maven 5.配置仓库 三.idea集成maven 1.给当前project集成maven 2.给新建project集成maven 3.创建maven项目 4.pom…

【C语言】glibc

一、获取源码 apt install glibc-source 在Debian系统中&#xff0c;通过apt install glibc-source命令安装的glibc源码通常会被放置在/usr/src/glibc目录下。安装完成后&#xff0c;可能需要解压缩该源码包。以下是解压缩源码包的步骤&#xff1a; 1. 打开终端。 2. 切换到源…

图片在div完全显示

效果图&#xff1a; html代码&#xff1a; <div class"container" style" display: flex;width: 550px;height: 180px;"><div class"box" style" color: red; background-color:blue; width: 50%;"></div><div …

机器学习-面经(part7、无监督学习)

机器学习面经系列的其他部分如下所示&#xff1a; 机器学习-面经&#xff08;part1&#xff09; 机器学习-面经(part2)-交叉验证、超参数优化、评价指标等内容 机器学习-面经(part3)-正则化、特征工程面试问题与解答合集机器学习-面经(part4)-决策树共5000字的面试问题与解答…

Rust泛型与trait特性,模仿接口的实现

泛型是一个编程语言不可或缺的机制。 C 语言中用"模板"来实现泛型&#xff0c;而 C 语言中没有泛型的机制&#xff0c;这也导致 C 语言难以构建类型复杂的工程。 泛型机制是编程语言用于表达类型抽象的机制&#xff0c;一般用于功能确定、数据类型待定的类&#xf…

备战蓝桥杯————二分查找(二)

引言 在上一篇博客中&#xff0c;我们深入探讨了二分搜索算法及其在寻找数组左侧边界的应用。二分搜索作为一种高效的查找方法&#xff0c;其核心思想在于通过不断缩小搜索范围来定位目标值。在本文中&#xff0c;我们将继续这一主题&#xff0c;不仅会回顾二分搜索的基本原理&…

基于qt的图书管理系统----05其他优化

参考b站&#xff1a;视频连接 源码github&#xff1a;github 目录 1 优化借阅记录显示2 时间显示为年月日3 注册接口 1 优化借阅记录显示 现在只能显示部分信息&#xff0c;把接的书名和人的信息全部显示 在sql语句里替换为这一句即可实现查询相关联的所有信息 QString str…

【leetcode】相交链表

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家刷题&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 点击查看题目 思路: struct ListNode *getIntersectionNode(struct ListNode *headA, struct ListNode *he…