【Java并发】详解 AbstractQueuedSynchronizer
Pin Young Lv9

前言

队列同步器 AbstractQueuedSynchronizer(以下简称 AQS),是用来构建锁或者其他同步组件的基础框架。它使用一个 int 成员变量来表示同步状态,通过 CAS 操作对同步状态进行修改,确保状态的改变是安全的。通过内置的 FIFO (First In First Out)队列来完成资源获取线程的排队工作。更多关于 Java 多线程的文章可以转到 这里

AQS 和 synchronized

在介绍 AQS 的使用之前,需要首先说明一点,AQS 同步和 synchronized 关键字同步(以下简称 synchronized 同步)是采用的两种不同的机制。首先看下 synchronized 同步,synchronized 关键字经过编译之后,会在同步块的前后分别形成 monitorenter 和 monitorexit 这两个字节码指令,这两个字节码需要关联到一个监视对象,当线程执行 monitorenter 指令时,需要首先获得获得监视对象的锁,这里监视对象锁就是进入同步块的凭证,只有获得了凭证才可以进入同步块,当线程离开同步块时,会执行 monitorexit 指令,释放对象锁。

在 AQS 同步中,使用一个 int 类型的变量 state 来表示当前同步块的状态。以独占式同步(一次只能有一个线程进入同步块)为例,state 的有效值有两个 0 和 1,其中 0 表示当前同步块中没有线程,1 表示同步块中已经有线程在执行。当线程要进入同步块时,需要首先判断 state 的值是否为 0,假设为 0,会尝试将 state 修改为 1,只有修改成功了之后,线程才可以进入同步块。注意上面提到的两个条件:

  • state 为 0,证明当前同步块中没有线程在执行,所以当前线程可以尝试获得进入同步块的凭证,而这里的凭证就是是否成功将 state 修改为 1(在 synchronized 同步中,我们说的凭证是对象锁,但是对象锁的最终实现是否和这种方式类似,没有找到相关的资料)
  • 成功将 state 修改为 1,通过使用 CAS 操作,我们可以确保即便有多个线程同时修改 state,也只有一个线程会修改成功。关于 CAS 的具体解释会在后面提到。

当线程离开同步块时,会修改 state 的值,将其设为 0,并唤醒等待的线程。所以在 AQS 同步中,我们说线程获得了锁,实际上是指线程成功修改了状态变量 state,而线程释放了锁,是指线程将状态变量置为了可修改的状态(在独占式同步中就是置为了 0),让其他线程可以再次尝试修改状态变量。在下面的表述中,我们说线程获得和释放了锁,就是上述含义, 这与 synchronized 同步中说的获得和释放锁的含义不同,需要区别理解。

基本使用

本节摘自 Java 并发编程的艺术

AQS 的设计是基于模板方法的,使用者需要继承 AQS 并重写指定的方法。在后续的流程中,AQS 提供的模板方法会调用重写的方法。一般来说,我们需要重写的方法主要有下面 5 个:

方法名称描述
protected boolean tryAcquire(int)独占式获取锁,实现该方法需要查询当前状态并判断同步状态是否和预期值相同,然后使用 CAS 操作设置同步状态
protected boolean tryRelease(int)独占式释放锁,实际也是修改同步变量
protected int tryAcquireShared(int)共享式获取锁,返回大于等于 0 的值,表示获取锁成功,反之获取失败
protected boolean tryReleaseShared(int)共享式释放锁
protected boolean isHeldExclusively()判断调用该方法的线程是否持有互斥锁

在自定义的同步组件中,我们一般会调用 AQS 提供的模板方法。AQS 提供的模板方法基本上分为 3 类: 独占式获取与释放锁、共享式获取与释放锁以及查询同步队列中的等待线程情况。下面是相关的模板方法:

