Condition简介
Condition 的作用是对锁进行更精确的控制。 Condition 中的 await() 方法相当于 Object 的 wait() 方法, Condition 中的 signal() 方法相当于 Object 的 notify() 方法, Condition 中的 signalAll() 相当于 Object 的 notifyAll() 方法。不同的是, Object 中的 wait() , notify() , notifyAll() 方法是和”同步锁” (synchronized 关键字) 捆绑使用的;而 Condition 是需要与”互斥锁”/“共享锁”捆绑使用的。
1 | void await() |
Condtion的实现
获取一个 Condition 必须要通过 Lock 的 newCondition() 方法。该方法定义在接口 Lock 下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例。 Condition 为一个接口,其下仅有一个实现类 ConditionObject ,由于 Condition 的操作需要获取相关的锁,而 AQS 则是同步锁的实现基础,所以 ConditionObject 则定义为 AQS 的内部类。定义如下:
1 | public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
等待队列
每个 Condition 对象都包含着一个 FIFO 队列,该队列是 Condition 对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该 Condition 对象上等待的线程。我们看 Condition 的定义就明白了:
1 | public class ConditionObject implements Condition, java.io.Serializable { |
从上面代码可以看出 Condition 拥有首节点(firstWaiter),尾节点(lastWaiter)。当前线程调用 await() 方法,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部。结构如下:
Node 里面包含了当前线程的引用。 Node 定义与 AQS 的 CLH 同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node 静态内部类)。
Condition 的队列结构比 CLH 同步队列的结构简单些,新增过程较为简单只需要将原尾节点的 nextWaiter 指向新增节点,然后更新 lastWaiter 即可。
等待(await)
调用 Condition 的 await() 方法会使当前线程进入等待状态,同时会加入到 Condition 等待队列同时释放锁。当从 await() 方法返回时,当前线程一定是获取了 Condition 相关连的锁。
1 | public final void await() throws InterruptedException { |
此段代码的逻辑是:首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的线程释放出现在 CLH 同步队列中(收到 signal 信号之后就会在 AQS 队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。
加入条件队列(addConditionWaiter())源码如下:
1 | private Node addConditionWaiter() { |
该方法主要是将当前线程加入到 Condition 条件队列中。当然在加入到尾节点之前会清楚所有状态不为 Condition 的节点。
fullyRelease(Node node)
,负责释放该线程持有的锁。
1 | final long fullyRelease(Node node) { |
isOnSyncQueue(Node node)
:如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true
1 | final boolean isOnSyncQueue(Node node) { |
unlinkCancelledWaiters()
:负责将条件队列中状态不为Condition
的节点删除
1 | private void unlinkCancelledWaiters() { |
通知(signal)
调用 Condition 的 signal() 方法,将会唤醒在条件等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到 CLH 同步队列中。
1 | public final void signal() { |
该方法首先会判断当前线程是否已经获得了锁,这是前置条件。然后唤醒条件队列中的头节点。
doSignal(Node first):唤醒头节点
1 | private void doSignal(Node first) { |
doSignal(Node first)主要是做两件事:1.修改头节点,2.调用 transferForSignal(Node first) 方法将节点移动到CLH同步队列中。transferForSignal(Node first)源码如下:
1 | final boolean transferForSignal(Node node) { |
- 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
- 如果线程已经获取了锁,则将唤醒条件队列的首节点
- 唤醒首节点是先将条件队列中的头节点移出,然后调用 AQS 的 enq(Node node) 方法将其安全地移到 CLH 同步队列中
- 最后判断如果该节点的同步状态是否为 Cancel ,或者修改状态为 Signal 失败时,则直接调用LockSupport唤醒该节点的线程。
总结
一个线程获取锁后,通过调用 Condition 的 await() 方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过 isOnSyncQueue(Node node) 方法不断自检看节点是否已经在 CLH 同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过 doSignal(Node first) 方法唤醒 CLH 同步队列的首节点。被唤醒的线程,将从 await() 方法中的 while 循环中退出来,然后调用 acquireQueued() 方法竞争同步状态。
synchronized原理
synchronized 原理在 java 中,每一个对象有且仅有一个同步锁。这也意味着,同步锁是依赖于对象而存在。
当我们调用某对象的 synchronized 方法时,就获取了该对象的同步锁。例如, synchronized(obj) 就获取了“obj这个对象”的同步锁。
不同线程对同步锁的访问是互斥的。也就是说,某时间点,对象的同步锁只能被一个线程获取到!通过同步锁,我们就能在多线程中,实现对“对象/方法”的互斥访问。 例如,现在有两个线程 A 和线程 B ,它们都会访问“对象 obj 的同步锁”。假设,在某一时刻,线程 A 获取到“ obj 的同步锁”并在执行一些操作;而此时,线程 B 也企图获取“ obj 的同步锁” —— 线程 B 会获取失败,它必须等待,直到线程 A 释放了“该对象的同步锁”之后线程 B 才能获取到“ obj 的同步锁”从而才可以运行。
synchronized基本规则
synchronized 基本规则我们将 synchronized 的基本规则总结为下面3条,并通过实例对它们进行说明。
第一条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程对“该对象”的该“synchronized方法”或者“synchronized代码块”的访问将被阻塞。
第二条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程仍然可以访问“该对象”的非同步代码块。
第三条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程对“该对象”的其他的“synchronized方法”或者“synchronized代码块”的访问将被阻塞。
实例锁 – 锁在某一个实例对象上。如果该类是单例,那么该锁也具有全局锁的概念。
实例锁对应的就是synchronized关键字。
全局锁 – 该锁针对的是类,无论实例多少个对象,那么线程都共享该锁。
全局锁对应的就是 static synchronized(或者是锁在该类的class或者classloader对象上)。
Condtion的实现
1 | import java.util.concurrent.locks.Lock; |
1 | p1 put 1 |
(01) BoundedBuffer 是容量为5的缓冲,缓冲中存储的是 Object 对象,支持多线程的读/写缓冲。多个线程操作“一个 BoundedBuffer 对象”时,它们通过互斥锁 lock 对缓冲区 items 进行互斥访问;而且同一个 BoundedBuffer 对象下的全部线程共用 “notFull” 和 “notEmpty” 这两个 Condition 。
notFull 用于控制写缓冲, notEmpty 用于控制读缓冲。当缓冲已满的时候,调用 put 的线程会执行notFull.await()进行等待;当缓冲区不是满的状态时,就将对象添加到缓冲区并将缓冲区的容量count+1,最后,调用notEmpty.signal()缓冲notEmpty上的等待线程(调用notEmpty.await的线程)。 简言之,notFull控制“缓冲区的写入”,当往缓冲区写入数据之后会唤醒notEmpty上的等待线程。
同理,notEmpty控制“缓冲区的读取”,当读取了缓冲区数据之后会唤醒notFull上的等待线程。
(02) 在ConditionTest2的main函数中,启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);同时,也启动10个“读线程”,从BoundedBuffer中不断的读数据。