- 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
- 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙、Spring从成神到升仙系列
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
- 📝联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀
文章目录
- 从源码全面解析 LinkedBlockingQueue的来龙去脉
- 一、引言
- 二、使用
- 三、源码
- 1、初始化
- 2、生产者的源码
- 2.1 add()源码实现
- 2.2 offer()源码实现
- 2.3 offer(time)源码实现
- 2.4 put()源码实现
- 3、消费者的源码
- 3.1 remove()源码实现
- 3.2 poll()源码实现
- 3.3 poll(time)源码实现
- 3.4 take()源码实现
- 4、疑惑
- 四、流程图
- 五、总结
从源码全面解析 LinkedBlockingQueue的来龙去脉
一、引言
并发编程在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在并发编程的使用和原理方面对小伙伴们进行 360° 的刁难。
作为一个在互联网公司面一次拿一次 Offer
的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。
于是在一个寂寞难耐的夜晚,暖男我痛定思痛,决定开始写 《吊打面试官》 系列,希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer
!
虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马
二、使用
对于阻塞队列,想必大家应该都不陌生,我们这里简单的介绍一下,对于 Java
里面的阻塞队列,其使用了 **生产者和消费者 **的模型
对于生产者来说,主要有以下几部分:
add(E) // 添加数据到队列,如果队列满了,无法存储,抛出异常
offer(E) // 添加数据到队列,如果队列满了,返回false
offer(E,timeout,unit) // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回false
put(E) // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!
对于消费者来说,主要有以下几部分:
remove() // 从队列中移除数据,如果队列为空,抛出异常
poll() // 从队列中移除数据,如果队列为空,返回null,么的数据
poll(timeout,unit) // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取
take() // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取
我们本篇来讲讲堵塞队列中的第二员猛将,LinkedBlockingQueue
的故事
我们先来看其基本使用
public class LinkedBlockingQueueTest {public static void main(String[] args) throws Exception {LinkedBlockingQueue queue = new LinkedBlockingQueue();// 生产者扔数据queue.add("1");queue.offer("2");queue.offer("3", 2, TimeUnit.SECONDS);queue.put("2");// 消费者取数据System.out.println(queue.remove());System.out.println(queue.poll());System.out.println(queue.poll(2, TimeUnit.SECONDS));System.out.println(queue.take());}
}
三、源码
1、初始化
由于我们的 LinkedBlockingQueue
底层是链表实现的,所以我们初始化的时候不需要指定其大小
LinkedBlockingQueue queue = new LinkedBlockingQueue();// 如果我们不指定容量大小的话,这里的容量默认为Integer.MAX_VALUE
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}public LinkedBlockingQueue(int capacity) {// 如果容量传进来是小于等于0的,直接抛异常if (capacity <= 0){throw new IllegalArgumentException();}// 当前的容量赋值this.capacity = capacity;// 这里其实和我们的AQS有点像// 搞一个虚拟的头结点,减少后面的判空last = head = new Node<E>(null);
}
当然,除了我们初始化的这些成员变量,我们还有一部分:
class Node<E> {// 当前的数据E item;// 指向下一个数据的指针Node<E> next;Node(E x) {item = x;}
}// 当前链表中存在的数据数量
private final AtomicInteger count = new AtomicInteger();// 读锁
private final ReentrantLock takeLock = new ReentrantLock();// 唤醒消费者线程
private final Condition notEmpty = takeLock.newCondition();// 写锁
private final ReentrantLock putLock = new ReentrantLock();// 唤醒生产者线程
private final Condition notFull = putLock.newCondition();
这里可能有的小伙伴有点懵逼,为什么这哥们(LinkedBlockingQueue
)用了两个锁呢?为什么我 ArrayBlockingQueue
只能用一把锁?
不要急,我们慢慢的往下看他源码
2、生产者的源码
2.1 add()源码实现
public boolean add(E e) {return super.add(e);
}// 走到这里会发现,我们的add方法就是调用了offer方法
// offer: 添加数据到队列,如果队列满了,返回false
// 所以这里offer满了,就会抛出异常:"Queue full"
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");
}
2.2 offer()源码实现
public boolean offer(E e) {// 如果是空值,直接抛出异常if (e == null) throw new NullPointerException();// 引用,上篇我们分析过final AtomicInteger count = this.count;// 判断当前数据量是否和我们总容量一样if (count.get() == capacity){return false;}// 标记位int c = -1;// 创建节点Node<E> node = new Node<E>(e);// 引用写锁final ReentrantLock putLock = this.putLock;// 上锁putLock.lock();try {// 如果当前数据量小于总容量// 这里我们上面也检查过,相当于DCL的意思if (count.get() < capacity) {// 插入队列enqueue(node);// 得到当前数据量// 这里需要注意:getAndIncrement先返回数据,再加一c = count.getAndIncrement();// 如果我们发现当前数据量还小于总容量// 也就是我们可以继续放数据if (c + 1 < capacity)// 唤醒其他的生产者线程扔数据// 当然这里稍微多说一点,这里的唤醒指的是将生产者从Condition队列放到AQS队列中// 具体什么时候执行还需要看AQS的调度notFull.signal();}} finally {// 解锁putLock.unlock();}// 如果我们当前数据量为0,代表队列中原来无数据// 但上面现在扔进去了一个if (c == 0)// 需要唤醒所有的消费者消费数据signalNotEmpty();return c >= 0;
}private void enqueue(Node<E> node) {// 将当面节点挂在last节点后// 将last节点指向当前节点last = last.next = node;
}// 这里我们的Condition聊过
// 必须持有当前锁资源才可以使用Condition的方法
private void signalNotEmpty() {// 拿到读锁final ReentrantLock takeLock = this.takeLock;// 加锁takeLock.lock();try {// 唤醒消费者线程notEmpty.signal();} finally {// 解锁takeLock.unlock();}
}
2.3 offer(time)源码实现
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {// 如果是空值,直接抛出异常if (e == null) throw new NullPointerException();// 转成统一的单位long nanos = unit.toNanos(timeout);int c = -1;// 写锁final ReentrantLock putLock = this.putLock;// 当前容量final AtomicInteger count = this.count;// 加锁putLock.lockInterruptibly();try {// 如果当前数据量小于总容量// 这里我们上面也检查过,相当于DCL的意思while (count.get() == capacity) {// 如果我们剩余时间小于0,直接失败即可if (nanos <= 0)return false;// 反之生产者线程写入挂起nanos时间nanos = notFull.awaitNanos(nanos);}// 添加至队列enqueue(new Node<E>(e));// 得到当前数据量// 这里需要注意:getAndIncrement先返回数据,再加一c = count.getAndIncrement();// 如果我们发现当前数据量还小于总容量// 也就是我们可以继续放数据if (c + 1 < capacity)// 唤醒其他的生产者线程扔数据// 当然这里稍微多说一点,这里的唤醒指的是将生产者从Condition队列放到AQS队列中// 具体什么时候执行还需要看AQS的调度notFull.signal();} finally {// 解锁putLock.unlock();}// 如果我们当前数据量为0,代表队列中原来无数据// 但现在扔进去了一个,唤醒消费者线程if (c == 0)signalNotEmpty();return true;
}
2.4 put()源码实现
- 这里就不写了,其实和我们的 offer 一样,大家自己看看就好
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();
}
3、消费者的源码
3.1 remove()源码实现
public E remove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();
}
3.2 poll()源码实现
public E poll() {// 获取当前链表的数据量final AtomicInteger count = this.count;// 如果数据量为0,说明无数据// 消费者无法消费,直接返回null即可if (count.get() == 0)return null;E x = null;int c = -1;// 拿到读锁final ReentrantLock takeLock = this.takeLock;// 加锁takeLock.lock();try {// 如果数据量大于0,说明有数据// 这里我们上面也检查过,相当于DCL的意思if (count.get() > 0) {// 取数x = dequeue();// 得到当前数据量// 这里需要注意:getAndIncrement先返回数据,再减一c = count.getAndDecrement();// 如果我们的数据量大于1,则唤醒消费者来消费if (c > 1)notEmpty.signal();}} finally {// 解锁takeLock.unlock();}// 如果数据量等于当前的总容量// 说明当前的链表已经有空余了,唤醒生产者生产if (c == capacity)signalNotFull();return x;
}// 这个取数据和我们的AQS有点像
// 去除当前数据并且将当前节点作为头结点
private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;
}private void signalNotFull() {// 拿到写锁final ReentrantLock putLock = this.putLock;// 上锁putLock.lock();try {// 唤醒生产者notFull.signal();} finally {// 解锁putLock.unlock();}
}
3.3 poll(time)源码实现
public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;// 统一时间单位long nanos = unit.toNanos(timeout);// 拿到当前数据量 + 读锁final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;// 加可中断锁takeLock.lockInterruptibly();try {// 如果当前的数据量为0while (count.get() == 0) {// 如果时间没有剩余,直接返回null即可if (nanos <= 0)return null;// 让消费者线程等待nanos时间nanos = notEmpty.awaitNanos(nanos);}// 取数据x = dequeue();// 后面都是一样的c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}
3.4 take()源码实现
- 这个大家可以自己看一下补充,也算一个小测试
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}
4、疑惑
看到这里,我想大家可能有和我一样的疑惑?
之前我们聊 ArrayBlockingQueue
的时候,他只用了一把锁(互斥锁),但 LinkedBlockingQueue
却使用了两把锁(读锁、写锁)
这时候你脑子会不会有一种疑问,我 ArrayBlockingQueue
能不能使用两把锁(读锁、写锁)来进行访问
如果你有这种想法,说明你确实思考了,哈哈哈
没错,博主我查阅了相关的资料,ArrayBlockingQueue
确实可以使用两把锁进行逻辑的更改
博主贴下链接:
- ArrayBlockingQueue改造代码
- ArrayBlockingQueue 使用单个锁进行插入和删除,而 LinkedBlockingQueue 使用 2 个单独的锁
考虑部分小伙伴可能没有VPN,博主贴下代码
整体的逻辑基本上是仿造 LinkedBlockingQueue
的业务逻辑改造的,经测试这种性能要比原始的 ArrayBlockingQueue
要快 20%~30% 左右,感兴趣的也可以自己去测试一下。
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class ArrayBlockingQueueUsingTwoLockApproach {/** The queued items */final Object[] items;/** items index for next take, poll, peek or remove */int takeIndex;/** items index for next put, offer, or add */int putIndex;/** Current number of elements */private final AtomicInteger count = new AtomicInteger();/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();public ArrayBlockingQueueUsingTwoLockApproach(int capacity) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];}public void put(Object e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == items.length) {notFull.await();}enqueue(e);c = count.getAndIncrement();if (c + 1 < items.length)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();}public Object take() throws InterruptedException {Object x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == items.length)signalNotFull();return x;}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/private void enqueue(Object x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count.incrementAndGet();}/*** Extracts element at current take position, advances, and signals.* Call only when holding lock.*/private Object dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")Object x = (Object) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count.decrementAndGet();return x;}/*** Signals a waiting put. Called only from take/poll.*/private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}
}
四、流程图
其实,我们 LinkedBlockingQueue
整体的代码逻辑和 ArrayBlockingQueue
类似,只不过底层数据结构不同罢了
我们这里简单的画一下,有兴趣的同学也可以自己画吆
五、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。
其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。
如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长
我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。
我们下期再见。
我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。
往期文章推荐:
- 从根上剖析ReentrantLock的来龙去脉
- 阅读完synchronized和ReentrantLock的源码后,我竟发现其完全相似
- 从源码全面解析 ThreadLocal 关键字的来龙去脉
- 从源码全面解析 synchronized 关键字的来龙去脉
- 从源码全面解析 ArrayBlockingQueue 的来龙去脉
- 阿里面试官让我讲讲volatile,我直接从HotSpot开始讲起,一套组合拳拿下面试