多线程系列(十二) -生产者和消费者模型

news/2024/7/27 12:08:54/文章来源:https://blog.csdn.net/dxflqm_pz/article/details/136443791

一、简介

在 Java 多线程编程中,还有一个非常重要的设计模式,它就是:生产者和消费者模型。

这种模型可以充分发挥 cpu 的多线程特性,通过一些平衡手段能有效的提升系统整体处理数据的速度,减轻系统负载,提高程序的效率和稳定性,同时实现模块之间的解耦。

那什么是生产者和消费者模型呢?

简单的说,生产者和消费者之间不直接进行交互,而是通过一个缓冲区来进行交互,生产者负责生成数据,然后存入缓冲区;消费者则负责处理数据,从缓冲区获取。

大致流程图如下:

对于最简单的生产者和消费者模型,总结下来,大概有以下几个特点:

  • 缓冲区为空的时候,消费者不能消费,会进入休眠状态,直到有新数据进入缓冲区,再次被唤醒
  • 缓冲区填满的时候,生产者不能生产,也会进入休眠状态,直到缓冲区有空间,再次被唤醒

生产者和消费者模型作为一个非常重要的设计模型,它的优点在于:

  • 解耦:生产者和消费者之间不直接进行交互,即使生产者和消费者的代码发生变化,也不会对对方产生影响
  • 消峰:例如在某项工作中,假如 A 操作生产数据的速度很快,B 操作处理速度很慢,那么 A 操作就必须等待 B 操作完成才能结束,反之亦然。如果将 A 操作和B 操作进行解耦,中间插入一个缓冲区,这样 A 操作将生产的数据存入缓冲区,就接受了;B 操作从缓冲区获取数据并进行处理,平衡好 A 操作和 B 操作之间的缓冲区,可以显著提升系统的数据处理能力

生产者和消费者模型的应用场景非常多,例如 Java 的线程池任务执行框架、消息中间件 rabbitMQ 等,因此掌握生产者和消费者模型,对于开发者至关重要。

下面我们通过几个案例,一起来了解一下生产者和消费者设计模型的实践思路。

二、代码实践

2.1、利用 wait / notify 方法实现思路

生产者和消费者模型,最简单的一种技术实践方案就是基于线程的 wait() / notify() 方法,也就是通知和唤醒机制,可以将两个操作实现解耦,具体代码实践如下。

/*** 缓冲区容器类*/
public class Container {/*** 缓冲区最大容量*/private int capacity = 3;/*** 缓冲区*/private LinkedList<Integer> list = new LinkedList<Integer>();/*** 添加数据到缓冲区* @param value*/public synchronized void add(Integer value) {if(list.size() >= capacity){System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");try {// 进入等待状态wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);list.add(value);//唤醒其他所有处于wait()的线程,包括消费者和生产者notifyAll();}/*** 从缓冲区获取数据*/public synchronized void get() {if(list.size() == 0){System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");try {// 进入等待状态wait();} catch (InterruptedException e) {e.printStackTrace();}}// 从头部获取数据,并移除元素Integer val = list.removeFirst();System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);//唤醒其他所有处于wait()的线程,包括消费者和生产者notifyAll();}
}
/*** 生产者类*/
public class Producer extends Thread{private Container container;public Producer(Container container) {this.container = container;}@Overridepublic void run() {for (int i = 0; i < 6; i++) {container.add(i);}}
}
/*** 消费者类*/
public class Consumer extends Thread{private Container container;public Consumer(Container container) {this.container = container;}@Overridepublic void run() {for (int i = 0; i < 6; i++) {container.get();}}
}
/*** 测试类*/
public class MyThreadTest {public static void main(String[] args) {Container container = new Container();Producer producer = new Producer(container);Consumer consumer = new Consumer(container);producer.start();consumer.start();}
}

运行结果如下:

生产者:Thread-0,add:0
生产者:Thread-0,add:1
生产者:Thread-0,add:2
生产者:Thread-0,缓冲区已满,生产者进入waiting...
消费者:Thread-1,value:0
消费者:Thread-1,value:1
消费者:Thread-1,value:2
消费者:Thread-1,缓冲区为空,消费者进入waiting...
生产者:Thread-0,add:3
生产者:Thread-0,add:4
生产者:Thread-0,add:5
消费者:Thread-1,value:3
消费者:Thread-1,value:4
消费者:Thread-1,value:5

从日志上可以很清晰的看到,生产者线程生产一批数据之后,当缓冲区已经满了,会进入等待状态,此时会通知消费者线程;消费者线程处理完数据之后,当缓冲区没有数据时,也会进入等待状态,再次通知生产者线程。

2.2、利用 await / signal 方法实现思路

除此之外,我们还可以利用ReentrantLockCondition类中的 await() / signal() 方法实现生产者和消费者模型。

缓冲区容器类,具体代码实践如下。

/*** 缓冲区容器类*/
public class Container {private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();private int capacity = 3;private LinkedList<Integer> list = new LinkedList<Integer>();/*** 添加数据到缓冲区* @param value*/public void add(Integer value) {boolean flag = false;try {flag = lock.tryLock(3, TimeUnit.SECONDS);if(list.size() >= capacity){System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");// 进入等待状态condition.await();}System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);list.add(value);//唤醒其他所有处于wait()的线程,包括消费者和生产者condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {if(flag){lock.unlock();}}}/*** 从缓冲区获取数据*/public void get() {boolean flag = false;try {flag = lock.tryLock(3, TimeUnit.SECONDS);if(list.size() == 0){System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");// 进入等待状态condition.await();}// 从头部获取数据,并移除元素Integer val = list.removeFirst();System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);//唤醒其他所有处于wait()的线程,包括消费者和生产者condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {if(flag){lock.unlock();}}}
}

生产者、消费者、测试类代码,跟上文一致,运行结果和上文介绍的也是一样。

2.3、多生产者和消费者的实现思路

上面介绍的都是一个生产者线程和一个消费者线程,模型比较简单。实际上,在业务开发中,经常会出现多个生产者线程和多个消费者线程,按照以上的实现思路,会出现什么情况呢?

有可能会出现程序假死现象!下面我们来分析一下案例,假如有两个生产者线程 a1、a2,两个消费者线程 b1、b2,执行过程如下:

