Java并发编程之原子操作

在古希腊时代,哲学家Democritus(德谟克利特)提出了关于物质构成的理论:所有物质都由不可分割的元素组成,这种元素被称之为原子(atom)。编程中所说的原子性(atomic)借用了这个概念, 用于表示某个代码指令动作的不可中断性。在之前的一篇文章(Java Memory Model)时,提到多线程并发访问共享数据时,会出现数据竞争,从而导致数据不一致的情况。比如对一个整型变量进行加一的操作: ++counter,表面上看,这是一个单一原子操作,但实际上这个操作有三个步骤:首先,需要从内存中(有可能是cache中)加载到寄存器;接着,将该值加一;最后需要将寄存器中的值写入内存。这样,在多个线程访问的情况,上述几个步骤出现交织执行,就可能出现各个线程读写数据不一致的情形。

Atomic Theory: http://www.softschools.com/timelines/atomic_theory_timeline/95/

于是,有了原子操作。原子操作确保了执行的不可中断,因而能避免数据冲突。Java从5.0开始有一个atomic的工具包专门支持intlong以及引用变量的原子操作,而在硬件层面,目前大部分处理器都支持诸如CAS(CompareAndSet/CompareAndSwap),FAA(FetchAndAdd)等原子指令。在介绍Java中的原子操作类之前,先来了解下硬件层面的原子指令。

一个执行操作的原子性,是说该操作不可中断,除非其他线程在执行原子操作(此时当前线程进行原子操作出现错误),否则当前线程需要占用CPU,直到执行完该操作为止,从而确保数据的互斥访问与可见性。

处理器原子指令

相比锁(lock),原子操作由于暂时禁止了中断,确保当前线程不会进行上下文切换,不会阻塞当前线程,效率通常要更高。因此,支持原子操作的处理器通常可以用来实现无锁算法(Non-blocking_algorithm)。常见的几个原子指令有:

  • atomic swap: 原子的交换数据,x86对应的指令为XCHG (X86_instruction_listings);
  • test and set: 向内存写入1,并返回旧值;
  • fetch and add: 从内存中读取变量,并增加其值;
  • compare and swap: CAS, 将一个变量的值与某个期望值比较,如果相等,则将新的值写入到内存;否则直接返回原值,不做任何操作(对应X86指令CMPXCHG);

这里主要看下CAS的具体原理。CAS操作涉及的参数有内存区域V,期望的旧值A,以及新值B;如果V中的当前值与期望的值相等(表示当前没有其他线程修改过V),则更新V为新值B。当多个线程同时访问V时,一个线程成功修改了变量值,其他线程则直接失败了,但CAS不会阻塞这些线程。

以下是模拟实际CAS操作的一个代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

// samples from Ch15 of <<Java Concurrency in Practice>>
public class SimulatedCAS {
private int value;

public synchronized int get(){
return value;
}

public synchronized int compareAndSwap(int expectedV, int newV){
int oldV = value;
if(oldV == expectedV){
value = newV;
}

return oldV;
}

public synchronized boolean compareAndSet(int expectedV, int newV){
return (expectedV == compareAndSwap(expectedV,newV));
}
}

Atomic工具包

Java5.0开始提供了一个Atomic的工具包,支持对int (AtomicInteger) ,longAtomicLong),boolean(AtomicBoolean)以及整型数组(AtomicIntegerArray)、引用类型(AtomicReference)、引用类型成员(AtomicReferenceFieldUpdater)的原子操作。如果使用其他基本类型数据如float,double,可以通过floatToIntBits()doubleToLongBit()longdouble转换成一个intlong再使用相应的原子类。

如下是AtomicInteger的一个代码片段,其内部整型成员value是一个volatile变量,这样可以确保多线程访问时数据的可见性;而底层封装类Unsafe提供的CAS操作则保证了AtomicInteger成员变量访问的互斥性。可以这么说,AtomicIntegervolatile的一般化实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// 确保变量的可见性
private volatile int value;

public AtomicInteger(int initialValue) {
value = initialValue;
}

public AtomicInteger() {
}

public final int get() {
return value;
}


public final void set(int newValue) {
value = newValue;
}

/**
* Eventually sets to the given value.
*/
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}

public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}


public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

....

}

最后,以AtomicInteger为例来看下Atomic包具体的使用。假定,我们想要统计一个类某个接口的调用次数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

public class AtomicExamples {
private static AtomicInteger sCalledCount = new AtomicInteger(0);


public static void main(String[] args){
final AtomicExamples atomic = new AtomicExamples();

for(int i = 0; i <= 10; ++i){
Thread t = new Thread(new Runnable(){

@Override
public void run() {
System.out.println("currently called " +
atomic.callMe() + " times");
}

},"thread-" + i);

t.start();
}
}

public int callMe(){
return sCalledCount.incrementAndGet();
}

}

参考文献