Fork me on GitHub

锁条件Condition

Condition简介

  Condition 的作用是对锁进行更精确的控制。 Condition 中的 await() 方法相当于 Object 的 wait() 方法, Condition 中的 signal() 方法相当于 Object 的 notify() 方法, Condition 中的 signalAll() 相当于 Object 的 notifyAll() 方法。不同的是, Object 中的 wait() , notify() , notifyAll() 方法是和”同步锁” (synchronized 关键字) 捆绑使用的;而 Condition 是需要与”互斥锁”/“共享锁”捆绑使用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()

Condtion的实现

  获取一个 Condition 必须要通过 Lock 的 newCondition() 方法。该方法定义在接口 Lock 下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例。 Condition 为一个接口,其下仅有一个实现类 ConditionObject ,由于 Condition 的操作需要获取相关的锁,而 AQS 则是同步锁的实现基础,所以 ConditionObject 则定义为 AQS 的内部类。定义如下:

1
2
3
public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {}
}

等待队列

  每个 Condition 对象都包含着一个 FIFO 队列,该队列是 Condition 对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该 Condition 对象上等待的线程。我们看 Condition 的定义就明白了:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;

//头节点
private transient Node firstWaiter;
//尾节点
private transient Node lastWaiter;

public ConditionObject() {
}

/** 省略方法 **/
}

  从上面代码可以看出 Condition 拥有首节点(firstWaiter),尾节点(lastWaiter)。当前线程调用 await() 方法,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部。结构如下:

  Node 里面包含了当前线程的引用。 Node 定义与 AQS 的 CLH 同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node 静态内部类)。

  Condition 的队列结构比 CLH 同步队列的结构简单些,新增过程较为简单只需要将原尾节点的 nextWaiter 指向新增节点,然后更新 lastWaiter 即可。

