新来个阿里 P7,仅花 2 小时,做出一个多线程永动任务,看完直接跪了

news/2024/5/5 17:56:05/文章来源:https://blog.csdn.net/Candyz7/article/details/127065251

今天教大家做一个 Java 的多线程永动任务,这个示例的原型是公司自研的多线程异步任务项目,我把里面涉及到多线程的代码抽离出来,然后进行一定的改造。

里面涉及的知识点非常多,特别适合有一定工作经验的同学学习,或者可以直接拿到项目中使用。

文章结构非常简单:

1. 功能说明

做这个多线程异步任务,主要是因为我们有很多永动的异步任务,什么是永动呢?就是任务跑起来后,需要一直跑下去。

比如消息 Push 任务,因为一直有消息过来,所以需要一直去消费 DB 中的未推送消息,就需要整一个 Push 的永动异步任务。

我们的需求其实不难,简单总结一下:

  • 能同时执行多个永动的异步任务;
  • 每个异步任务,支持开多个线程去消费这个任务的数据;
  • 支持永动异步任务的优雅关闭,即关闭后,需要把所有的数据消费完毕后,再关闭。

完成上面的需求,需要注意几个点:

  • 每个永动任务,可以开一个线程去执行;
  • 每个子任务,因为需要支持并发,需要用线程池控制;
  • 永动任务的关闭,需要通知子任务的并发线程,并支持永动任务和并发子任务的优雅关闭。

2. 多线程任务示例

2.1 线程池

对于子任务,需要支持并发,如果每个并发都开一个线程,用完就关闭,对资源消耗太大,所以引入线程池:

public class TaskProcessUtil {private static Map<String, ExecutorService> executors = new ConcurrentHashMap<>();private static ExecutorService init(String poolName, int poolSize) {return new ThreadPoolExecutor(poolSize, poolSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("Pool-" + poolName).setDaemon(false).build(),new ThreadPoolExecutor.CallerRunsPolicy());}public static ExecutorService getOrInitExecutors(String poolName,int poolSize) {ExecutorService executorService = executors.get(poolName);if (null == executorService) {synchronized (TaskProcessUtil.class) {executorService = executors.get(poolName);if (null == executorService) {executorService = init(poolName, poolSize);executors.put(poolName, executorService);}}}return executorService;}public static void releaseExecutors(String poolName) {ExecutorService executorService = executors.remove(poolName);if (executorService != null) {executorService.shutdown();}}
}

这是一个线程池的工具类,这里初始化线程池和回收线程资源很简单,我们主要讨论获取线程池。

获取线程池可能会存在并发情况,所以需要加一个 synchronized 锁,然后锁住后,需要对 executorService 进行二次判空校验。

2.2 单个任务

为了更好讲解单个任务的实现方式,我们的任务主要就是把 Cat 的数据打印出来,Cat 定义如下:

@Data
@Service
public class Cat {private String catName;public Cat setCatName(String name) {this.catName = name;return this;}
}

单个任务主要包括以下功能:

获取永动任务数据:这里一般都是扫描 DB,我直接就简单用 queryData() 代替。

多线程执行任务:需要把数据拆分成 4 份,然后分别由多线程并发执行,这里可以通过线程池支持;

永动任务优雅停机:当外面通知任务需要停机,需要执行完剩余任务数据,并回收线程资源,退出任务;

永动执行:如果未收到停机命令,任务需要一直执行下去。

直接看代码:

public class ChildTask {private final int POOL_SIZE = 3; private final int SPLIT_SIZE = 4; private String taskName;protected volatile boolean terminal = false;public ChildTask(String taskName) {this.taskName = taskName;}public void doExecute() {int i = 0;while(true) {System.out.println(taskName + ":Cycle-" + i + "-Begin");List<Cat> datas = queryData();taskExecute(datas);System.out.println(taskName + ":Cycle-" + i + "-End");if (terminal) {break;}i++;}TaskProcessUtil.releaseExecutors(taskName);}public void terminal() {terminal = true;System.out.println(taskName + " shut down");}private void doProcessData(List<Cat> datas, CountDownLatch latch) {try {for (Cat cat : datas) {System.out.println(taskName + ":" + cat.toString() + ",ThreadName:" + Thread.currentThread().getName());Thread.sleep(1000L);}} catch (Exception e) {System.out.println(e.getStackTrace());} finally {if (latch != null) {latch.countDown();}}}private void taskExecute(List<Cat> sourceDatas) {if (CollectionUtils.isEmpty(sourceDatas)) {return;}List<List<Cat>> splitDatas = Lists.partition(sourceDatas, SPLIT_SIZE);final CountDownLatch latch = new CountDownLatch(splitDatas.size());for (final List<Cat> datas : splitDatas) {ExecutorService executorService = TaskProcessUtil.getOrInitExecutors(taskName, POOL_SIZE);executorService.submit(new Runnable() {@Overridepublic void run() {doProcessData(datas, latch);}});}try {latch.await();} catch (Exception e) {System.out.println(e.getStackTrace());}}private List<Cat> queryData() {List<Cat> datas = new ArrayList<>();for (int i = 0; i < 5; i ++) {datas.add(new Cat().setCatName("罗小黑" + i));}return datas;}
}

