线程池详解与异步任务编排使用案例
1.初始化线程的4种方式
1 )、继承Thread
2 )、实现 Runnable 接口
3 )、实现 Callable 接口+ FutureTask (可以拿到返回结果,可以处理异常)
4 )、线程池区别:1 、2 不能得到返回值。3 可以获取返回值1 、2 、3 都不能控制资源(无法控制线程数【高并发时线程数耗尽资源】)4 可以控制资源,性能稳定,不会一下子所有线程一起运行结论:实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】
2.创建线程池的方式
创建固定线程数的线程池ExecutorService
固定线程数的线程池
Executors . newFixedThreadPool ( 10 ) ;
execute和submit区别
作用:都是提交异步任务的execute:只能提交Runnable任务,没有返回值
submit:可以提交Runnable、Callable,返回值是FutureTask
创建原生线程池ThreadPoolExecutor
new ThreadPoolExecutor ( 5 , 200 , 10 , TimeUnit . SECONDS, new LinkedBlockingDeque < > ( 100000 ) , Executors . defaultThreadFactory ( ) , new ThreadPoolExecutor. AbortPolicy ( ) ) ; 7 个参数:corePoolSize: 核心线程数,不会被回收,接收异步任务时才会创建maximumPoolSize:最大线程数量,控制资源keepAliveime: maximumPoolSize- corePoolSize 无任务存活超过空闲时间则线程被释放TimeUnitunit : 时间单位workQueue: 阻塞队列,任务被执行之前保存在任务队列中,只要有线程空闲,就会从队列取出任务执行threadFactory: 线程的创建工厂【可以自定义】RejectedExecutionHandler handler:队列满后执行的拒绝策略线程池任务执行流程当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行当workQueue已满,且maximumPoolSize> corePoolSize时,新提交任务会创建新线程执行任务当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler 处理(默认策略抛出异常)当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,释放空闲线程当设置allowCoreThreadTimeOut ( true ) 时,该参数默认false ,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
拒绝策略
DiscardOldestPolicy :丢弃最老的任务
AbortPolicy :丢弃当前任务,抛出异常【默认策略】
CallerRunsPolicy :同步执行run方法
DiscardPolicy :丢弃当前任务,不抛出异常
阻塞队列
1.new LinkedBlockingDeque<>();// 默认大小是Integer.Max会导致内存不足,所以要做压力测试给出适当的队列大小
线程池
1.常见的4种默认线程池
注意:回收线程 = maximumPoolSize - corePoolSize可缓冲线程池【CachedThreadPool 】:corePoolSize= 0 , maximumPoolSize= Integer . MAX_VALUE
定长线程池【FixedThreadPool 】:corePoolSize= maximumPoolSize
周期线程池【ScheduledThreadPool 】:指定核心线程数, maximumPoolSize= Integer . MAX_VALUE, 支持定时及周期性任务执行(一段时间之后再执行)
单任务线程池【SingleThreadPool 】:corePoolSize= maximumPoolSize= 1 ,从队列中获取任务(一个核心线程)Executors . newCachedThreadPool ( ) ;
Executors . newFixedThreadPool ( 10 ) ;
Executors . newScheduledThreadPool ( 10 ) ;
Executors . newSingleThreadExecutor ( ) ;
2.为什么使用线程池?
1.降低资源的消耗【减少创建销毁操作】通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗高并发状态下过多创建线程可能将资源耗尽
2.提高响应速度【控制线程个数】因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行(线程个数过多导致CPU调度慢)
3、提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【例如发送短信】,显存告警时关闭非核心线程池释放内存资源】线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
异步编排CompletableFuture
1.runXXX都是没有返回结果的,supplyXXX可以获取返回结果
2.可以传入自定义线程池,否则使用默认线程池
1.业务场景
4、5、6依赖1,得先知道sku是哪个spu下的
2.测试异步操作
supplyAsync
CompletableFuture < String > future1 = CompletableFuture . supplyAsync ( ( ) -> "测试使用" , executor) ;
System . out. println ( future1. get ( ) ) ;
thenRunAsync串行化
thenAcceptAsync串行化
thenApplyAsync串行化
CompletableFuture < String > future2 = future1. thenApplyAsync ( s -> s + " 链式调用" , executor) ;
System . out. println ( future2. get ( ) ) ;
whenCompleteAsync
CompletableFuture < String > future3 = future2. whenCompleteAsync ( ( result, exception) -> System . out. println ( "结果是:" + result + "----异常是:" + exception) ) ;
exceptionally
CompletableFuture < Integer > future4 = future3. thenApplyAsync ( ( s -> 1 / 0 ) , executor) ;
CompletableFuture < Integer > future5 = future4. exceptionally ( exception -> {
System . out. println ( "出现异常:" + exception) ;
return 10 ;
} ) ;
System . out. println ( "默认值:" + future5. get ( ) ) ;
handle
CompletableFuture < Integer > future6 = future3. thenApplyAsync ( ( s -> 1 / 0 ) , executor) . handle ( ( result, exception) -> { if ( exception == null ) { return result; } System . out. println ( "handle处理异常:" + exception) ; return 1 ;
} ) ;
System . out. println ( "handle处理返回结果:" + future6. get ( ) ) ;
两任务组合-都要完成
runAfterBothAsync
CompletableFuture < Integer > future01 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( "任务1执行" ) ; return 10 / 2 ;
} , executor) ;
CompletableFuture < String > future02 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( "任务2执行" ) ; return "hello" ;
} , executor) ;
CompletableFuture < Void > future03 = future01. runAfterBothAsync ( future02, ( ) -> { System . out. println ( "任务3执行" ) ;
} , executor) ;
thenAcceptBothAsync
CompletableFuture < Integer > future01 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( "任务1执行" ) ; return 10 / 2 ;
} , executor) ;
CompletableFuture < String > future02 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( "任务2执行" ) ; return "hello" ;
} , executor) ;
CompletableFuture < Void > future03 = future01. thenAcceptBothAsync ( future02, ( result1, result2) -> { System . out. println ( "任务3执行" ) ; System . out. println ( "任务1返回值:" + result1) ; System . out. println ( "任务2返回值:" + result2) ; } , executor) ;
thenCombineAsync
CompletableFuture < Integer > future01 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( "任务1执行" ) ; return 10 / 2 ;
} , executor) ;
CompletableFuture < String > future02 = CompletableFuture . supplyAsync ( ( ) -> { System . out. println ( "任务2执行" ) ; return "hello" ;
} , executor) ;
CompletableFuture < String > future03 = future01. thenCombineAsync ( future02, ( result1, result2) -> { System . out. println ( "任务3执行" ) ; System . out. println ( "任务1返回值:" + result1) ; System . out. println ( "任务2返回值:" + result2) ; return "任务3返回值" ; } , executor) ;
System . out. println ( future03. get ( ) ) ;
两任务组合-任一完成
runAfterEitherAsync
acceptEitherAsync
applyToEitherAsync
多任务组合
allOf
CompletableFuture < Void > allOf = CompletableFuture . allOf ( future01, future02, future03) ;
allOf. get ( ) ;
anyOf
CompletableFuture < Object > anyOf = CompletableFuture . anyOf ( future01, future02, future03) ;
anyOf. get ( ) ;