Java语言synchronized
关键字自带了一个内置的隐性锁(implicit lock),使用起来方便简单,但是内置锁一旦使用,则会强制将某个代码块加锁或者解锁,而且内置锁并不支持可中断的获取锁。从Java5.0开始,提供了一个并发工具包java.util.concurrent.*
,实现了显性锁(explicit lock)ReentrantLock
(可重入锁,可多次获取同一个锁);ReentrantLock
实现了与synchronized
一样的功能,确保并发过程中数据的互斥访问与可见性。获取ReentrantLock
相当于进入一个synchronized
代码块,而释放ReentrantLock
则相当于从一个synchronized
代码块退出。ReentrantLock
实现了如下Lock.java
接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public interface Lock { void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock();
Condition newCondition(); }
|
ReentrantLock
支持中断地获取锁,也可以给定超时时间获取锁,从而为并发编程提供了更大的灵活性。如下代码是ReentrantLock
使用的一个示例:
1 2 3 4 5 6 7 8 9 10 11 12
| Lock lock = new ReentrantLock();
...
lock.lock(); try{ doSomething(); } finally{ lock.unlock(); }
|
可重入锁ReentrantLock
有两种获取模式:一种是公平(fair),就是说锁按照请求的顺序获取的,而非公平(non-fair)模式则并不管等待队列中是否有线程,只要请求锁时可用,则直接获取锁。非公平锁是默认的实现方式。从表面上看,公平模式性能似乎要更好。但实际上,大多数情况非公平模式的性能都要优于公平模式(见 Doug Lea《Java Concurrency in Practice》)。
除了ReentrantLock
,还有一种常用的锁ReentrantReadWriteLock
(可重入读写锁)。在并发编程的情况下使用ReentrantLock
虽然确保了任何时候都只有一个线程操作,保证了数据的一致性,但实际上大部分情况,共享数据往往是只需要读,而不会被修改,这样ReentrantLock
对于多个读线程(Reader)的情况就显得有点过于谨慎了。那么,能否让多个读线程同时运行,而任何时刻都只有一个写线程(Writer)了?这就是ReadWriteLock
需要做的事情。ReentrantReadWriteLock
实现了如下ReadWriteLock
接口:
1 2 3 4 5 6 7 8 9 10
| public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
|
读写锁确保可同时有多个Reader而任何时刻都只有一个Writer,从而提升了读写的并发程度。这种特性使得ReentrantReadWriteLock
尤其适合读操作执行时间长而很少有写操作的情况。与ReentrantLock
类似,读写锁也提供了Fair与Non-fair两种获取模式。在公平模式下,锁总是给那些请求等待时间最长的线程,如果读锁被一个线程持有,此时另一个线程请求写锁,则其他请求读锁的线程一直会阻塞直到获取到写锁的线程完成任务;对于非公平模式,锁获取的顺序是不定的。另外,读写锁还支持锁降级(downgrading),如果一个线程持有写锁,但需要请求读锁,此时无需释放写锁就可以直接获取读锁(其他写锁请求被阻塞),但不支持锁升级(upgrading,可能导致死锁):持有读锁的线程请求写锁。
这里我们来看一个具体的实例:利用读写锁来实现一个可支持多个Reader,单个Writer的Map对象,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class ReadWriteMap<K,V> { private final Map<K,V> map; private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock rl = rwLock.readLock(); private final Lock wl = rwLock.writeLock(); public ReadWriteMap(Map<K,V> map){ this.map = map; } public void put(K key, V value){ wl.lock(); try{ map.put(key, value); } finally{ wl.unlock(); } } public void remove(Object key){ wl.lock(); try{ map.remove(key); } finally{ wl.unlock(); } } public V get(Object key){ rl.lock(); try{ return map.get(key); } finally{ rl.unlock(); } } }
|
了解了两种基本的锁,接下来这一节就来着重分析下ReentrantLock
的具体实现原理。
ReentrantLock的实现原理
在分析ReentrantLock
的具体原理之前,先来看下java.util.concurrent
包同步器实现的基础类AbstractQueueSynchronizer
(AQS)的实现机制。除了ReentrantLock
,ReentrantReadWriteLock
之外,AQS是同步类如Semaphore
,CountDownLatch
,FutureTask
的实现基础。与内置同步synchronized
不同的是,AQS支持:
- 非阻塞的同步(
Lock.tryLock()
)以及阻塞同步;
- 锁获取超时,防止线程饥饿;
- 可通过中断取消锁的获取;
AQS主要提供了两类方法:acquire
操作(可能阻塞或者不阻塞当前线程)和release
操作。AQS对于每个阻塞线程都维护了一个同步状态,这个状态可以是互斥的(exclusive,只有一个线程可访问)也可以是共享的(shared,多个线程可访问)。互斥模式一般用于实现锁,如ReentrantLock
;而信号量Semaphore
,计数门栓CountDownLatch
则通过共享模式实现。
AQS实现原理
实现AQS的基本原理比较简单,一个获取操作可以按如下方式处理:
1 2 3 4 5 6 7 8
| while(*synchronization state does not allow acquire*){ *enqueue current thread if not already queued;* *possibly block current thread;* }
*dequeue current thread if it was queued;*
|
而释放操作可以这么处理:
1 2 3 4 5 6
| *update synchronization state*
if(*state may permit a blocked thread to acquire*) *unblock one or more queued threads;*
|
要实现上述操作,需要三个基本组件的通力协作:
- 原子地更新同步状态(通过对一个
volatile
变量进行CAS操作实现);
- 阻塞线程以及解阻线程(通过
LockSupport.park/unpark
方法实现);
- 维护阻塞线程队列;
这里,主要来看下AQS是如何实现阻塞线程队列的。AQS中阻塞线程队列是由一个个Node
节点组成的双端链表,每个节点包含了阻塞线程,队列的头节点、尾节点、前驱节点,后驱节点以及线程同步状态、线程等待状态等信息。获取时,AQS首先通过tryAcquire
(由每个同步类自己实现)尝试获取,如果获取失败,则需要将其添加到等待队列中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
|
入队后,不断尝试获取:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } .... } } finally { if (failed) cancelAcquire(node); } }
|
释放时,调用tryRelease()
释放,接着解阻头节点的前驱节点线程:
1 2 3 4 5 6 7 8 9 10 11 12
| public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
|
接下来看看可重入锁ReentrantLock
是如何利用AQS来实现锁的获取与释放的。
ReentrantLock的实现
ReentrantLock
提供了两个构造函数:默认构造函数利用非公平模式实现;如果需要公平模式,则可通过指定模式来构造。这里NonfairSync
跟FairSync
都是ReentrantLock
的内部类。
1 2 3 4 5 6 7 8 9 10
| public ReentrantLock() { sync = new NonfairSync(); }
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
|
获取锁,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public void lock() { sync.lock(); }
static final class FairSync extends Sync { .... protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
|
非公平方式获取时,首先不管阻塞线程队列是否有线程,直接通过CAS操作来设置状态,直至成功:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| static final class NonfairSync extends Sync {
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
abstract static class Sync extends AbstractQueuedSynchronizer { .... final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
|
而对于公平方式,则首先会检查AQS等待队列中是否有线程比当前线程更早的进行获取锁的操作,如果没有才让当前线程成为锁持有者,否则当前线程需要进入等待队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L;
final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
|
有关ReentrantLock
以及AQS的实现源码可参考OpenJDK 。
参考文献