1.使用线程池
1.1.ExecutorService介绍
Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。
如果可以复用一组线程:
那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。
Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:
// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);
因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:
- FixedThreadPool:线程数固定的线程池;
- CachedThreadPool:线程数根据任务动态调整的线程池;
- SingleThreadExecutor:仅单线程执行的线程池。
创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:
// thread pool
import java.util.concurrent.*;public class Main {public static void main(String[] args) {// 创建一个固定大小的线程池:ExecutorService es = Executors.newFixedThreadPool(4);for (int i = 0; i < 6; i++) {es.submit(new Task("" + i));}// 关闭线程池:es.shutdown();}
}class Task implements Runnable {private final String name;public Task(String name) {this.name = name;}@Overridepublic void run() {System.out.println("start task " + name);try {Thread.sleep(1000);} catch (InterruptedException e) {}System.out.println("end task " + name);}
}
我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。
线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。
如果我们把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。
如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
因此,想创建指定动态范围的线程池,可以这么写:
int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
1.2.ScheduledThreadPool
还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。
创建一个ScheduledThreadPool仍然是通过Executors类:
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
我们可以提交一次性任务,它会在指定延迟后只执行一次:
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
如果任务以固定的每3秒执行,我们可以这样写:
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
如果任务以固定的3秒为间隔执行,我们可以这样写:
// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:
而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:
因此,使用ScheduledThreadPool时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。
Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer。
1.3.小结
JDK提供了ExecutorService实现了线程池功能:
- 线程池内部维护一组线程,可以高效执行大量小任务;
- Executors提供了静态方法创建不同类型的ExecutorService;
- 必须调用shutdown()关闭ExecutorService;
- ScheduledThreadPool可以定期调度多个任务。
2.使用Future
2.1.Runnable和Callable接口介绍
在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable接口,就可以让线程池去执行:
class Task implements Runnable {public String result;public void run() {this.result = longTimeCalculation(); }
}
Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:
class Task implements Callable<String> {public String call() throws Exception {return longTimeCalculation(); }
}
并且Callable接口是一个泛型接口,可以返回指定类型的结果。
现在的问题是,如何获得异步执行的结果?
如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象:
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:
- get():获取结果(可能会等待)
- get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
- cancel(boolean mayInterruptIfRunning):取消当前任务;
- isDone():判断任务是否已完成。
2.2.小结
- 对线程池提交一个Callable任务,可以获得一个Future对象;
- 可以用Future在将来某个时刻获取结果。
3.使用CompletableFuture【待理解】
3.1.CompletableFuture接口介绍示例
使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
我们以获取股票价格为例,看看如何使用CompletableFuture:
// CompletableFuture
import java.util.concurrent.CompletableFuture;public class Main {public static void main(String[] args) throws Exception {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);// 如果执行成功:cf.thenAccept((result) -> {System.out.println("price: " + result);});// 如果执行异常:cf.exceptionally((e) -> {e.printStackTrace();return null;});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(200);}static Double fetchPrice() {try {Thread.sleep(100);} catch (InterruptedException e) {}if (Math.random() < 0.3) {throw new RuntimeException("fetch price failed!");}return 5 + Math.random() * 20;}
}
创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:
public interface Supplier<T> {T get();
}
这里我们用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。
紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:
public interface Consumer<T> {void accept(T t);
}
异常时,CompletableFuture会调用Function对象:
public interface Function<T, R> {R apply(T t);
}
这里我们都用lambda语法简化了代码。
可见CompletableFuture的优点是:
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行。
3.2.多个CompletableFuture串行执行
如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:
// CompletableFuture
import java.util.concurrent.CompletableFuture;public class Main {public static void main(String[] args) throws Exception {// 第一个任务:CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油");});// cfQuery成功后继续执行下一个任务:CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {return fetchPrice(code);});// cfFetch成功后打印结果:cfFetch.thenAccept((result) -> {System.out.println("price: " + result);});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(2000);}static String queryCode(String name) {try {Thread.sleep(100);} catch (InterruptedException e) {}return "601857";}static Double fetchPrice(String code) {try {Thread.sleep(100);} catch (InterruptedException e) {}return 5 + Math.random() * 20;}
}
3.3.多个CompletableFuture并行执行
例如,我们考虑这样的场景:同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
// CompletableFuture
import java.util.concurrent.CompletableFuture;public class Main {public static void main(String[] args) throws Exception {// 两个CompletableFuture执行异步查询:CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油", "https://finance.sina.com.cn/code/");});CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {return queryCode("中国石油", "https://money.163.com/code/");});// 用anyOf合并为一个新的CompletableFuture:CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);// 两个CompletableFuture执行异步查询:CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {return fetchPrice((String) code, "https://finance.sina.com.cn/price/");});CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {return fetchPrice((String) code, "https://money.163.com/price/");});// 用anyOf合并为一个新的CompletableFuture:CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);// 最终结果:cfFetch.thenAccept((result) -> {System.out.println("price: " + result);});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(200);}static String queryCode(String name, String url) {System.out.println("query code from " + url + "...");try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {}return "601857";}static Double fetchPrice(String code, String url) {System.out.println("query price from " + url + "...");try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {}return 5 + Math.random() * 20;}
}
上述逻辑实现的异步查询规则实际上是:
除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
最后我们注意CompletableFuture的命名规则:
- xxx():表示该方法将继续在已有的线程中执行;
- xxxAsync():表示将异步在线程池中执行。
3.4.小结
CompletableFuture可以指定异步处理流程:
- thenAccept()处理正常结果;
- exceptional()处理异常结果;
- thenApplyAsync()用于串行化另一个CompletableFuture;
- anyOf()和allOf()用于并行化多个CompletableFuture。
4.使用ForkJoin
Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
- Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
- ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask或RecursiveAction。
- 使用Fork/Join模式可以进行并行计算以提高效率。
在此不做详细展示,链接:https://www.liaoxuefeng.com/wiki/1252599548343744/1306581226487842
5.使用ThreadLocal
- ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;
- ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);
- 使用ThreadLocal要用try … finally结构,并在finally中清除。
详细链接:https://www.liaoxuefeng.com/wiki/1252599548343744/1306581251653666