方法名称描述
void acquire(int)独占式获取锁,如果当前线程成功获取锁,那么方法就返回,否则会将当前线程放入同步队列等待。该方法会调用重写的 tryAcquire(int arg) 方法判断是否可以获得锁
void acquireInterruptibly(int)和 acquire(int) 相同,但是该方法响应中断,当线程在同步队列中等待时,如果线程被中断,会抛出 InterruptedException 异常并返回。
boolean tryAcquireNanos(int, long)在 acquireInterruptibly(int) 基础上添加了超时控制,同时支持中断和超时,当在指定时间内没有获得锁时,会返回 false,获取到了返回 true
void acquireShared(int)共享式获得锁,如果成功获得锁就返回,否则将当前线程放入同步队列等待,与独占式获取锁的不同是,同一时刻可以有多个线程获得共享锁,该方法调用 tryAcquireShared(int)
acquireSharedInterruptibly(int)与 acquireShared(int) 相同,该方法响应中断
tryAcquireSharedNanos(int, long)在 acquireSharedInterruptibly(int) 基础上添加了超时控制
boolean release(int)独占式释放锁,该方法会在释放锁后,将同步队列中第一个等待节点唤醒
boolean releaseShared(int)共享式释放锁
CollectiongetQueuedThreads()获得同步队列中等待的线程集合

自定义组件通过使用同步器提供的模板方法来实现自己的同步语义。下面我们通过两个示例,看下如何借助于 AQS 来实现锁的同步语义。我们首先实现一个独占锁(排它锁),独占锁就是说在某个时刻内,只能有一个线程持有独占锁,只有持有锁的线程释放了独占锁,其他线程才可以获取独占锁。下面是具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/*
* Created by Jikai Zhang on 2017/4/6.
* <p>
* 自定义独占锁
*/
public class Mutex implements Lock {

// 通过继承 AQS,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {

// 当前线程是否被独占
@Override
protected boolean isHeldExclusively() {
return getState() == 1;

}

// 尝试获得锁
@Override
protected boolean tryAcquire(int arg) {
// 只有当 state 的值为 0,并且线程成功将 state 值修改为 1 之后,线程才可以获得独占锁
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
// state 为 0 说明当前同步块中没有锁了,无需释放
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
// 将独占的线程设为 null
setExclusiveOwnerThread(null);
// 将状态变量的值设为 0,以便其他线程可以成功修改状态变量从而获得锁
setState(0);
return true;
}

Condition newCondition() {
return new ConditionObject();
}
}

// 将操作代理到 Sync 上
private final Sync sync = new Sync();

@Override
public void lock() {
sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}

public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public static void withoutMutex() throws InterruptedException {
System.out.println("Without mutex: ");
int threadCount = 2;
final Thread threads[] = new Thread[threadCount];
for (int i = 0; i < threads.length; i++) {
final int index = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 100000; j++) {
if (j % 20000 == 0) {
System.out.println("Thread-" + index + ": j =" + j);
}
}
}
});
}

for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
}

public static void withMutex() {
System.out.println("With mutex: ");
final Mutex mutex = new Mutex();
int threadCount = 2;
final Thread threads[] = new Thread[threadCount];
for (int i = 0; i < threads.length; i++) {
final int index = i;
threads[i] = new Thread(new Runnable() {

@Override
public void run() {

mutex.lock();
try {
for (int j = 0; j < 100000; j++) {
if (j % 20000 == 0) {
System.out.println("Thread-" + index + ": j =" + j);
}
}
} finally {
mutex.unlock();
}
}
});
}

for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
}

public static void main(String[] args) throws InterruptedException {
withoutMutex();
System.out.println();
withMutex();

}
}

程序的运行结果如下面所示。我们看到使用了 Mutex 之后,线程 0 和线程 1 不会再交替执行,而是当一个线程执行完,另外一个线程再执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Without mutex:
Thread-0: j =0
Thread-1: j =0
Thread-0: j =20000
Thread-1: j =20000
Thread-0: j =40000
Thread-1: j =40000
Thread-0: j =60000
Thread-1: j =60000
Thread-1: j =80000
Thread-0: j =80000

With mutex:
Thread-0: j =0
Thread-0: j =20000
Thread-0: j =40000
Thread-0: j =60000
Thread-0: j =80000
Thread-1: j =0
Thread-1: j =20000
Thread-1: j =40000
Thread-1: j =60000
Thread-1: j =80000

