Java并发编程之锁

Java语言synchronized关键字自带了一个内置的隐性锁(implicit lock),使用起来方便简单,但是内置锁一旦使用,则会强制将某个代码块加锁或者解锁,而且内置锁并不支持可中断的获取锁。从Java5.0开始,提供了一个并发工具包java.util.concurrent.*,实现了显性锁(explicit lock)ReentrantLock(可重入锁,可多次获取同一个锁);ReentrantLock实现了与synchronized一样的功能,确保并发过程中数据的互斥访问与可见性。获取ReentrantLock相当于进入一个synchronized代码块,而释放ReentrantLock则相当于从一个synchronized代码块退出。ReentrantLock实现了如下Lock.java接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public interface Lock {
//获取锁
void lock();

//获取锁,可响应中断
void lockInterruptibly() throws InterruptedException;

//尝试获取锁,如果可用则返回TRUE,否则返回FALSE
boolean tryLock();

//尝试在给定时间内获取锁,如果超时或者发生中断,则返回FALSE
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

//释放锁
void unlock();

//创建与该锁绑定的Conidition变量
Condition newCondition();
}

ReentrantLock支持中断地获取锁,也可以给定超时时间获取锁,从而为并发编程提供了更大的灵活性。如下代码是ReentrantLock使用的一个示例:

1
2
3
4
5
6
7
8
9
10
11

Lock lock = new ReentrantLock();

...

lock.lock();
try{
doSomething();
} finally{
lock.unlock();
}

可重入锁ReentrantLock有两种获取模式:一种是公平(fair),就是说锁按照请求的顺序获取的,而非公平(non-fair)模式则并不管等待队列中是否有线程,只要请求锁时可用,则直接获取锁。非公平锁是默认的实现方式。从表面上看,公平模式性能似乎要更好。但实际上,大多数情况非公平模式的性能都要优于公平模式(见 Doug Lea《Java Concurrency in Practice》)。

除了ReentrantLock,还有一种常用的锁ReentrantReadWriteLock(可重入读写锁)。在并发编程的情况下使用ReentrantLock虽然确保了任何时候都只有一个线程操作,保证了数据的一致性,但实际上大部分情况,共享数据往往是只需要读,而不会被修改,这样ReentrantLock对于多个读线程(Reader)的情况就显得有点过于谨慎了。那么,能否让多个读线程同时运行,而任何时刻都只有一个写线程(Writer)了?这就是ReadWriteLock需要做的事情。ReentrantReadWriteLock实现了如下ReadWriteLock接口:

1
2
3
4
5
6
7
8
9

public interface ReadWriteLock {

//读锁
Lock readLock();

//写锁
Lock writeLock();
}

读写锁确保可同时有多个Reader而任何时刻都只有一个Writer,从而提升了读写的并发程度。这种特性使得ReentrantReadWriteLock尤其适合读操作执行时间长而很少有写操作的情况。与ReentrantLock类似,读写锁也提供了Fair与Non-fair两种获取模式。在公平模式下,锁总是给那些请求等待时间最长的线程,如果读锁被一个线程持有,此时另一个线程请求写锁,则其他请求读锁的线程一直会阻塞直到获取到写锁的线程完成任务;对于非公平模式,锁获取的顺序是不定的。另外,读写锁还支持锁降级(downgrading),如果一个线程持有写锁,但需要请求读锁,此时无需释放写锁就可以直接获取读锁(其他写锁请求被阻塞),但不支持锁升级(upgrading,可能导致死锁):持有读锁的线程请求写锁。

这里我们来看一个具体的实例:利用读写锁来实现一个可支持多个Reader,单个Writer的Map对象,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

public class ReadWriteMap<K,V> {
private final Map<K,V> map;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock rl = rwLock.readLock();
private final Lock wl = rwLock.writeLock();

public ReadWriteMap(Map<K,V> map){
this.map = map;
}

public void put(K key, V value){
wl.lock();

try{
map.put(key, value);
} finally{
wl.unlock();
}
}
// do same for putAll(), clear()
public void remove(Object key){
wl.lock();

try{
map.remove(key);
} finally{
wl.unlock();
}
}

public V get(Object key){
rl.lock();

try{
return map.get(key);
} finally{
rl.unlock();
}
}
}

