JAVA 并发编程 - StampedLock 实现思想 时间: 2018-09-29 18:00 分类: JDK,JAVA StampedLock 的内部实现是基于 CLH 锁的。CLH 锁是一种自旋锁,自旋即不断的循环,它的操作是非常占用 CPU 的,因为它会一直循环在那里看自旋锁的保持者是否释放了锁。但它能够保证无饥饿性,提供先来先服务的公平性。 CLH 锁的基本思想如下: 锁维护一个线程等待队列,所有申请锁的线程都记录在这个队列里面,每一个节点代表一个线程,保存一个标记位(locked),用于判断当前线程是否释放锁。 当一个线程试图获取锁时,会将自身`locked`标记为置为`true`表示要获取锁,然后添加到队列尾部,使用如下代码来判断前序节点是否已释放锁: ```java while(pre.locked) { } ``` 只要前序节点`locked`没有释放锁,那么它将一直自旋(循环)等待。这样也确保了无饥饿性,锁的获取顺序如下图所示: ![微信截图_20180929164318.png][1] 可以看到每一个节点需要获得锁都需要等到前序节点的锁释放,即队列的先进先出(FIFO)特性。 而`StampedLock`正是基于 CLH 这种思想,但是实现却更为复杂。 从源码可以看到其内部维护了一个等待链表队列: ```java /** Wait nodes */ static final class WNode { volatile StampedLock.WNode prev; volatile StampedLock.WNode next; volatile StampedLock.WNode cowait; // list of linked readers volatile Thread thread; // non-null while possibly parked volatile int status; // 0, WAITING, or CANCELLED final int mode; // RMODE or WMODE WNode(int m, StampedLock.WNode p) { mode = m; prev = p; } } /** Head of CLH queue */ private transient volatile StampedLock.WNode whead; /** Tail (last) of CLH queue */ private transient volatile StampedLock.WNode wtail; /** Lock sequence/state */ private transient volatile long state; ``` `WNode`为链表的节点,每一个`WNode`代表一个等待线程。 最后面一行的`state`表示当前锁的状态,long 类型,有 64 位,倒数第 8 位表示写锁状态,如果该位为 1,表示写锁被占用。末尾 7 为表示当前正在读取的线程的数量。 对于一次乐观读的操作如下: ```java public long tryOptimisticRead() { long s; return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L; } ``` 一些常量定义如下: ```java private static final int LG_READERS = 7; // Values for lock state and stamp operations private static final long RUNIT = 1L; private static final long WBIT = 1L << LG_READERS; private static final long RBITS = WBIT - 1L; private static final long RFULL = RBITS - 1L; private static final long ABITS = RBITS | WBIT; private static final long SBITS = ~RBITS; // note overlap with ABITS // Initial value for lock state; avoid failure value zero private static final long ORIGIN = WBIT << 1; // Special value from cancelled acquire methods so caller can throw IE private static final long INTERRUPTED = 1L; // Values for node status; order matters private static final int WAITING = -1; private static final int CANCELLED = 1; // Modes for nodes (int not boolean to allow arithmetic) private static final int RMODE = 0; private static final int WMODE = 1; ``` 一次成功的乐观读必须保证当前写锁没有被占用,可以发现`tryOptimisticRead()`方法中`(s = state) & WBIT) == 0L`即用来判断写锁是否被占用,从常量中我们可以知道`WBIT`值为'0x80',`(s = state) & WBIT) == 0L`即判断`state`倒数第 8 位是否为 0。如果写锁未被占用,则将`state`的低 7 位清零后的值作为`stamp`返回。 当成功进行了一次乐观读之后,有现成进行了写之后`state`就会发生改变: ```java public long writeLock() { long s, next; // bypass acquireWrite in fully unlocked case only return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L)); } ``` `U.compareAndSwapLong(this, STATE, s, next = s + WBIT))`可以看到最后一个参数即更新`state`的值,`s + WBIT`即表示设置写锁位为 1,这样一来就会改变`state`的值,那么在校验乐观读票据的时候就会有两种情况: * 校验时写锁被占用 * 校验时写锁未被占用,但又被成功申请过至少一次以上 在《实战Java高并发程序设计》一书中应该只说到了第一种情况,观察`validate()`校验函数: ```java public boolean validate(long stamp) { U.loadFence(); return (stamp & SBITS) == (state & SBITS); } ``` 由于在乐观读时返回的票据是由`state`的末尾 7 位清零后所返回的,所以`validate()`校验的是`stamp`和`state`末尾 7 位清零的值。如果此时写锁被占用那么`state`的倒数第 8 位为 1 肯定导致校验失败。 但是有没有想过,如果有线程成功申请`写锁`并修改数据后释放锁了,倒数第 8 位不就又还原为 0 了吗,此时还会校验成功? 肯定是不能校验成功的,也就是《实战Java高并发程序设计》没有说到的另外一种情况,写锁申请成功,倒数第 8 位置为 1,释放锁的时候虽然倒数第 8 位还原为 0 了,但是高 8 位以上的值在`unlock()`里面改变了: ```java public void unlock(long stamp) { long a = stamp & ABITS, m, s; StampedLock.WNode h; while (((s = state) & SBITS) == (stamp & SBITS)) { if ((m = s & ABITS) == 0L) break; else if (m == WBIT) { if (a != m) break; state = (s += WBIT) == 0L ? ORIGIN : s; if ((h = whead) != null && h.status != 0) release(h); return; } else if (a == 0L || a >= WBIT) break; else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) release(h); return; } } else if (tryDecReaderOverflow(s) != 0L) return; } throw new IllegalMonitorStateException(); } ``` `unlockWrite`也是一样的道理,所以在进行一次写操作后,之前乐观读的校验必然失败。 在乐观读失败后,我们可以进行锁的升级,进行悲观读操作: ```java public long readLock() { long s = state, next; // bypass acquireRead on common uncontended case return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); } ``` 代码中首先使用一次 CAS 操作获取锁,并将`state`加 1,即更新悲观读锁的数量(前提是悲观读的线程数量没有溢出,据说溢出的话 JDK 也做了响应处理,我就没有去做过多的研究了),如果这次 CAS 失败,则会调用`acquireRead`二次尝试获取。 在`acquireRead()`中,线程会在不同条件下进行若干次自旋,尝试使用 CAS 获取锁,如果自旋失败,则启动 CLH 队列,将自己添加到队列的末尾,之后再进行自旋。在第一次自旋和第二次自旋的时候,均有可能在失败的时候通过`Unsafe.park()`将当前线程挂起。 一但成功获取到读锁,则会将 CLH 队列中的读线程通过`Unsafe.unpark()`全部唤起(因为读线程之间并不是冲突的)。 `acquireWrite()`与`acquireRead()`差不多。 以上就是`StampedLock`的源码简单分析,其实现也可以说是异常复杂,这里只是简单分析实现思想,具体代码实现细节不做过多深究。 [1]: https://0o0.me/usr/uploads/2018/09/3895772346.png 标签: JAVA 并发编程