下面在看一个共享锁的示例。在该示例中,我们定义两个共享资源,即同一时间内允许两个线程同时执行。我们将同步变量的初始状态 state 设为 2,当一个线程获取了共享锁之后,将 state 减 1,线程释放了共享锁后,将 state 加 1。状态的合法范围是 0、1 和 2,其中 0 表示已经资源已经用光了,此时线程再要获得共享锁就需要进入同步序列等待。下面是具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
* Created by Jikai Zhang on 2017/4/9.
* <p>
* 自定义共享锁
*/
public class TwinsLock implements Lock {

private static class Sync extends AbstractQueuedSynchronizer {

public Sync(int resourceCount) {
if (resourceCount <= 0) {
throw new IllegalArgumentException("resourceCount must be larger than zero.");
}
// 设置可以共享的资源总数
setState(resourceCount);
}


@Override
protected int tryAcquireShared(int reduceCount) {
// 使用尝试获得资源,如果成功修改了状态变量(获得了资源)
// 或者资源的总量小于 0(没有资源了),则返回。
for (; ; ) {
int lastCount = getState();
int newCount = lastCount - reduceCount;
if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
return newCount;
}
}
}

@Override
protected boolean tryReleaseShared(int returnCount) {
// 释放共享资源,因为可能有多个线程同时执行,所以需要使用 CAS 操作来修改资源总数。
for (; ; ) {
int lastCount = getState();
int newCount = lastCount + returnCount;
if (compareAndSetState(lastCount, newCount)) {
return true;
}
}
}
}

// 定义两个共享资源,说明同一时间内可以有两个线程同时运行
private final Sync sync = new Sync(2);

@Override
public void lock() {
sync.acquireShared(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
sync.releaseShared(1);
}

@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}

public static void main(String[] args) {
final Lock lock = new TwinsLock();
int threadCounts = 10;
Thread threads[] = new Thread[threadCounts];
for (int i = 0; i < threadCounts; i++) {
final int index = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}

for (int i = 0; i < threadCounts; i++) {
threads[i].start();
}
}
}

运行程序,我们会发现程序每次都会同时打印两条语句,如下面的形式,证明同时有两个线程在执行。

1
2
3
4
5
6
7
8
Thread-0
Thread-1
Thread-3
Thread-2
Thread-8
Thread-4
Thread-3
Thread-6

CAS 操作

CAS(Compare and Swap),比较并交换,通过利用底层硬件平台的特性,实现原子性操作。CAS 操作涉及到3个操作数,内存值 V,旧的期望值 A,需要修改的新值 B。当且仅当预期值 A 和 内存值 V 相同时,才将内存值 V 修改为 B,否则什么都不做。CAS 操作类似于执行了下面流程

1
2
3
if(oldValue == memory[valueAddress]) {
memory[valueAddress] = newValue;
}

在上面的流程中,其实涉及到了两个操作,比较以及替换,为了确保程序正确,需要确保这两个操作的原子性(也就是说确保这两个操作同时进行,中间不会有其他线程干扰)。现在的 CPU 中,提供了相关的底层 CAS 指令,即 CPU 底层指令确保了比较和交换两个操作作为一个原子操作进行(其实在这一点上还是有排他锁的. 只是比起用synchronized, 这里的排他时间要短的多.),Java 中的 CAS 函数是借助于底层的 CAS 指令来实现的。更多关于 CPU 底层实现的原理可以参考 这篇文章。我们来看下 Java 中对于 CAS 函数的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Atomically update Java variable to x if it is currently
* holding expected.
* @return true if successful
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);

/**
* Atomically update Java variable to x if it is currently
* holding expected.
* @return true if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

/**
* Atomically update Java variable to x if it is currently
* holding expected.
* @return true if successful
*/
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

上面三个函数定义在 sun.misc.Unsafe 类中,使用该类可以进行一些底层的操作,例如直接操作原生内存,更多关于 Unsafe 类的文章可以参考 这篇。以 compareAndSwapInt 为例,我们看下如何使用 CAS 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import sun.misc.Unsafe;

import java.lang.reflect.Field;

