CAS 自旋锁与 ABA 问题

2019/04/19

乐观者的人生观则相反,凡事不管最终结果如何,他都会先尝试去做,大不了最后不成功。

1. 前言

在 JDK 中有很多关于 CAS 的应用,如:

  • AQS
  • AtomicXXX 原子类
  • ConcurrentHashMap

例如在 AQS 类中,通过自旋锁的方式将结点加入到队尾:

  • 如果 compareAndSetHeadcompareAndSetTail 返回 false,意味着存在并发修改,将重新进入循环重复之前的操作
  • 如果返回 true 则意味着修改成功
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // 队列为空,创建一个空结点作为head结点,并将tail也指向它
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { // 正常流程,放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

2. 原理剖析

CAS (Compare And Swap),即比较并交换,是提供交换值原子操作的乐观实现算法。

即对于内存中的某一个值V,提供一个旧值 A 和一个新值B。如果提供的旧值 V 和 A 相等就把 B 写入 V;如果不相等则说明被修改过,返回失败状态。这个过程是原子性的。

例如下图示例,线程二在获取到内存中的值后对该值进行了操作,并试图将新值设置到内存中。但是在此之前线程一对该内存块成功进行了更新的操作,所以线程二通过 CAS 算法检测到当前内存块中的值与之前获取的不一致,所以导致更新失败:

我们可以借助 CAS 锁来实现一个简单的原子自增类 AtomicInt:仿造 AQS 的写法,通过 Unsafe 类获取到 value 值的地址偏移量,用于 compareAndSwapInt 方法的入参。

仔细观察你会发现这里获取 Unsafe 对象的方法和 JDK 中的不同,这是因为 getUnsafe 方法会判断当前调用这个方法的对象的类型,如果并非 java.util.concurrent.atomic 内的原子类、AbstractQueuedSynchronizer 等类型,则会抛出SecurityException不安全异常。因此需要反射来调用这个方法从而获得 Unsafe 对象实例:

public class AtomicInt {

    private volatile int value;

    private static final long valueOffset;

    private static final Unsafe unsafe;

    static {
        try {
            Class<?> clazz = Unsafe.class;
            Field f = clazz.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            unsafe = (Unsafe) f.get(clazz);

            // 获取到value变量的地址偏移量
            valueOffset = unsafe.objectFieldOffset(AtomicInt.class.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    public int get() {
        return value;
    }

    public final int getAndIncrement() {
        for (;;) {
            int oldValue = get();           // value变量的旧值
            int expect = current + 1;       // 操作之后的期望值
            if (compareAndSet(oldValue, expect)) {
                return oldValue; // 更新成功,返回旧值
            }
            // 更新失败,意味有并发修改,重新循环修改
        }
    }

    private boolean compareAndSet(int current, int expect) {
        // 由底层CPU硬件指针保证原子性的实现
        return unsafe.compareAndSwapInt(this, valueOffset, current, expect);
    }
}

getAndIncrement方法中为 CAS 算法的具体实现:首先计算出更新后的期望值,调用底层的compareAndSwapInt方法,如果当前 value 地址指向的值与期望值 expect 不一致将丢弃更改,返回false,重新进入循环修改。

我们用 100 个线程来进行自增的操作,CAS 锁的实现可以不使用同步锁,也能保证 AtomicInt 能原子性的自增,消除线程的竞争条件:

public class AtomicIntExample {

    private static AtomicInt atomic = new AtomicInt();

    public static void main(String[] args) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(atomic::getAndIncrement);
            threads.add(t);
        }
        for (Thread thread : threads) {
            thread.start();
            thread.join();
        }
        System.out.println("Value: " + atomic.get());
    }
}

// Value: 100

3. ABA 问题

考虑一种使用 CAS 的情况:

  1. 即线程 1 修改 V 值,读取出内存旧值为 A
  2. 此时,线程 1 被挂起(时间片耗尽等);线程 2 开始执行,将 V 修改为 B,然后又修改为 A
  3. 之后线程 1 又被唤醒,比较旧值与当前内存值相等,完成更新

这就是所谓的 ABA 问题,从表面上似乎没有问题。没错,其实 JDK 内置的 AtomicInteger 也存在 ABA 问题,但由于存储的是数值,即使出现了也不会造成任何影响。

但是对于自身存在状态的数据结构和内存重用机制的原因,可能就会出现意想不到的并发修改。可以参考维基百科的例子,另外在知乎上也有一个生动的例子可以帮助理解。

如何解决 ABA 问题呢?回想起我们通常使用 MySQL 来实现乐观锁时,会加一个 version 列作为这条行数据的版本号来实现。那既然是内存复用机制可能导致的情况,我们可以为数据再添加一个版本号来作为数据的唯一约束,这样也能避免 ABA 问题的发生。

其实在 JDK 中,通过 AtomicStampedReference 就能实现,可以参考 AtomicStampedReference usage