简单解释一下:

  • queryData:用于获取数据,实际应用中其实是需要把 queryData 定为抽象方法,然后由各个任务实现自己的方法。
  • doProcessData:数据处理逻辑,实际应用中其实是需要把 doProcessData 定为抽象方法,然后由各个任务实现自己的方法。
  • taskExecute:将数据拆分成 4 份,获取该任务的线程池,并交给线程池并发执行,然后通过 latch.await() 阻塞。当这 4 份数据都执行成功后,阻塞结束,该方法才返回。
  • terminal:仅用于接受停机命令,这里该变量定义为 volatile,所以多线程内存可见;

doExecute:程序执行入口,封装了每个任务执行的流程,当 terminal=true 时,先执行完任务数据,然后回收线程池,最后退出。

2.3 任务入口

直接上代码:

public class LoopTask {private List<ChildTask> childTasks;public void initLoopTask() {childTasks = new ArrayList();childTasks.add(new ChildTask("childTask1"));childTasks.add(new ChildTask("childTask2"));for (final ChildTask childTask : childTasks) {new Thread(new Runnable() {@Overridepublic void run() {childTask.doExecute();}}).start();}}public void shutdownLoopTask() {if (!CollectionUtils.isEmpty(childTasks)) {for (ChildTask childTask : childTasks) {childTask.terminal();}}}public static void main(String args[]) throws Exception{LoopTask loopTask = new LoopTask();loopTask.initLoopTask();Thread.sleep(5000L);loopTask.shutdownLoopTask();}
}

每个任务都开一个单独的 Thread,这里我初始化了 2 个永动任务,分别为 childTask1 和 childTask2,然后分别执行,后面 Sleep 了 5 秒后,再关闭任务,我们可以看看是否可以按照我们的预期优雅退出。

2.4 结果分析

执行结果如下:

childTask1:Cycle-0-Begin
childTask2:Cycle-0-Begin
childTask1:Cat(catName=罗小黑0),ThreadName:Pool-childTask1
childTask1:Cat(catName=罗小黑4),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑4),ThreadName:Pool-childTask2
childTask2:Cat(catName=罗小黑0),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑1),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑1),ThreadName:Pool-childTask2
childTask2:Cat(catName=罗小黑2),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑2),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑3),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑3),ThreadName:Pool-childTask1
childTask2:Cycle-0-End
childTask2:Cycle-1-Begin
childTask1:Cycle-0-End
childTask1:Cycle-1-Begin
childTask2:Cat(catName=罗小黑0),ThreadName:Pool-childTask2
childTask2:Cat(catName=罗小黑4),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑4),ThreadName:Pool-childTask1
childTask1:Cat(catName=罗小黑0),ThreadName:Pool-childTask1
childTask1 shut down
childTask2 shut down
childTask2:Cat(catName=罗小黑1),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑1),ThreadName:Pool-childTask1
childTask1:Cat(catName=罗小黑2),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑2),ThreadName:Pool-childTask2
childTask1:Cat(catName=罗小黑3),ThreadName:Pool-childTask1
childTask2:Cat(catName=罗小黑3),ThreadName:Pool-childTask2
childTask1:Cycle-1-End
childTask2:Cycle-1-End

输出数据:

  • “Pool-childTask” 是线程池名称;
  • “childTask” 是任务名称;
  • “Cat(catName=罗小黑)” 是执行的结果;
  • “childTask shut down” 是关闭标记;
  • “childTask:Cycle-X-Begin” 和“childTask:Cycle-X-End” 是每一轮循环的开始和结束标记。

