admin

JAVA 并发编程 - StampedLock 实现思想
StampedLock 的内部实现是基于 CLH 锁的。CLH 锁是一种自旋锁,自旋即不断的循环,它的操作是非常占...
扫描右侧二维码阅读全文
29
2018/09

JAVA 并发编程 - StampedLock 实现思想

StampedLock 的内部实现是基于 CLH 锁的。CLH 锁是一种自旋锁,自旋即不断的循环,它的操作是非常占用 CPU 的,因为它会一直循环在那里看自旋锁的保持者是否释放了锁。但它能够保证无饥饿性,提供先来先服务的公平性。

CLH 锁的基本思想如下:
锁维护一个线程等待队列,所有申请锁的线程都记录在这个队列里面,每一个节点代表一个线程,保存一个标记位(locked),用于判断当前线程是否释放锁。
当一个线程试图获取锁时,会将自身locked标记为置为true表示要获取锁,然后添加到队列尾部,使用如下代码来判断前序节点是否已释放锁:

while(pre.locked) {
}

只要前序节点locked没有释放锁,那么它将一直自旋(循环)等待。这样也确保了无饥饿性,锁的获取顺序如下图所示:
微信截图_20180929164318.png
可以看到每一个节点需要获得锁都需要等到前序节点的锁释放,即队列的先进先出(FIFO)特性。

StampedLock正是基于 CLH 这种思想,但是实现却更为复杂。
从源码可以看到其内部维护了一个等待链表队列:

/** Wait nodes */
static final class WNode {
volatile StampedLock.WNode prev;
volatile StampedLock.WNode next;
volatile StampedLock.WNode cowait;    // list of linked readers
volatile Thread thread;   // non-null while possibly parked
volatile int status;      // 0, WAITING, or CANCELLED
final int mode;           // RMODE or WMODE
WNode(int m, StampedLock.WNode p) { mode = m; prev = p; }
}

/** Head of CLH queue */
private transient volatile StampedLock.WNode whead;
/** Tail (last) of CLH queue */
private transient volatile StampedLock.WNode wtail;

/** Lock sequence/state */
private transient volatile long state;

WNode为链表的节点,每一个WNode代表一个等待线程。
最后面一行的state表示当前锁的状态,long 类型,有 64 位,倒数第 8 位表示写锁状态,如果该位为 1,表示写锁被占用。末尾 7 为表示当前正在读取的线程的数量。
对于一次乐观读的操作如下:

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

一些常量定义如下:

private static final int LG_READERS = 7;

// Values for lock state and stamp operations
private static final long RUNIT = 1L;
private static final long WBIT  = 1L << LG_READERS;
private static final long RBITS = WBIT - 1L;
private static final long RFULL = RBITS - 1L;
private static final long ABITS = RBITS | WBIT;
private static final long SBITS = ~RBITS; // note overlap with ABITS

// Initial value for lock state; avoid failure value zero
private static final long ORIGIN = WBIT << 1;

// Special value from cancelled acquire methods so caller can throw IE
private static final long INTERRUPTED = 1L;

// Values for node status; order matters
private static final int WAITING   = -1;
private static final int CANCELLED =  1;

// Modes for nodes (int not boolean to allow arithmetic)
private static final int RMODE = 0;
private static final int WMODE = 1;

一次成功的乐观读必须保证当前写锁没有被占用,可以发现tryOptimisticRead()方法中(s = state) & WBIT) == 0L即用来判断写锁是否被占用,从常量中我们可以知道WBIT值为'0x80',(s = state) & WBIT) == 0L即判断state倒数第 8 位是否为 0。如果写锁未被占用,则将state的低 7 位清零后的值作为stamp返回。
当成功进行了一次乐观读之后,有现成进行了写之后state就会发生改变:

public long writeLock() {
    long s, next;  // bypass acquireWrite in fully unlocked case only
    return ((((s = state) & ABITS) == 0L &&
            U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}

U.compareAndSwapLong(this, STATE, s, next = s + WBIT))可以看到最后一个参数即更新state的值,s + WBIT即表示设置写锁位为 1,这样一来就会改变state的值,那么在校验乐观读票据的时候就会有两种情况:

  • 校验时写锁被占用
  • 校验时写锁未被占用,但又被成功申请过至少一次以上
    在《实战Java高并发程序设计》一书中应该只说到了第一种情况,观察validate()校验函数:
public boolean validate(long stamp) {
    U.loadFence();
    return (stamp & SBITS) == (state & SBITS);
}

由于在乐观读时返回的票据是由state的末尾 7 位清零后所返回的,所以validate()校验的是stampstate末尾 7 位清零的值。如果此时写锁被占用那么state的倒数第 8 位为 1 肯定导致校验失败。
但是有没有想过,如果有线程成功申请写锁并修改数据后释放锁了,倒数第 8 位不就又还原为 0 了吗,此时还会校验成功?
肯定是不能校验成功的,也就是《实战Java高并发程序设计》没有说到的另外一种情况,写锁申请成功,倒数第 8 位置为 1,释放锁的时候虽然倒数第 8 位还原为 0 了,但是高 8 位以上的值在unlock()里面改变了:

public void unlock(long stamp) {
    long a = stamp & ABITS, m, s; StampedLock.WNode h;
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        if ((m = s & ABITS) == 0L)
            break;
        else if (m == WBIT) {
            if (a != m)
                break;
            state = (s += WBIT) == 0L ? ORIGIN : s;
            if ((h = whead) != null && h.status != 0)
                release(h);
            return;
        }
        else if (a == 0L || a >= WBIT)
            break;
        else if (m < RFULL) {
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                return;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            return;
    }
    throw new IllegalMonitorStateException();
}

unlockWrite也是一样的道理,所以在进行一次写操作后,之前乐观读的校验必然失败。

在乐观读失败后,我们可以进行锁的升级,进行悲观读操作:

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    return ((whead == wtail && (s & ABITS) < RFULL &&
            U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

代码中首先使用一次 CAS 操作获取锁,并将state加 1,即更新悲观读锁的数量(前提是悲观读的线程数量没有溢出,据说溢出的话 JDK 也做了响应处理,我就没有去做过多的研究了),如果这次 CAS 失败,则会调用acquireRead二次尝试获取。

acquireRead()中,线程会在不同条件下进行若干次自旋,尝试使用 CAS 获取锁,如果自旋失败,则启动 CLH 队列,将自己添加到队列的末尾,之后再进行自旋。在第一次自旋和第二次自旋的时候,均有可能在失败的时候通过Unsafe.park()将当前线程挂起。

一但成功获取到读锁,则会将 CLH 队列中的读线程通过Unsafe.unpark()全部唤起(因为读线程之间并不是冲突的)。

acquireWrite()acquireRead()差不多。

以上就是StampedLock的源码简单分析,其实现也可以说是异常复杂,这里只是简单分析实现思想,具体代码实现细节不做过多深究。

Last modification:September 29th, 2018 at 06:00 pm
If you think my article is useful to you, please feel free to appreciate

Leave a Comment