本文深入解读了高频面试点——ReentrantLock的条件队列使用方法及其原理。源码有详细注释,建议收藏阅读。
点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达
Jdk中独占锁的实现除了使用关键字synchronized
外,还可以使用ReentrantLock
。虽然在性能上两者没有什么区别,但ReentrantLock
相比synchronized
而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景,其原理之前已经介绍过,请自行阅读。
重点,一文掌握ReentrantLock加解锁原理!|原创
使用synchronized
结合Object
上的wait
和notify
方法可以实现线程间的等待通知机制。ReentrantLock
的Condition
同样可以实现这个功能,而且相比前者使用起来更清晰也更简单。前者是java底层级别的,后者是语言级别的,后者可控制性和扩展性更好。
Condition与Object的wait/notify区别
Condition能够支持不响应中断,而通过使用 Object 方式不支持
Condition能够支持多个等待队列(new 多个Condition对象),而 Object 方式只能支持一个
Condition能够支持超时时间的设置,而 Object 不支持
使用示例
为了方便理解源码,我们先用一个Demo展示一下ReentrantLock的线程停止和通知是如何使用的。这里使用的是一个生产者和消费者的模型,一个线程负责加,另一个线程负责减。
static volatile int i = 0;
static final ReentrantLock LOCK = new ReentrantLock();
static final Condition condition = LOCK.newCondition();public static void add() throws InterruptedException {LOCK.lock();try {while (i == 0) {Thread.sleep(1000);System.out.print("add\t");System.out.println(++i);condition.signal();condition.await();}} finally {LOCK.unlock();}
}public static void sub() throws InterruptedException {LOCK.lock();try {while (i == 1) {Thread.sleep(1000);System.out.print("sub\t");System.out.println(--i);condition.signal();condition.await();}} finally {LOCK.unlock();}
}public static void main(String[] args) throws InterruptedException {new Thread(() -> {while (true) {try {add();} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(() -> {while (true) {try {sub();} catch (InterruptedException e) {e.printStackTrace();}}}).start();
}
可以看到,想要获得一个Condition对象,需要首先通过一个ReentrantLock锁来创建,而最终调用其实为AQS中的内部类ConditionObject。
condition是要和lock配合使用的,而lock的实现原理又依赖于AQS,所以AQS内部实现了ConditionObject。我们知道在锁机制的实现上,AQS内部维护了一个双向的同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列。condition内部也是使用相似的方式,内部维护了一个单向的等待队列,所有调用condition.await
方法的线程会加入到等待队列中,并且线程状态转换为等待状态。
ConditionObject中有两个成员变量:头节点firstWaiter 和 尾节点lastWaiter ,同步队列的成员Node 复用了实现同步队列的内部类Node。用nextWaiter保存了下一个等待节点,源码如下。
Condition condition = LOCK.newCondition();
//ReentrantLock内部类Sync
abstract static class Sync extends AbstractQueuedSynchronizer {final ConditionObject newCondition() {return new ConditionObject();}
}
// AQS内部类 ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;//真正的创建Condition对象public ConditionObject() { }
}
static final class Node {Node nextWaiter;
}
用Object的方式Object对象监视器上只能拥有一个同步队列和一个等待队列,而使用Lock可以有有一个同步队列和多个等待队列。可以多次调用lock.newCondition()创建多个Condition,所以一个Lock可以持有多个等待队列。
下面开始解读await()
和signal()
方法。
Await方法原理
阻塞前:
1.在条件队列尾部添加新节点(状态CONDITION=-2),如果头节点为空则把当前节点设为头节点。
2.获取当前线程占有的state,无论state是几,都清空为0,代表完全释放锁。并且在释放当前线程所占用的锁之后,会唤醒同步队列中的下一个节点。
3.进入自旋判断逻辑:如果当前节点状态是 CONDITION(-2)或者 prev 节点(表示在同步队列中有前驱节点)为空,返回false,进入while逻辑,阻塞当前线程;如果有继承者,表示肯定在同步队列中,直接跳出循环;如果从同步队列队尾开始寻找,找到当前节点,同样表示在队列中,跳出循环。
注意!! 是先添加到条件队列,再释放锁。所以有可能出现以下的情况,A插入条件队列调用await唤醒B,但是在A唤醒后准备park时,B已经执行完需要的逻辑,并且再次Park。此时的A线程可能已经状态不再是CONDITION,说明已经进入同步队列,那就可以跳过Park再次直接争夺锁,所以这里需要自旋锁去不断尝试判断。
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾Node node = addConditionWaiter();// 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点int savedState = fullyRelease(node);int interruptMode = 0;// 当不在同步队列中(处于condition状态或者前一个节点为null)while (!isOnSyncQueue(node)) {// 3. 当前线程进入到等待状态LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 4. 自旋等待获取到同步状态(即获取到lock)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelled//删除无效的等待节点unlinkCancelledWaiters();// 5. 处理被中断的情况if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
阻塞后:
恢复执行后,检查是否中断。然后自旋再次判断是否已经进入同步队列,返回true,跳出循环继续执行。
调用acquireQueued,尝试去争夺锁,这里逻辑和
lock
一样,已经是同步队列去竞争锁的逻辑。并且会将之前清空的state值按照原来的大小设置。最后都是一些中断标记的处理,主流程已经结束。
注意:退出await
方法一定表明当前线程已经获得了与condition
关联的锁资源。
具体请看代码:
// AQS
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 1. 添加新节点,将当前线程保存其中,并且添加到等待队列队尾Node node = addConditionWaiter();// 2. 释放当前线程所占用的lock,并且唤醒同步队列中的下一个节点int savedState = fullyRelease(node);int interruptMode = 0;//是先添加到等待队列,再释放锁。所以有可能出现以下的情况,A插入条件队列调用await唤醒B,但是在A唤醒后准备park时,B已经执行完需要的逻辑,并且再次Park,此时的A就可以跳过Park再次直接争夺锁。while (!isOnSyncQueue(node)) {// 3. 关键节点!!!:当前线程进入到等待状态LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 4. 自旋等待获取到同步状态(即获取到lock)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 如果节点线程被取消才会进入这里的逻辑。正常不会if (node.nextWaiter != null) // clean up if cancelled//删除无效的等待节点unlinkCancelledWaiters();// 5. 处理被中断的情况if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
}
// 添加新的条件队列节点
private Node addConditionWaiter() {Node t = lastWaiter;// 清除被取消的尾节点if (t != null && t.waitStatus != Node.CONDITION) {//解除关联unlinkCancelledWaiters();t = lastWaiter;}//将当前线程保存在Node中Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;else//队尾插入t.nextWaiter = node;//更新lastWaiter (如果是第一次插入节点,头尾节点都是同一个)lastWaiter = node;return node;
}
//完全释放锁状态
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();// 这里会释放锁,并且唤醒后继节点if (release(savedState)) {//成功释放同步状态failed = false;return savedState;} else {//不成功释放同步状态抛出异常throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}
}
Signal
检查本线程是否持有锁,正常是持有锁,如果不符合就抛出异常。
从等待队列中拿到第一个节点。如果头节点为空代表条件队列为空,谁也不通知直接结束。
将头节点从条件队列中移除,并且把nextWaiter置为null。然后把节点状态设为0,转移进入同步队列。如果队列为空则初始化同步队列。
如果前驱节点不是 signal 状态或者前一个节点已经被取消,直接对头节点线程解除阻塞。返回true跳出循环。
至此本线程方法执行结束。依旧持有锁,但是转移了条件队列的头节点到同步队列中,就做了这一件事。
//AQS
public final void signal() {//1. 先检测当前线程是否已经获取lockif (!isHeldExclusively())throw new IllegalMonitorStateException();//2. 获取等待队列中第一个节点,之后的操作都是针对这个节点Node first = firstWaiter;if (first != null)doSignal(first);
}//ReentrantLock
protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();
}//AQS
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;//1. 将头结点从等待队列中移除first.nextWaiter = null;//2. while中transferForSignal方法对头结点做真正的处理} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}final boolean transferForSignal(Node node) {//1. 更新状态为0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;//2.将该节点移入到同步队列中去// 这里的处理和同步队列的生成用的同一个方法// node p 为前驱节点(原尾节点)Node p = enq(node);int ws = p.waitStatus;// 如果前驱节点不是signal状态或者前一个节点已经被取消,直接对头节点解除阻塞。返回true跳出循环if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
}
具体原理图如下:
SignalAll
signalAll与signal方法的区别体现在doSignalAll
方法上,前面我们已经知道doSignal
方法只会对等待队列的头节点进行操作,而doSignalAll
将条件队列中的所有Node都转移到了同步队列中,即“通知”当前调用condition.await()方法的每一个线程,代码如下。
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);
}
最后,欢迎大家提问和交流。
如果对你有帮助,欢迎点赞、评论或分享,感谢阅读!
update在MySQL中是怎样执行的,一张图牢记|原创
2022-11-19
讲真,这篇最全HashMap你不能错过!|原创
2022-11-17
MySQL主从数据不一致,怎么办?
2022-11-15