了解了两种基本的锁,接下来这一节就来着重分析下ReentrantLock的具体实现原理。

ReentrantLock的实现原理

在分析ReentrantLock的具体原理之前,先来看下java.util.concurrent包同步器实现的基础类AbstractQueueSynchronizer(AQS)的实现机制。除了ReentrantLockReentrantReadWriteLock之外,AQS是同步类如Semaphore,CountDownLatch,FutureTask的实现基础。与内置同步synchronized不同的是,AQS支持:

  • 非阻塞的同步(Lock.tryLock())以及阻塞同步;
  • 锁获取超时,防止线程饥饿;
  • 可通过中断取消锁的获取;

AQS主要提供了两类方法:acquire操作(可能阻塞或者不阻塞当前线程)和release操作。AQS对于每个阻塞线程都维护了一个同步状态,这个状态可以是互斥的(exclusive,只有一个线程可访问)也可以是共享的(shared,多个线程可访问)。互斥模式一般用于实现锁,如ReentrantLock;而信号量Semaphore,计数门栓CountDownLatch则通过共享模式实现。

AQS实现原理

实现AQS的基本原理比较简单,一个获取操作可以按如下方式处理:

1
2
3
4
5
6
7

while(*synchronization state does not allow acquire*){
*enqueue current thread if not already queued;*
*possibly block current thread;*
}

*dequeue current thread if it was queued;*

而释放操作可以这么处理:

1
2
3
4
5

*update synchronization state*

if(*state may permit a blocked thread to acquire*)
*unblock one or more queued threads;*

要实现上述操作,需要三个基本组件的通力协作:

  • 原子地更新同步状态(通过对一个volatile变量进行CAS操作实现);
  • 阻塞线程以及解阻线程(通过LockSupport.park/unpark方法实现);
  • 维护阻塞线程队列;

这里,主要来看下AQS是如何实现阻塞线程队列的。AQS中阻塞线程队列是由一个个Node节点组成的双端链表,每个节点包含了阻塞线程,队列的头节点、尾节点、前驱节点,后驱节点以及线程同步状态、线程等待状态等信息。获取时,AQS首先通过tryAcquire(由每个同步类自己实现)尝试获取,如果获取失败,则需要将其添加到等待队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 尝试获取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 添加等待节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 入队列
enq(node);
return node;
}

入队后,不断尝试获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23


final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 前驱节点为头节点,并且设置状态成功
if (p == head && tryAcquire(arg)) {
//出队列
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
....
}
} finally {
if (failed)
cancelAcquire(node);
}
}

释放时,调用tryRelease()释放,接着解阻头节点的前驱节点线程:

1
2
3
4
5
6
7
8
9
10

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

接下来看看可重入锁ReentrantLock是如何利用AQS来实现锁的获取与释放的。

ReentrantLock的实现

ReentrantLock提供了两个构造函数:默认构造函数利用非公平模式实现;如果需要公平模式,则可通过指定模式来构造。这里NonfairSyncFairSync都是ReentrantLock的内部类。

1
2
3
4
5
6
7
8
9

public ReentrantLock() {
sync = new NonfairSync();
}

// 指定实现模式:nonfair
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

获取锁,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public void lock() {
sync.lock();
}

static final class FairSync extends Sync {
....
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

非公平方式获取时,首先不管阻塞线程队列是否有线程,直接通过CAS操作来设置状态,直至成功:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

static final class NonfairSync extends Sync {

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// AQS实现
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}


abstract static class Sync extends AbstractQueuedSynchronizer {
....
// 非公平方式获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

而对于公平方式,则首先会检查AQS等待队列中是否有线程比当前线程更早的进行获取锁的操作,如果没有才让当前线程成为锁持有者,否则当前线程需要进入等待队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
// AQS的实现
acquire(1);
}

// 公平方式获取锁,检查是否有更早的等待线程
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

有关ReentrantLock以及AQS的实现源码可参考OpenJDK

参考文献