我们分析一下执行结果:

  • childTask1 和 childTask2 分别执行,在第一轮循环中都正常输出了 5 条罗小黑数据;
  • 第二轮执行过程中,我启动了关闭指令,这次第二轮执行没有直接停止,而是先执行完任务中的数据,再执行退出,所以完全符合我们的优雅退出结论。

2.5 源码地址

GitHub 地址:

https://github.com/lml200701158/java-study/tree/master/src/main/java/com/java/parallel/pool/ofc

3. 写在最后

对于这个经典的线程池使用示例,原项目是我好友一灰写的,技术水平对标阿里 P7,实现得也非常优雅,涉及的知识点非常多,非常值得大家学习。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_203674.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ Reference: Standard C++ Library reference: C Library: cmath: logb

C官网参考链接&#xff1a;https://cplusplus.com/reference/cmath/logb/ 函数 <cmath> <ctgmath> logb C99 double logb(double x); float logbf(float x); long double logbl(long double x); C11 double logb(double x); float logb(float x); lo…

C++ Reference: Standard C++ Library reference: C Library: cmath: atanh

C官网参考链接&#xff1a;https://cplusplus.com/reference/cmath/atanh/ 函数 <cmath> <ctgmath> atanh C99 double atanh (double x); float atanhf (float x); long double atanhl (long double x); C11 double atanh (double x); float atanh (floa…

迅为IMX8MM开发板视频硬解码H264解码

我们解码并播放 H264 视频文件&#xff0c;输入以下命令 gst-launch-1.0 filesrc location/gstreamer/video/1080p_60fps_h264.mp4 typefindtrue ! \ video/quicktime ! aiurdemux ! queue max-size-time0 ! vpudec ! imxvideoconvert_g2d ! \ video/x-raw, formatRGB16, wi…

ffmpeg播放器(一) 视频解码与播放

1、环境搭建 首先需要导入所需要的包include、armeabi-v7a。 然后跟项目建立连接&#xff0c;在CMakeList.txt&#xff0c;并做了相关的解释&#xff1a; cmake_minimum_required(VERSION 3.4.1)file(GLOB source_file src/main/cpp/*.cpp) //cpp文件下所有的包 # Declares a…

打印字符串、排序、引用调用

设计一个函数print打印字符串&#xff0c;如果只传string型参数s&#xff0c;则字符串长度跟10比较&#xff0c;大于10&#xff0c;打印前10个字符&#xff0c;小于10&#xff0c;全部输出s&#xff1b;如果传string型参数s和int型n&#xff0c;则字符串长度跟n比较&#xff0c…

教师在初中数学课堂该如何有效提问(内有示例)

数学是中学数学的一个重要组成部分&#xff0c;它在培养学生的思维和创造力方面具有很大的作用。 在我国当前的教育体制改革与发展中&#xff0c;中学数学教学应按照新课标的要求&#xff0c;改变教学方法&#xff0c;提高学生的数学综合素质。但是&#xff0c;在实际的教学过…

高性能数据访问中间件 OBProxy(五):一文讲透数据路由

上篇文章我们介绍了 OBProxy 的连接管理&#xff0c;通过连接管理功能&#xff0c;OBProxy 和 OBServer 联系起来&#xff0c;同时 OBProxy 屏蔽了连接的复杂性&#xff0c;让用户使用起来和单机数据库一样简单。完成接入后&#xff0c;接下来的一个重要功能就是数据路由&#…

[luogu3980]志愿者招募