  • 1.生产者线程 a1 执行生产数据的操作,发现缓冲区数据已经填满了,然后进入等待阶段,同时向外发起通知,唤醒其它线程
  • 2.因为线程唤醒具有随机性,本应该唤醒消费者线程 b1,结果可能生产者线程 a2 被唤醒,检查缓冲区数据已经填满了,又进入等待阶段,紧接向外发起通知,消费者线程得不到被执行的机会
  • 3.消费者线程 b1、b2,也有可能会出现这个现象,本应该唤醒生产者线程,结果唤醒了消费者线程

遇到这种情况,应该如何解决呢?

因为ReentrantLockCondition的结合,编程具有高度灵活性,我们可以采用这种组合解决多生产者和多消费者中的假死问题。

具体实现逻辑如下:

/*** 缓冲区容器类*/
public class ContainerDemo {private Lock lock = new ReentrantLock();private Condition producerCondition = lock.newCondition();private Condition consumerCondition = lock.newCondition();private int capacity = 3;private LinkedList<Integer> list = new LinkedList<Integer>();/*** 添加数据到缓冲区* @param value*/public void add(Integer value) {boolean flag = false;try {flag = lock.tryLock(3, TimeUnit.SECONDS);if(list.size() >= capacity){System.out.println("生产者:"+ Thread.currentThread().getName()+",缓冲区已满,生产者进入waiting...");// 生产者进入等待状态producerCondition.await();}System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);list.add(value);// 唤醒所有消费者处于wait()的线程consumerCondition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {if(flag){lock.unlock();}}}/*** 从缓冲区获取数据*/public void get() {boolean flag = false;try {flag = lock.tryLock(3, TimeUnit.SECONDS);if(list.size() == 0){System.out.println("消费者:"+ Thread.currentThread().getName()+",缓冲区为空,消费者进入waiting...");// 消费者进入等待状态consumerCondition.await();}// 从头部获取数据,并移除元素Integer val = list.removeFirst();System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + val);// 唤醒所有生产者处于wait()的线程producerCondition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {if(flag){lock.unlock();}}}
}
/*** 生产者*/
public class Producer extends Thread{private ContainerDemo container;private Integer value;public Producer(ContainerDemo container, Integer value) {this.container = container;this.value = value;}@Overridepublic void run() {container.add(value);}
}
/*** 消费者*/
public class Consumer extends Thread{private ContainerDemo container;public Consumer(ContainerDemo container) {this.container = container;}@Overridepublic void run() {container.get();}
}
/*** 测试类*/
public class MyThreadTest {public static void main(String[] args) {ContainerDemo container = new ContainerDemo();List<Thread> threadList = new ArrayList<>();// 初始化6个生产者线程for (int i = 0; i < 6; i++) {threadList.add(new Producer(container, i));}// 初始化6个消费者线程for (int i = 0; i < 6; i++) {threadList.add(new Consumer(container));}// 启动线程for (Thread thread : threadList) {thread.start();}}
}

运行结果如下:

生产者:Thread-0,add:0
生产者:Thread-1,add:1
生产者:Thread-2,add:2
生产者:Thread-3,缓冲区已满,生产者进入waiting...
生产者:Thread-4,缓冲区已满,生产者进入waiting...
生产者:Thread-5,缓冲区已满,生产者进入waiting...
消费者:Thread-6,value:0
消费者:Thread-7,value:1
生产者:Thread-3,add:3
生产者:Thread-4,add:4
生产者:Thread-5,add:5
消费者:Thread-8,value:2
消费者:Thread-9,value:3
消费者:Thread-10,value:4
消费者:Thread-11,value:5

通过ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用对应的signalAll()方法就可以解决假死现象。

三、小结

最后我们来总结一下,对于生产者和消费者模型,通过合理的编程实现,可以充分充分发挥 cpu 多线程的特性,显著的提升系统处理数据的效率。

对于生产者和消费者模型中的假死现象,可以使用ReentrantLock定义两个Condition,进行交叉唤醒,以解决假死问题。

四、参考

1、https://www.cnblogs.com/xrq730/p/4855663.html

五、写到最后

最近无意间获得一份阿里大佬写的技术笔记,内容涵盖 Spring、Spring Boot/Cloud、Dubbo、JVM、集合、多线程、JPA、MyBatis、MySQL 等技术知识。需要的小伙伴可以点击如下链接获取,资源地址:技术资料笔记。

不会有人刷到这里还想白嫖吧?点赞对我真的非常重要!在线求赞。加个关注我会非常感激!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_998409.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

http【详解】状态码,方法,接口设计 —— RestfuI API,头部 —— headers,缓存

http 状态码 1xx 服务器收到请求 2xx 请求成功 200 成功 3xx 重定向&#xff08;目标服务器返回另一个服务器的地址&#xff0c;浏览器会自动去访问另一个服务器&#xff09; 常见应用场景&#xff1a;搜索引擎&#xff0c;短网址 301 永久重定向 &#xff08;常用于已停服的…

Leetcoder Day42| 动态规划part09 打家劫舍问题

198.打家劫舍 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 给定一个代表每个房…

Mybatis-Plus——06,CRUD查

CRUD查 一、普通查询1.1、通过id查询单个用户1.2、通过id查询多个用户1.3、条件查询 通过map封装 二、分页查询2.1、配置分页插件2.2、运行方法 三、通过wrapper条件构造器查询3.1、查询name不为空&#xff0c;email不为空&#xff0c;age大于18的用户3.2、查询nameJone的用户3…

利用Python副业赚钱,看完这篇你就懂了!_自学python能干些什么副业?

Python都可以做哪些副业&#xff1f; 1、兼职处理数据Excel整理数据功能虽然很强大&#xff0c;但在Python面前&#xff0c;曾经统治职场的它也的败下阵来。因为Python在搜集数据整理分析数据的过程中更加便捷&#xff0c;通过几行代码还可以实现自动化操作。 如果你学会Pyth…

大数据技术学习笔记(五)—— MapReduce(1)

目录 1 MapReduce 概述1.1 MapReduce 定义1.2 MapReduce 优缺点1.3 MapReduce 核心思想1.4 MapReduce 进程1.5 Hadoop 序列化类型1.6 MapReduce 编程规范1.7 WordCount 案例实操1.7.1 案例需求1.7.2 环境准备1.7.3 编写程序1.7.4 测试 2 MapReduce 序列化2.1 序列化概述2.2 自定…

Java数组常用操作

创建数组 int[] a {1,2,3};int[] a new int[]{1,2,3};int[] a new int[3];ArrayList<Integer> arr new ArrayList<>(); 添加元素 arr.add(99); //将99加入到数组末尾arr.add(3,99); //将99加入到指定索引3处访问元素 int c1 c[1]; int arr1 arr.get(1); …

kafka报文模拟工具的使用

日常项目中经常会碰到消费kafka某个topic的数据&#xff0c;如果知道报文格式&#xff0c;即可使用工具去模拟发送报文&#xff0c;以此测试代码中是否能正常消费到这个数据。 工具资源已上传&#xff0c;可直接访问连接下载&#xff1a;https://download.csdn.net/download/w…

【AI+应用】模仿爆款视频二次创作短视频操作步骤

本来不想水这篇的&#xff0c; 剪辑软件估计很多人用的比我还6。 今天自己遇到1个需求&#xff0c;我看到一篇公众号文章的视频觉得有意思&#xff0c;但视频有点长&#xff0c;我没带耳机看视频的习惯&#xff0c;就想着能不能下载下来&#xff0c; 提取视频的音频转为文字&am…

听 GPT 讲 client-go 源代码 (21)

分享更多精彩内容&#xff0c;欢迎关注&#xff01; File: client-go/applyconfigurations/storage/v1beta1/volumeattachmentstatus.go 在client-go中&#xff0c;client-go/applyconfigurations/storage/v1beta1/volumeattachmentstatus.go文件是用于处理VolumeAttachmentSta…

即插即用篇 | YOLOv8 引入 ParNetAttention 注意力机制 | 《NON-DEEP NETWORKS》

论文名称:《NON-DEEP NETWORKS》 论文地址:https://arxiv.org/pdf/2110.07641.pdf 代码地址:https://github.com/imankgoyal/NonDeepNetworks 文章目录 1 原理2 源代码3 添加方式4 模型 yaml 文件template-backbone.yamltemplate-small.yamltemplate-large.yaml

Redis中的SCAN渐进式扫描底层原理

Scan渐进式扫描原理 概述 由于Redis是单线程再处理用户的命令&#xff0c;而Keys命令会一次性遍历所有key&#xff0c;于是在命令执行过程中&#xff0c;无法执行其他命令。这就导致如果Redis中的key比较多&#xff0c;那么Keys命令执行时间就会比较长&#xff0c;从而阻塞Re…

MySQL面试题-锁(答案版)

锁 1、MySQL 有哪些锁&#xff1f; &#xff08;1&#xff09;全局锁 加了全局锁之后&#xff0c;整个数据库就处于只读状态了&#xff0c;这时其他线程执行以下操作&#xff0c;都会被阻塞&#xff1a; 对数据的增删改操作&#xff0c;比如 insert、delete、update等语句&…

JavaScript极速入门(2)

JQuery W3C标准给我们提供了一系列函数,让我们可以操作: 网页内容 网页结构 网页样式 但是原生的JavaScript提供的API操作DOM元素时,代码比较繁琐,冗长.我们学习使用JQuery来操作页面对象. JQuery是一个快速,简洁且功能丰富的JavaScript框架,于2006年发布.它封装JavaScript常…

用开发CesiumJS模拟飞机飞行应用(一,基本功能)

本部分向您展示如何构建您的第一个 Cesium 应用程序&#xff0c;以可视化模拟从旧金山到哥本哈根的真实航班&#xff0c;并使用 FlightRadar24收集的雷达数据。您将学习如何&#xff1a; 在网络上设置并部署您的 Cesium 应用程序。 添加全球 3D 建筑物、地形和图像的基础图层。…

微服务架构 | 多级缓存

INDEX 通用设计概述2 优势3 最佳实践 通用设计概述 通用设计思路如下图 内容分发网络&#xff08;CDN&#xff09; 可以理解为一些服务器的副本&#xff0c;这些副本服务器可以广泛的部署在服务器提供服务的区域内&#xff0c;并存有服务器中的一些数据。 用户访问原始服务器…

【2024.03.05】定时执行专家 V7.1 发布 - TimingExecutor V7.1 Release

目录 ▉ 软件介绍 ▉ 新版本 V7.1 下载地址 ▉ V7.1 新功能 ▼2024-03-03 V7.1 - 更新日志 ▉ V7.0 新UI设计 ▉ 软件介绍 《定时执行专家》是一款制作精良、功能强大、毫秒精度、专业级的定时任务执行软件。软件具有 25 种【任务类型】、12 种【触发器】触发方式&#x…

【Python】成功解决TypeError: ‘float‘ object is not iterable

【Python】成功解决TypeError: ‘float’ object is not iterable &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得…

2024 GoLand激活,分享几个GoLand激活的方案

文章目录 GoLand公司简介我这边使用GoLand的理由GoLand 最新变化GoLand 2023.3 最新变化AI Assistant 正式版GoLand 中的 AI Assistant&#xff1a;_Rename_&#xff08;重命名&#xff09;GoLand 中的 AI Assistant&#xff1a;_Write documentation_&#xff08;编写文档&…

Kubernetes(k8s第四部分之servers)

1&#xff0c;为什么不使用round-robin DNS&#xff1f; 因为DNS有缓存&#xff0c;不会清理&#xff0c;无法负载均衡 ipvs代理模式&#xff0c;这种模式&#xff0c;kube-proxy会监视Kubernetes Service 对象和Endpoints&#xff0c;调用netlink接口以相应地创建ipvs规则并…

HTML使用

文章目录 一、简介二、HTML快速入门三、基础标签四、图片、音频、视频标签五、超链接标签六、列表标签七、表格标签八、布局标签九、表单标签十、表单向标签 一、简介 二、HTML快速入门 ​ <html><head><title>你好</title></head><body>再…