等待(await)

  调用 Condition 的 await() 方法会使当前线程进入等待状态,同时会加入到 Condition 等待队列同时释放锁。当从 await() 方法返回时,当前线程一定是获取了 Condition 相关连的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void await() throws InterruptedException {
// 当前线程中断
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
* 直到检测到此节点在同步队列上
* 需要被唤醒才能从条件队列转移到CLF同步队列
*/
while (!isOnSyncQueue(node)) {
//线程挂起
LockSupport.park(this);
//如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下条件队列中的不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

  此段代码的逻辑是:首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的线程释放出现在 CLH 同步队列中(收到 signal 信号之后就会在 AQS 队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。

  加入条件队列(addConditionWaiter())源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private Node addConditionWaiter() {
Node t = lastWaiter; //尾节点
//Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
if (t != null && t.waitStatus != Node.CONDITION) {
//清除条件队列中所有状态不为Condition的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//当前线程新建节点,状态CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/**
* 将该节点加入到条件队列中最后一个位置
*/
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

  该方法主要是将当前线程加入到 Condition 条件队列中。当然在加入到尾节点之前会清楚所有状态不为 Condition 的节点。

  fullyRelease(Node node),负责释放该线程持有的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final long fullyRelease(Node node) {
boolean failed = true;
try {
//节点状态--其实就是持有锁的数量
long savedState = getState();
//释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

  isOnSyncQueue(Node node):如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true

1
2
3
4
5
6
7
8
9
10
final boolean isOnSyncQueue(Node node) {
//状态为Condition,获取前驱节点为null,返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//后继节点不为null,肯定在CLH同步队列中
if (node.next != null)
return true;

return findNodeFromTail(node);
}

  unlinkCancelledWaiters():负责将条件队列中状态不为Condition的节点删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

通知(signal)

  调用 Condition 的 signal() 方法,将会唤醒在条件等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到 CLH 同步队列中。

1
2
3
4
5
6
7
8
9
public final void signal() {
//检测当前线程是否为拥有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒
}

  该方法首先会判断当前线程是否已经获得了锁,这是前置条件。然后唤醒条件队列中的头节点。

  doSignal(Node first):唤醒头节点

1
2
3
4
5
6
7
8
9
private void doSignal(Node first) {
do {
//修改头结点,完成旧头结点的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

  doSignal(Node first)主要是做两件事:1.修改头节点,2.调用 transferForSignal(Node first) 方法将节点移动到CLH同步队列中。transferForSignal(Node first)源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
 final boolean transferForSignal(Node node) {
//将该节点从状态CONDITION改变为初始状态0,
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

//将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
Node p = enq(node);
int ws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
  1. 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
  2. 如果线程已经获取了锁,则将唤醒条件队列的首节点
  3. 唤醒首节点是先将条件队列中的头节点移出,然后调用 AQS 的 enq(Node node) 方法将其安全地移到 CLH 同步队列中
  4. 最后判断如果该节点的同步状态是否为 Cancel ,或者修改状态为 Signal 失败时,则直接调用LockSupport唤醒该节点的线程。

总结

  一个线程获取锁后,通过调用 Condition 的 await() 方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过 isOnSyncQueue(Node node) 方法不断自检看节点是否已经在 CLH 同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过 doSignal(Node first) 方法唤醒 CLH 同步队列的首节点。被唤醒的线程,将从 await() 方法中的 while 循环中退出来,然后调用 acquireQueued() 方法竞争同步状态。

synchronized原理

  synchronized 原理在 java 中,每一个对象有且仅有一个同步锁。这也意味着,同步锁是依赖于对象而存在。
当我们调用某对象的 synchronized 方法时,就获取了该对象的同步锁。例如, synchronized(obj) 就获取了“obj这个对象”的同步锁。
不同线程对同步锁的访问是互斥的。也就是说,某时间点,对象的同步锁只能被一个线程获取到!通过同步锁,我们就能在多线程中,实现对“对象/方法”的互斥访问。 例如,现在有两个线程 A 和线程 B ,它们都会访问“对象 obj 的同步锁”。假设,在某一时刻,线程 A 获取到“ obj 的同步锁”并在执行一些操作;而此时,线程 B 也企图获取“ obj 的同步锁” —— 线程 B 会获取失败,它必须等待,直到线程 A 释放了“该对象的同步锁”之后线程 B 才能获取到“ obj 的同步锁”从而才可以运行。

synchronized基本规则

  synchronized 基本规则我们将 synchronized 的基本规则总结为下面3条,并通过实例对它们进行说明。
  第一条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程对“该对象”的该“synchronized方法”或者“synchronized代码块”的访问将被阻塞。
  第二条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程仍然可以访问“该对象”的非同步代码块。
  第三条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程对“该对象”的其他的“synchronized方法”或者“synchronized代码块”的访问将被阻塞。

实例锁 – 锁在某一个实例对象上。如果该类是单例,那么该锁也具有全局锁的概念。
实例锁对应的就是synchronized关键字。

全局锁 – 该锁针对的是类,无论实例多少个对象,那么线程都共享该锁。
全局锁对应的就是 static synchronized(或者是锁在该类的class或者classloader对象上)。

Condtion的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[5];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
while (count == items.length)
notFull.await();
// 将x添加到缓冲中
items[putptr] = x;
// 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
if (++putptr == items.length) putptr = 0;
// 将“缓冲”数量+1
++count;
// 唤醒take线程,因为take线程通过notEmpty.await()等待
notEmpty.signal();

// 打印写入的数据
System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
} finally {
lock.unlock(); // 释放锁
}
}

public Object take() throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
while (count == 0)
notEmpty.await();
// 将x从缓冲中取出
Object x = items[takeptr];
// 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
if (++takeptr == items.length) takeptr = 0;
// 将“缓冲”数量-1
--count;
// 唤醒put线程,因为put线程通过notFull.await()等待
notFull.signal();

// 打印取出的数据
System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
return x;
} finally {
lock.unlock(); // 释放锁
}
}
}

public class ConditionTest2 {
private static BoundedBuffer bb = new BoundedBuffer();

public static void main(String[] args) {
// 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
// 启动10个“读线程”,从BoundedBuffer中不断的读数据。
for (int i=0; i<10; i++) {
new PutThread("p"+i, i).start();
new TakeThread("t"+i).start();
}
}

static class PutThread extends Thread {
private int num;
public PutThread(String name, int num) {
super(name);
this.num = num;
}
public void run() {
try {
Thread.sleep(1); // 线程休眠1ms
bb.put(num); // 向BoundedBuffer中写入数据
} catch (InterruptedException e) {
}
}
}

static class TakeThread extends Thread {
public TakeThread(String name) {
super(name);
}
public void run() {
try {
Thread.sleep(10); // 线程休眠1ms
Integer num = (Integer)bb.take(); // 从BoundedBuffer中取出数据
} catch (InterruptedException e) {
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
p1 put  1
p4 put 4
p5 put 5
p0 put 0
p2 put 2
t0 take 1
p3 put 3
t1 take 4
p6 put 6
t2 take 5
p7 put 7
t3 take 0
p8 put 8
t4 take 2
p9 put 9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9

(01) BoundedBuffer 是容量为5的缓冲,缓冲中存储的是 Object 对象,支持多线程的读/写缓冲。多个线程操作“一个 BoundedBuffer 对象”时,它们通过互斥锁 lock 对缓冲区 items 进行互斥访问;而且同一个 BoundedBuffer 对象下的全部线程共用 “notFull” 和 “notEmpty” 这两个 Condition 。
notFull 用于控制写缓冲, notEmpty 用于控制读缓冲。当缓冲已满的时候,调用 put 的线程会执行notFull.await()进行等待;当缓冲区不是满的状态时,就将对象添加到缓冲区并将缓冲区的容量count+1,最后,调用notEmpty.signal()缓冲notEmpty上的等待线程(调用notEmpty.await的线程)。 简言之,notFull控制“缓冲区的写入”,当往缓冲区写入数据之后会唤醒notEmpty上的等待线程。
同理,notEmpty控制“缓冲区的读取”,当读取了缓冲区数据之后会唤醒notFull上的等待线程。
(02) 在ConditionTest2的main函数中,启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);同时,也启动10个“读线程”,从BoundedBuffer中不断的读数据。

-------------本文结束感谢您的阅读-------------