文章目录
- 阻塞队列
- 自己实现一个阻塞队列(三步)
- 标准库中的阻塞队列
- 使用阻塞队列的优势
- 生产者消费者模型
阻塞队列
队列(Queue)是我们熟悉的一个数据结构,它是“先进先出”的。但是并不是所有的队列都是“先进先出”的,比如:
优先级队列(PriorityQueue):基于自己的比较规则,拿出相应的值。
消息队列(MQ):在队列中引入一个“类型”,出队列的时候会指定某个类型的元素先出。
而我们今天学习的阻塞队列,它是一个“先进先出”的队列,但又有一些其他特点:
- 线程安全的。
- 带有阻塞功能:
如果队列满,继续入队列,入队列操作就会阻塞,直到队列不满,入队列操作才能完成。
如果队列空,继续出队列,出队列操作就会阻塞,直到队列不空,出队列操作才能完成。
自己实现一个阻塞队列(三步)
- 实现基本队列
class MyBlockingQueue{public int[] items = new int[100];public int head = 0;public int tail = 0;public int size = 0;//入队列public void put ( int key) {// 插入操作items[tail] = key;tail++;if (tail >= items.length) {tail = 0;}size++;}//出队列public Integer take() {// 删除操作int ret = items[head];head++;if (head >= items.length) {head = 0;}size--;return ret;}
}
- 保证线程安全
class MyBlockingQueue{public int[] items = new int[100];public int head = 0;public int tail = 0;public int size = 0;//入队列public void put ( int key) {synchronized (this) {// 插入操作items[tail] = key;tail++;if (tail >= items.length) {tail = 0;}size++;}}//出队列public Integer take() {synchronized (this) {// 删除操作int ret = items[head];head++;if (head >= items.length) {head = 0;}size--;return ret;}}
}
- 实现阻塞功能
class MyBlockingQueue{public int[] items = new int[100];public int head = 0;public int tail = 0;public int size = 0;//入队列public void put ( int key) throws InterruptedException {synchronized (this) {//当wait被唤醒之后 size还有可能为满 所以不能一唤醒了就直接去使用 得再次判断条件是否满足// 比如:1.抛出异常 2. 三个线程同时执行 其中两个线程竞争锁 没竞争到锁的线程被唤醒之后size还为空while(size >= items.length){this.wait();}//判断队列是否为满 满了不能插入 就阻塞
// if (size >= items.length){
// this.wait();
// }// 插入操作items[tail] = key;tail++;if (tail >= items.length) {tail = 0;}size++;this.notify();}}//出队列public Integer take() throws InterruptedException {synchronized (this) {//当wait被唤醒之后 size还有可能为空 所以不能一唤醒了就直接去使用 得再次判断条件是否满足while(size <= 0){this.wait();}//判断队列是否为空 空了不能删除 就阻塞
// if (size <= 0){
// this.wait();
// }// 删除操作int ret = items[head];head++;if (head >= items.length) {head = 0;}size--;this.notify();return ret;}}
}
标准库中的阻塞队列
//标准库的阻塞队列public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);//带有阻塞功能的入队列blockingQueue.put(1);blockingQueue.put(2);blockingQueue.take();blockingQueue.put(3);//带有阻塞功能的出队列System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());blockingQueue.put(4);System.out.println(blockingQueue.take());blockingQueue.put(4);}
使用阻塞队列的优势
- 有利于代码解耦合
- 削峰填谷
在服务器A流量激增的情况下,通过阻塞队列能够进行限流。使服务器B、C、D能够不受干扰、平稳运行。
生产者消费者模型
生产者消费者模型:描述的是多线程协同工作的一种方式。借助阻塞队列实现。
//生产者-消费者模型public static void main(String[] args) {MyBlockingQueue myBlockingQueue = new MyBlockingQueue();Thread thread = new Thread(() -> {int n = 1;while (true){try {myBlockingQueue.put(n);} catch (InterruptedException e) {e.printStackTrace();}n++;}});Thread thread1 = new Thread(() -> {while (true){try {System.out.println(myBlockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}});thread.start();thread1.start();}