/**
* Created by Jikai Zhang on 2017/4/8.
*/
public class CASIntTest {
private volatile int count = 0;

private static final Unsafe unsafe = getUnsafe();
private static final long offset;

// 获得 count 属性在 CASIntTest 中的偏移量(内存地址偏移)
static {
try {
offset = unsafe.objectFieldOffset(CASIntTest.class.getDeclaredField("count"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
// 通过反射的方式获得 Unsafe 类
public static Unsafe getUnsafe() {
Unsafe unsafe = null;
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
return unsafe;
}

public void increment() {
int previous = count;
unsafe.compareAndSwapInt(this, offset, previous, previous + 1);
}

public static void main(String[] args) {
CASIntTest casIntTest = new CASIntTest();
casIntTest.increment();
System.out.println(casIntTest.count);
}
}

在 CASIntTest 类中,我们定义一个 count 变量,其中 increment 方法是将 count 的值加 1。下面是 increase 方法的代码:

1
2
int previous = count;
unsafe.compareAndSwapInt(this, offset, previous, previous + 1);

在没有线程竞争的条件下,该代码执行的结果是将 count 变量的值加 1(多个线程竞争可能会有线程执行失败),但是在 compareAndSwapInt 函数中,我们并没有传入 count 变量,那么函数是如何修改的 count 变量值?其实我们往 compareAndSwapInt 函数中传入了 count 变量在堆内存中的地址,函数直接修改了 count 变量所在内存区域。count 属性在堆内存中的地址是由 CASIntTest 实例的起始内存地址和 count 属性相对于起始内存的偏移量决定的。其中对象属性在对象中的偏移量通过 objectFieldOffset 函数获得,函数原型如下所示。该函数接受一个 Filed 类型的参数,返回该 Filed 属性在对象中的偏移量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Report the location of a given static field, in conjunction with {@link
* #staticFieldBase}.
* Do not expect to perform any sort of arithmetic on this offset;
* it is just a cookie which is passed to the unsafe heap memory accessors.
*
* Any given field will always have the same offset, and no two distinct
* fields of the same class will ever have the same offset.
*
* As of 1.4.1, offsets for fields are represented as long values,
* although the Sun JVM does not use the most significant 32 bits.
* It is hard to imagine a JVM technology which needs more than
* a few bits to encode an offset within a non-array object,
* However, for consistency with other methods in this class,
* this method reports its result as a long value.
*/
public native long objectFieldOffset(Field f);

下面我们再看一下 compareAndSwapInt 的函数原型。我们知道 CAS 操作需要知道 3 个信息:内存中的值,期望的旧值以及要修改的新值。通过前面的分析,我们知道通过 o 和 offset 我们可以确定属性在内存中的地址,也就是知道了属性在内存中的值。expected 对应期望的旧址,而 x 就是要修改的新值。

1
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

compareAndSwapInt 函数首先比较一下 expected 是否和内存中的值相同,如果不同证明其他线程修改了属性值,那么就不会执行更新操作,但是程序如果就此返回了,似乎不太符合我们的期望,我们是希望程序可以执行更新操作的,如果其他线程先进行了更新,那么就在更新后的值的基础上进行修改,所以我们一般使用循环配合 CAS 函数,使程序在更新操作完成之后再返回,如下所示:

1
2
3
4
long before = counter;
while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
before = counter;
}

下面是使用 CAS 函数实现计数器的一个实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import sun.misc.Unsafe;

import java.lang.reflect.Field;

/**
* Created by Jikai Zhang on 2017/4/8.
*/
public class CASCounter {

// 通过反射的方式获得 Unsafe 类
public static Unsafe getUnsafe() {
Unsafe unsafe = null;
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
return unsafe;
}

private volatile long counter = 0;
private static final long offset;
private static final Unsafe unsafe = getUnsafe();

static {
try {
offset = unsafe.objectFieldOffset(CASCounter.class.getDeclaredField("counter"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}

public void increment() {
long before = counter;
while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
before = counter;
}
}

public long getCounter() {
return counter;
}

private static long intCounter = 0;

public static void main(String[] args) throws InterruptedException {
int threadCount = 10;
Thread threads[] = new Thread[threadCount];
final CASCounter casCounter = new CASCounter();

for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {

for (int i = 0; i < 10000; i++) {
casCounter.increment();
intCounter++;
}
}
});
threads[i].start();
}

for(int i = 0; i < threadCount; i++) {
threads[i].join();
}
System.out.printf("CASCounter is %d \nintCounter is %d\n", casCounter.getCounter(), intCounter);
}
}

在 AQS 中,对原始的 CAS 函数封装了一下,省去了获得变量地址的步骤,如下面的形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
private static final long waitStatusOffset;

static {
try {
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
} catch (Exception ex) {
throw new Error(ex);
}
}

private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}

同步队列

AQS 依赖内部的同步队列(一个 FIFO的双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把队列中第一个等待节点线程唤醒(下图中的 Node1),使其再次尝试获取同步状态。同步队列的结构如下所示:

image

图片来自 http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer

Head 节点本身不保存等待线程的信息,它通过 next 变量指向第一个保存线程等待信息的节点(Node1)。当线程被唤醒之后,会删除 Head 节点,而唤醒线程所在的节点会设置为 Head 节点(Node1 被唤醒之后,Node1会被置为 Head 节点)。下面我们看下 JDK 中同步队列的实现。

Node 类

首先看在节点所对应的 Node 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
static final class Node {

/**
* 标志是独占式模式还是共享模式
*/
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

/**
* 线程等待状态的有效值
*/
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

/**
* 线程状态,合法值为上面 4 个值中的一个
*/
volatile int waitStatus;

/**
* 当前节点的前置节点
*/
volatile Node prev;

/**
* 当前节点的后置节点
*/
volatile Node next;

/**
* 当前节点所关联的线程
*/
volatile Thread thread;

/**
* 指向下一个在某个条件上等待的节点,或者指向 SHARE 节点,表明当前处于共享模式
*/
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

在 Node 类中定义了四种等待状态:

  • CANCELED: 1,因为等待超时 (timeout)或者中断(interrupt),节点会被置为取消状态。处于取消状态的节点不会再去竞争锁,也就是说不会再被阻塞。节点会一直保持取消状态,而不会转换为其他状态。处于 CANCELED 的节点会被移出队列,被 GC 回收。
  • SIGNAL: -1,表明当前的后继结点正在或者将要被阻塞(通过使用 LockSupport.pack 方法),因此当前的节点被释放(release)或者被取消时(cancel)时,要唤醒它的后继结点(通过 LockSupport.unpark 方法)。
  • CONDITION: -2,表明当前节点在条件队列中,因为等待某个条件而被阻塞。
  • PROPAGATE: -3,在共享模式下,可以认为资源有多个,因此当前线程被唤醒之后,可能还有剩余的资源可以唤醒其他线程。该状态用来表明后续节点会传播唤醒的操作。需要注意的是只有头节点才可以设置为该状态(This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.)。
  • 0:新创建的节点会处于这种状态

独占锁的获取和释放

我们首先看下独占锁的获取和释放过程

独占锁获取

下面是获取独占锁的流程图:
image

我们通过 acquire 方法来获取独占锁,下面是方法定义

1
2
3
4
5
6
7
8
9
public final void acquire(int arg) {
// 首先尝试获取锁,如果获取失败,会先调用 addWaiter 方法创建节点并追加到队列尾部
// 然后调用 acquireQueued 阻塞或者循环尝试获取锁
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
// 在 acquireQueued 中,如果线程是因为中断而退出的阻塞状态会返回 true
// 这里的 selfInterrupt 主要是为了恢复线程的中断状态
selfInterrupt();
}
}

acquire 会首先调用 tryAcquire 方法来获得锁,该方法需要我们来实现,这个在前面已经提过了。如果没有获取锁,会调用 addWaiter 方法创建一个和当前线程关联的节点追加到同步队列的尾部,我们调用 addWaiter 时传入的是 Node.EXCLUSIVE,表明当前是独占模式。下面是 addWaiter 的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// tail 指向同步队列的尾节点
Node pred = tail;
// Try the fast path of enq; backup to full enq on failure
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

addWaiter 方法会首先调用 if 方法,来判断能否成功将节点添加到队列尾部,如果添加失败,再调用 enq 方法(使用循环不断重试)进行添加,下面是 enq 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 同步队列采用的懒初始化(lazily initialized)的方式,
// 初始时 head 和 tail 都会被设置为 null,当一次被访问时
// 才会创建 head 对象,并把尾指针指向 head。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

addWaiter 仅仅是将节点加到了同步队列的末尾,并没有阻塞线程,线程阻塞的操作是在 acquireQueued 方法中完成的,下面是 acquireQueued 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果当前节点的前继节点是 head,就使用自旋(循环)的方式不断请求锁
if (p == head && tryAcquire(arg)) {
// 成功获得锁,将当前节点置为 head 节点,同时删除原 head 节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

// shouldParkAfterFailedAcquire 检查是否可以挂起线程,
// 如果可以挂起进程,会调用 parkAndCheckInterrupt 挂起线程,
// 如果 parkAndCheckInterrupt 返回 true,表明当前线程是因为中断而退出挂起状态的,
// 所以要将 interrupted 设为 true,表明当前线程被中断过
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquireQueued 会首先检查当前节点的前继节点是否为 head,如果为 head,将使用自旋的方式不断的请求锁,如果不是 head,则调用 shouldParkAfterFailedAcquire 查看是否应该挂起当前节点关联的线程,下面是 shouldParkAfterFailedAcquire 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 当前节点的前继节点的等待状态
int ws = pred.waitStatus;
// 如果前继节点的等待状态为 SIGNAL 我们就可以将当前节点对应的线程挂起
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// ws 大于 0,表明当前线程的前继节点处于 CANCELED 的状态,
// 所以我们需要从当前节点开始往前查找,直到找到第一个不为
// CAECELED 状态的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

shouldParkAfterFailedAcquire 会检查前继节点的等待状态,如果前继节点状态为 SIGNAL,则可以将当前节点关联的线程挂起,如果不是 SIGNAL,会做一些其他的操作,在当前循环中不会挂起线程。如果确定了可以挂起线程,就调用 parkAndCheckInterrupt 方法对线程进行阻塞:

1
2
3
4
5
6
7
8
9
10
private final boolean parkAndCheckInterrupt() {
// 挂起当前线程
LockSupport.park(this);
// 可以通过调用 interrupt 方法使线程退出 park 状态,
// 为了使线程在后面的循环中还可以响应中断,会重置线程的中断状态。
// 这里使用 interrupted 会先返回线程当前的中断状态,然后将中断状态重置为 false,
// 线程的中断状态会返回给上层调用函数,在线程获得锁后,
// 如果发现线程曾被中断过,会将中断状态重新设为 true
return Thread.interrupted();
}

独占锁释放

下面是释放独占锁的流程:
image

通过 release 方法,我们可以释放互斥锁。下面是 release 方法的实现:

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// waitStatus 为 0,证明是初始化的空队列或者后继结点已经被唤醒了
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

在独占模式下释放锁时,是没有其他线程竞争的,所以处理会简单一些。首先尝试释放锁,如果失败就直接返回(失败不是因为多线程竞争,而是线程本身就不拥有锁)。如果成功的话,会检查 h 的状态,然后调用 unparkSuccessor 方法来唤醒后续线程。下面是 unparkSuccessor 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
// 将 head 节点的状态置为 0,表明当前节点的后续节点已经被唤醒了,
// 不需要再次唤醒,修改 ws 状态主要作用于 release 的判断
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

在 unparkSuccessor 方法中,如果发现头节点的后继结点为 null 或者处于 CANCELED 状态,会从尾部往前找(在节点存在的前提下,这样一定能找到)离头节点最近的需要唤醒的节点,然后唤醒该节点。

共享锁获取和释放

独占锁的流程和原理比较容易理解,因为只有一个锁,但是共享锁的处理就相对复杂一些了。在独占锁中,只有在释放锁之后,才能唤醒等待的线程,而在共享模式中,获取锁和释放锁之后,都有可能唤醒等待的线程。如果想要理清共享锁的工作过程,必须将共享锁的获取和释放结合起来看。这里我们先看一下共享锁的释放过程,只有明白了释放过程做了哪些工作,才能更好的理解获取锁的过程。

共享锁释放

下面是释放共享锁的流程:
image

通过 releaseShared 方法会释放共享锁,下面是具体的实现:

1
2
3
4
5
6
7
public final boolean releaseShared(int releases) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

releases 是要释放的共享资源数量,其中 tryReleaseShared 的方法由我们自己重写,该方法的主要功能就是修改共享资源的数量(state + releases),因为可能会有多个线程同时释放资源,所以实现的时候,一般采用循环加 CAS 操作的方式,如下面的形式:

1
2
3
4
5
6
7
8
9
10
protected boolean tryReleaseShared(int releases) {
// 释放共享资源,因为可能有多个线程同时执行,所以需要使用 CAS 操作来修改资源总数。
for (;;) {
int lastCount = getState();
int newCount = lastCount + releases;
if (compareAndSetState(lastCount, newCount)) {
return true;
}
}
}

当共享资源数量修改了之后,会调用 doReleaseShared 方法,该方法主要唤醒同步队列中的第一个等待节点(head.next),下面是具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
// head = null 说明没有初始化,head = tail 说明同步队列中没有等待节点
if (h != null && h != tail) {
// 查看当前节点的等待状态
int ws = h.waitStatus;
// 我们在前面说过,SIGNAL说明有后续节点需要唤醒
if (ws == Node.SIGNAL) {

/*
* 将当前节点的值设为 0,表明已经唤醒了后继节点
* 可能会有多个线程同时执行到这一步,所以使用 CAS 保证只有一个线程能修改成功,
* 从而执行 unparkSuccessor,其他的线程会执行 continue 操作
*/
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
/*
* ws 等于 0,说明无需唤醒后继结点(后续节点已经被唤醒或者当前节点没有被阻塞的后继结点),
* 也就是这一次的调用其实并没有执行唤醒后继结点的操作。就类似于我只需要一张优惠券,
* 但是我的两个朋友,他们分别给我了一张,因此我就剩余了一张。然后我就将这张剩余的优惠券
* 送(传播)给其他人使用,因此这里将节点置为可传播的状态(PROPAGATE)
*/
continue; // loop on failed CAS
}
}
if (h == head) // loop if head changed
break;
}
}

从上面的实现中,doReleaseShared 的主要作用是用来唤醒阻塞的节点并且一次只唤醒一个,让该节点关联的线程去重新竞争锁,它既不修改同步队列,也不修改共享资源。

当多个线程同时释放资源时,可以确保两件事:

  1. 共享资源的数量能正确的累加
  2. 至少有一个线程被唤醒,其实只要确保有一个线程被唤醒就可以了,即便唤醒了多个线程,在同一时刻,也只能有一个线程能得到竞争锁的资格,在下面我们会看到。

所以释放锁做的主要工作还是修改共享资源的数量。而有了多个共享资源后,如何确保同步队列中的多个节点可以获取锁,是由获取锁的逻辑完成的。下面看下共享锁的获取。

共享锁的获取

下面是获取共享锁的流程
image

通过 acquireShared 方法,我们可以申请共享锁,下面是具体的实现:

1
2
3
4
5
public final void acquireShared(int arg) {
// 如果返回结果小于 0,证明没有获取到共享资源
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

如果没有获取到共享资源,就会执行 doAcquireShared 方法,下面是该方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

从上面的代码中可以看到,只有前置节点为 head 的节点才有可能去竞争锁,这点和独占模式的处理是一样的,所以即便唤醒了多个线程,也只有一个线程能进入竞争锁的逻辑,其余线程会再次进入 park 状态,当线程获取到共享锁之后,会执行 setHeadAndPropagate 方法,下面是具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void setHeadAndPropagate(Node node, long propagate) {
// 备份一下头节点
Node h = head; // Record old head for check below
/*
* 移除头节点,并将当前节点置为头节点
* 当执行完这一步之后,其实队列的头节点已经发生改变,
* 其他被唤醒的线程就有机会去获取锁,从而并发的执行该方法,
* 所以上面备份头节点,以便下面的代码可以正确运行
*/
setHead(node);

/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/

/*
* 判断是否需要唤醒后继结点,propagate > 0 说明共享资源有剩余,
* h.waitStatus < 0,表明当前节点状态可能为 SIGNAL,CONDITION,PROPAGATE
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 只有 s 不处于独占模式时,才去唤醒后继结点
if (s == null || s.isShared())
doReleaseShared();
}
}

判断后继结点是否需要唤醒的条件是十分宽松的,也就是一定包含必要的唤醒,但是也有可能会包含不必要的唤醒。从前面我们可以知道 doReleaseShared 函数的主要作用是唤醒后继结点,它既不修改共享资源,也不修改同步队列,所以即便有不必要的唤醒也是不影响程序正确性的。如果没有共享资源,节点会再次进入等待状态。

到了这里,脉络就比较清晰了,当一个节点获取到共享锁之后,它除了将自身设为 head 节点之外,还会判断一下是否满足唤醒后继结点的条件,如果满足,就唤醒后继结点,后继结点获取到锁之后,会重复这个过程,直到判断条件不成立。就类似于考试时从第一排往最后一排传卷子,第一排先留下一份,然后将剩余的传给后一排,后一排会重复这个过程。如果传到某一排卷子没了,那么位于这排的人就要等待,直到老师又给了他新的卷子。

中断

在获取锁时还可以设置响应中断,独占锁和共享锁的处理逻辑类似,这里我们以独占锁为例。使用 acquireInterruptibly 方法,在获取独占锁时可以响应中断,下面是具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
// 这里会抛出异常
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}

从上面的代码中我们可以看出,acquireInterruptibly 和 acquire 的逻辑类似,只是在下面的代码处有所不同:当线程因为中断而退出阻塞状态时,会直接抛出 InterruptedException 异常。

1
2
3
4
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
// 这里会抛出异常
throw new InterruptedException();
}

我们知道,不管是抛出异常还是方法返回,程序都会执行 finally 代码,而 failed 肯定为 true,所以抛出异常之后会执行 cancelAcquire 方法,cancelAcquire 方法主要将节点从同步队列中移除。下面是具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// 跳过前面的已经取消的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 保存下 pred 的后继结点,以便 CAS 操作使用
// 因为可能存在已经取消的节点,所以 pred.next 不一等于 node
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 将节点状态设为 CANCELED
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

从上面的代码可以看出,节点的删除分为三种情况:

  • 删除节点为尾节点,直接将该节点的第一个有效前置节点置为尾节点
  • 删除节点的前置节点为头节点,则对该节点执行 unparkSuccessor 操作
  • 删除节点为中间节点,结果如下图所示。下图中(1)表示同步队列的初始状态,假设删除 node2, node1 是正常节点(非 CANCELED),(2)就是删除 node2 后同步队列的状态,此时 node1 节点的后继已经变为 node3,也就是说当 node1 变为 head 之后,会直接唤醒 node3。当另外的一个节点中断之后再次执行 cancelAcquire,在执行下面的代码时,会使同步队列的状态由(2)变为(3),此时 node2 已经没有外界指针了,可以被回收了。如果一直没有另外一个节点中断,也就是同步队列一直处于(2)状态,那么需要等 node3 被回收之后,node2 才可以被回收。
    1
    2
    3
    Node pred = node.prev;
    while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;

image

超时

超时是在中断的基础上加了一层时间的判断,这里我们还是以独占锁为例。 tryAcquireNanos 支持获取锁的超时处理,下面是具体实现:

1
2
3
4
5
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

当获取锁失败之后,会执行 doAcquireNanos 方法,下面是具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0 L)
return false;

// 线程最晚结束时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 判断是否超时,如果超时就返回
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0 L)
return false;

// 这里如果设定了一个阈值,如果超时的时间比阈值小,就认为
// 当前线程没必要阻塞,再执行几次 for 循环估计就超时了
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);

if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

当线程超时返回时,还是会执行 cancelAcquire 方法,cancelAcquire 的逻辑已经在前面说过了,这里不再赘述。

参考文章