记$x_{i}$为第$i$类志愿者数量$,y_{j}=\sum_{j\in [s_{i},t_{i}]}x_{i}-a_{j}$​,则问题即$$\forall i\in [1,m],x_{i}\ge 0\\\forall j\in [1,n],y_{j}\ge 0\\y_{1}-\sum_{s_{i}=1}x_{i}=-a_{1}\\\sum_{t_{i}=n}x_{i}-y_{n}=a_{n}\\\forall j\in [2,n],y_{j}+\sum_{t_{i}=j-1…

redis主从+哨兵+集群模式搭建详解

一、redis主从安装 1. 下载redis Download | Redis 我这里选择的是redis-6.2.7版本 这里三台机器&#xff0c;都需要安装redis node1 192.168.157.128 node2 192.168.157.129 node3 192.168.157.130 2. 安装redis # 解压redis tar -zxvf redis-6.2.7.tar.gz # 编译安装…

数据分析 面经(已拿到offer)

北航计算机专业&#xff08;计院太卷&#xff0c;现考虑转向信息安全方向&#xff09;本科二年级&#xff0c;闲来无事找份日常实习试试水 考虑数分岗也是因为楼主目前大二&#xff0c;专业课学习不够深入&#xff0c;开发技术尚不成熟&#xff0c;而sql、excel和数据可视化比…

四元数是什么

1、四元数的构成 四元数是简单的超复数&#xff0c;由实数加上三个虚数单位组成&#xff0c;主要用于在三维空间中表示旋转 四元数原理包含大量数学相关知识&#xff0c;较为复杂&#xff0c;比如&#xff1a;复数、四维空间等等 因此此文章只对其基本构成和基本公式进行学习…

多视图属性网络异常检测系列一

论文《Deep Anomaly Detection on Attributed Networks》近期会对多视图属性网络异常检测系列进行学习记录 这篇虽然不是多视图的,但可以说是属性网络上异常检测的典型,已是近年属性网络异常检测必参考的一篇文献。背景 由于属性网络中附加的节点属性补充了知识发现中的原始网…

.Net Redis的秒杀Dome和异步执行

1.先到官网下载Redis部署好 Redis 教程 | 菜鸟教程 2.创建一个上游业务项目&#xff08;这里用控制台项目了&#xff0c;Framwork4.7.2&#xff09; NuGet包下载SerivceStack.Redis 创建一个RedisMessgaeQueue(Redis连接帮助类) using ServiceStack.Redis; using System;name…

PCIe系列专题之三:3.0 数据链路层概述

一、故事前传 之前我们讲了对PCIe的一些基础概念作了一个宏观的介绍&#xff0c;了解了PCIe是一种封装分层协议&#xff08;packet-based layered protocol),主要包括事务层&#xff08;Transaction layer), 数据链路层&#xff08;Data link layer)和物理层&#xff08;Physi…

MySQL 常用数据类型说明

目录 MySQL中常用的数据类型 整型 整型声明 整型属性 整型的选择 浮点型 定点数类型 浮点数和定点数的区别 时间日期类型 DATE类型 TIME类型 DATETIME类型 YEAR类型 文本字符串 CHAR与VARCHAR类型 TEXT类型 ​编辑 枚举类型(ENUM) MySQL中常用的数据类型 数据类…

直播平台怎么搭建,实现js开光灯效果

直播平台怎么搭建,实现js开光灯效果<!DOCTYPE html><html><head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>点击切换灯亮</t…

Spring容器与依赖注入(DI)

1 Spring框架简介 1.1 什么是Spring Spring框架是一个开源的轻量级的DI和AOP容器框架&#xff0c;致力于简化企业级应用开发&#xff0c;让开发者使用简单的Java Bean来实现从前只有EJB才能实现的功能。 1.2 为什么要使用Spring Spring堪称Java世界中最强大的框架&#xff0c;…

单调栈题目:柱状图中最大的矩形

文章目录题目标题和出处难度题目描述要求示例数据范围解法思路和算法代码复杂度分析题目 标题和出处 标题&#xff1a;柱状图中最大的矩形 出处&#xff1a;84. 柱状图中最大的矩形 难度 7 级 题目描述 要求 给定整数数组 heights\texttt{heights}heights 表示柱状图中…

正弦信号发生器的设计

目 录 1 引言 1 2 总体结构设计 2 2.1 单片机概述 2 2.1.1 单片机的发展 2 2.1.2 单片机的用途 3 2.2 系统设计的功能 3 2.3 波形发生和输出频率的方法 4 2.3.1 波形发生的方法 4 2.3.2 输出频率的方法 4 3 系统硬件设计 5 3.1 硬件电路芯片的选择 5 3.1.1 CPU芯片 AT89C51 5 3…

MyBatis中的复杂映射

上一章中实现的MyBatis对象映射较为简单&#xff0c;对象中的属性和数据库中的表字段是一一对应的&#xff08;无论数量和名称都完全一样&#xff09;&#xff0c;如果对象中的属性名和表中的字段名不一致怎么办&#xff1f;又或者Java对象中存在复杂类型属性&#xff08;即类似…