文章目录
- 3.生产者和消费者模式
- 前言
- 3.1生产者和消费者模式概述
- 3.2生产者和消费者案例
- 3.3 阻塞队列基本使用
- 3.4 阻塞队列实现等待唤醒机制
- 总结
3.生产者和消费者模式
前言
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线
程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必
须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大
于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问
题,所以便有了生产者和消费者模式。
3.1生产者和消费者模式概述
-
概述
- 生产者消费者模式是一个十分经典的 多线程协作的模式,弄懂生产者消费者问题能够让我们对多线程编程的理解更加深刻。
- 所谓生产者消费者问题,实际上主要包含了两类线程:
- 一类是生产者线程用于生产数据。
- 一类是消费者线程用于消费数据。
- 为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库。
- 生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为。
- 消费者只需要从共享数据区去获取数据,并不需要关心生产者的行为。
-
Object 类的等待和唤醒方法
方法名 | 说明 |
---|---|
void wait() | 导致当前线程等待,直到另一个线程调用该对象的 notify()方法或notifyAll()方法 |
void notify() | 唤醒正在等待对象监视器的单个线程 |
void notifyAll() | 唤醒正在等待对象监视器的所有线程 |
3.2生产者和消费者案例
-
案例需求
- 桌子类(Desk): 定义表示包子数量的变量,定义锁对象变量,定义标记桌子上有无包子的变量。
- 生产者(Cooker): 继承 Thread 类,重写 run() 方法,设置线程任务。
- 1.判断是否有包子,决定当前线程是否执行
- 2.如果有包子,就进入等待状态,如果没有包子,继续执行,生产包子
- 3.生产包子之后,更新桌子上包子的状态,唤醒消费者消费包子
- 消费者类(Foodie): 继承 Thread类,重写 run() 方法,设置线程任务。
- 1.判断是否有包子,决定当前线程是否执行
- 2.如果没有包子 ,就进入等待状态,如果有包子,就消费包子
- 3.消费包子后,更新桌子上包子状态,唤醒生产者生产包子
- 测试类(Demo): 里面有 main() 方法,main() 方法中的代码步骤如下
- 1.创建生产者线程和消费者线程对象
- 2.分别开启两个线程
-
Cooker(生产者)类:
public class Cooker extends Thread {private Desk desk;public Cooker(Desk desk) {this.desk=desk;}/*生产者步骤:1.判断桌子上是否有汉堡包如果有就等待,如果没有就生产,2.把汉堡包放在桌子上。3.叫醒等待的消费者开吃*/@Overridepublic void run() {while(true){synchronized (desk.getLock()){if(desk.getCount()==0){break;}else{if(!desk.isFlag()){//生产System.out.println("厨师正在生产汉堡包");desk.setFlag(true);desk.getLock().notifyAll();}else{try {desk.getLock().wait();} catch (InterruptedException e) {e.printStackTrace();}}}}}}
}
- Foodie(消费者)类:
public class Foodie extends Thread{private Desk desk;public Foodie(Desk desk) {this.desk=desk;}//套路://1.while(true)死循环//2.synchronized 锁,锁对象要唯一//3.判断,共享数据是否结束,结束//4.判断,共享数据是否结束,没有结束@Overridepublic void run() {
/*消费者步骤:1.判断桌子上是否有汉堡包2.如果没有就等待3.如果有就开吃4.吃完之后,桌子上的汉堡包就没有了叫醒等待的生产者继续生产汉堡包的总数量减一*/while(true){synchronized (desk.getLock()){if(desk.getCount() == 0){break;}else{if(desk.isFlag()){//有System.out.println("吃货在吃汉堡包");desk.setFlag(false);desk.getLock().notifyAll();desk.setCount(desk.getCount()-1);}else{//没有就等待//使用什么对象当作锁,那么就必须用这个对象去调用等待和唤醒的方法try {desk.getLock().wait();} catch (InterruptedException e) {e.printStackTrace();}}}}}}
}
- Desk(共享数据区)类:
public class Desk {//定义一个标记//true 就表示桌子上有汉堡包的,此时允许吃货执行。//false 就表示桌子上没有汉堡包的,此时允许厨师执行。// public static boolean flag = false;private boolean flag;//汉堡包的总数量
// public static int count = 10;//以后我们在使用这种必须有默认值的变量private int count ;//锁对象
// public static final Object lock = new Object();private final Object lock = new Object();public Desk() {this(false,10);}public Desk(boolean flag, int count) {this.flag = flag;this.count = count;}public boolean isFlag() {return flag;}public void setFlag(boolean flag) {this.flag = flag;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}public Object getLock() {return lock;}@Overridepublic String toString() {return "Desk{" +"flag=" + flag +", count=" + count +", lock=" + lock +'}';}
}
- Demo(测试)类:
public class Demo {public static void main(String[] args) {/*消费者步骤:1.判断桌子上是否有汉堡包2.如果没有就等待3.如果有就开吃4.吃完之后,桌子上的汉堡包就没有了叫醒等待的生产者继续生产汉堡包的总数量减一*//*生产者步骤:1.判断桌子上是否有汉堡包如果有就等待,如果没有就生产,2.把汉堡包放在桌子上。3.叫醒等待的消费者开吃*/Desk desk = new Desk();Foodie f = new Foodie(desk);Cooker c = new Cooker(desk);f.start();c.start();}
}
- 运行结果:
3.3 阻塞队列基本使用
-
阻塞队列继承结构
-
常见 BlockingQueue:
- ArrayBlockingQueue:底层是数组,有界
- LinkedBlockingQueue:底层是链表,无界。但不是真正的无界,最大为 int 的最大值
-
BlockingQueue的核心方法:
- put(anObject): 将参数放入队列,如果放不进去会阻塞。
- take(): 取出第一个数据,取不到会阻塞。
-
代码示例
public class Demo {public static void main(String[] args) throws InterruptedException {//阻塞队列的基本用法//创建阻塞队列的对象,并规定里边的容量为1ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);//存储元素arrayBlockingQueue.put("汉堡包");//取元素System.out.println(arrayBlockingQueue.take());System.out.println(arrayBlockingQueue.take());System.out.println("程序结束了...");}
}
- 运行结果:
注意:
这是因为:只 put() 进去一个数据,而要取出两个数据,第二个 take()取不到数据,导致阻塞。
3.4 阻塞队列实现等待唤醒机制
-
案例需求
- 生产者(Cooker): 继承 Thread 类,重写 run() 方法,设置线程任务
- 1.构造方法中接收一个阻塞队列对象
- 2.在 run() 方法中循环获取阻塞队列中添加包子
- 3.打印添加剂结果
- 消费者(Foodie): 继承 Thread 类,重写 run()方法,设置线程任务
- 1.构造方法中接收一个阻塞队列对象
- 2.在 run 方法中循环获取阻塞队列中的包子
- 3.打印获取结果
- 测试类(Demo): 里面有 main 方法, main 方法中的代码步骤如下
- 1.创建生产者线程 和消费者线程对象,构造方法中传入阻塞队列对象
- 2.分别开启两个线程
- 生产者(Cooker): 继承 Thread 类,重写 run() 方法,设置线程任务
-
Cooker(生产者类):
public class Cooker extends Thread {private ArrayBlockingQueue<String> list;public Cooker(ArrayBlockingQueue<String> list) {this.list = list;}@Overridepublic void run() {while (true) {try {list.put("汉堡包");System.out.println("厨师放了一个汉堡包");} catch (InterruptedException e) {e.printStackTrace();}}}
}
- Foodie(消费者)类:
public class Foodie extends Thread {private ArrayBlockingQueue<String> list;public Foodie(ArrayBlockingQueue<String> list) {this.list = list;}@Overridepublic void run() {while (true) {try {String take = list.take();System.out.println("吃货从队列中获取了"+take);} catch (InterruptedException e) {e.printStackTrace();}}}
}
- Demo(测试)类:
public class Demo {public static void main(String[] args) {//创建一个阻塞队列,容量为1ArrayBlockingQueue<String> list = new ArrayBlockingQueue<>(1);//创建线程并开启Cooker c = new Cooker(list);Foodie f = new Foodie(list);c.start();f.start();}
}
-
运行结果:
注意: -
我们设置的阻塞队列的容量为1,正常情况下应该存一个,取一个,但是为什么会出现同一种情况连续出现两次呢?
- 可能这时我们会考虑到是没有上锁导致两个线程抢CPU执行权的问题
-
有了这个思考,我们可以来查看 put 方法 和 take 方法的源码
- put 方法的源码
- put 方法的源码
-
take 方法的源码
-
从源码中我们可以看见,两个方法底层都已经实现了上锁,但是为什么会出现一种情况连续出现两次呢?
- 这是因为两条输出语句是我们自己写的,并没有在锁里边,所以会出现上边这种情况。
总结
以上是今天的全部内容,详细介绍了生产者和消费者模式,并用具体的例子帮助读者理解这种模式,也介绍了阻塞队列的基本使用,以及阻塞队列实现等待唤醒机制并分析了出现的问题。希望大家多多关注