前面一篇文章学习了StampedLock
的简单使用,后来搜索资料时发现有人说StampedLock
是不可重入的锁,于是随便写了个 Demo 验证了一把,发现的确如此:
import java.util.concurrent.locks.StampedLock;
/**
 * Demo used by the article to show that StampedLock is not reentrant.
 * It acquires the read lock twice on one thread, prints both stamps and
 * releases them, then tries to acquire the write lock twice on the same
 * thread. Per the article, only the first println runs and the program
 * then hangs on the second writeLock() call.
 */
public class StampedLockTest {
public static void main(String[] args) {
StampedLock sl = new StampedLock();
// Two read locks requested back to back by the same thread.
long stamp1 = sl.readLock();
long stamp2 = sl.readLock();
System.out.println(stamp1 + ".................." + stamp2);
sl.unlock(stamp1);
sl.unlock(stamp2);
// Second writeLock() is requested while this thread still holds the first
// write stamp; the article reports that this call never returns.
stamp1 = sl.writeLock();
stamp2 = sl.writeLock();
// Per the article, the lines below are never reached.
System.out.println(stamp1 + ".................." + stamp2);
sl.unlock(stamp1);
sl.unlock(stamp2);
}
}
运行以上程序,发现只打印了一次stamp1
和stamp2
,那是因为StampedLock
读锁之间是没有冲突的,所以不会阻塞,而写锁之间是冲突的,但是在同一个线程中,写锁未释放之前再次申请锁是不会成功的,所以stamp2 = sl.writeLock();
将会一直等待导致死锁,即StampedLock
的读写锁是不可重入的(读锁之间并不冲突),上面程序只验证了writeLock
和writeLock
是不可重入的,实际上writeLock
和readLock
也是不可重入的。
还有一个问题就是StampedLock
导致 CPU 爆满的问题,有人说是 BUG,查看源码后还不如说它设计就是如此,首先,查看两个关键的方法:
/**
 * Acquires the read lock and returns the stamp.
 * Fast path: when whead == wtail and the ABITS portion of state is below
 * RFULL, one CAS that adds RUNIT to state is attempted and the new state
 * value is returned. If the precondition fails or the CAS loses, the call
 * falls through to acquireRead(false, 0L), i.e. with interruptible == false
 * and deadline == 0L.
 *
 * @return the new state value from the CAS, or the result of acquireRead
 */
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
/**
 * Acquires the write lock and returns the stamp.
 * Fast path: only when no ABITS bits are set in state, one CAS that adds
 * WBIT to state is attempted and the new state value is returned. In every
 * other case the call falls through to acquireWrite(false, 0L), i.e. with
 * interruptible == false and deadline == 0L.
 *
 * @return the new state value from the CAS, or the result of acquireWrite
 */
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
可以看到申请锁的操作在进行一次 CAS 操作失败后都会调用acquireRead(false, 0L)
或者acquireWrite(false, 0L)
方法去申请锁,继续查看这两个方法的源码:
/**
 * Slow path of write-lock acquisition; writeLock() calls it with
 * (false, 0L) when its single CAS attempt does not succeed.
 *
 * Runs two unbounded loops. The first keeps retrying the CAS that adds WBIT
 * to state while it spins and then links a WMODE node in as the new wtail.
 * The second waits behind the predecessor node p: it spins on the CAS when
 * p is whead, otherwise fixes up queue links and parks the thread. The only
 * exits are a successful CAS, an expired deadline, or an interrupt observed
 * while interruptible is true.
 *
 * @param interruptible true if should check interrupts and if so
 * return INTERRUPTED
 * @param deadline if nonzero, the System.nanoTime value to timeout
 * at (and return zero)
 * @return next state, or INTERRUPTED
 */
private long acquireWrite(boolean interruptible, long deadline) {
StampedLock.WNode node = null, p;
// Phase 1: try to take the lock directly; failing that, enqueue a node.
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0; // spin budget only when m == WBIT and wtail == whead
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else if ((p = wtail) == null) { // initialize queue
StampedLock.WNode hd = new StampedLock.WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new StampedLock.WNode(WMODE, p);
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
// node is now wtail; p is its predecessor for phase 2.
p.next = node;
break;
}
}
// Phase 2: wait behind predecessor p until the lock can be taken.
for (int spins = -1;;) {
StampedLock.WNode h, np, pp; int ps;
if ((h = whead) == p) {
// Spin budget starts at HEAD_SPINS and doubles on each pass while below MAX_HEAD_SPINS.
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
// Acquired: this node becomes the new whead.
whead = node;
node.prev = null;
return ns;
}
}
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
}
else if (h != null) { // help release stale waiters
StampedLock.WNode c; Thread w;
// Pop each node off h.cowait and unpark its thread, if one is recorded.
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
// Unlink the cancelled predecessor by pointing at its own predecessor.
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
// Re-check the wait conditions immediately before parking.
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
// With interruptible == false the && short-circuits, so Thread.interrupted()
// is never called here: the loop neither returns nor clears the interrupt
// status, and simply goes around again.
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
/**
 * Slow path of read-lock acquisition; readLock() calls it with (false, 0L)
 * when its fast-path CAS is not taken or does not succeed.
 *
 * Runs two unbounded outer loops. The first spins for the lock while
 * whead == wtail, then either links an RMODE node in as the new wtail or
 * attaches the node to the cowait list of the current tail p and waits
 * there. The second waits behind predecessor p in the same way as
 * acquireWrite, and after a successful acquisition unparks the threads on
 * this node's cowait list. The only exits are a successful acquisition, an
 * expired deadline, or an interrupt observed while interruptible is true.
 *
 * @param interruptible true if should check interrupts and if so
 * return INTERRUPTED
 * @param deadline if nonzero, the System.nanoTime value to timeout
 * at (and return zero)
 * @return next state, or INTERRUPTED
 */
private long acquireRead(boolean interruptible, long deadline) {
StampedLock.WNode node = null, p;
// Phase 1: spin for the lock, then enqueue as a tail node or as a cowaiter.
for (int spins = -1;;) {
StampedLock.WNode h;
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
// Below RFULL: CAS adding RUNIT. At or above RFULL but below WBIT: tryIncReaderOverflow.
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
else if (m >= WBIT) {
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
if (spins == 0) {
// Spin budget used up: re-read head/tail and leave the inner loop if they
// are unchanged or no longer equal to each other.
StampedLock.WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS;
}
}
}
}
if (p == null) { // initialize queue
StampedLock.WNode hd = new StampedLock.WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new StampedLock.WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {
// Tail is the head or is not an RMODE node: link this node in as the new wtail.
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
node.cowait = null;
else {
// node was pushed onto p.cowait; wait here as a cowaiter of p.
for (;;) {
StampedLock.WNode pp, c; Thread w;
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
// Keep trying to acquire for as long as state stays below WBIT.
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
// Re-check the wait conditions immediately before parking.
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
// With interruptible == false the && short-circuits, so Thread.interrupted()
// is never called here and the interrupt status is not cleared.
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
// Phase 2: node is linked behind p; wait until the read lock can be taken.
for (int spins = -1;;) {
StampedLock.WNode h, np, pp; int ps;
if ((h = whead) == p) {
// Spin budget starts at HEAD_SPINS and doubles on each pass while below MAX_HEAD_SPINS.
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
StampedLock.WNode c; Thread w;
// Acquired: this node becomes the new whead, then every node on its
// cowait list is popped and its thread unparked.
whead = node;
node.prev = null;
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
else if (h != null) {
StampedLock.WNode c; Thread w;
// Pop each node off h.cowait and unpark its thread, if one is recorded.
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
// Unlink the cancelled predecessor by pointing at its own predecessor.
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
// Re-check the wait conditions immediately before parking.
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
// With interruptible == false the && short-circuits, so Thread.interrupted()
// is never called here: the loop neither returns nor clears the interrupt
// status, and simply goes around again.
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
可以看到,其内部实现都是两个死循环,在死循环中,在它挂起线程时,使用的是Unsafe.park()
函数,而park()
函数在遇到线程中断时会直接返回,而不是像Thread.sleep()
那样会抛出异常。虽然这两个申请锁的方法第一个参数interruptible
表示是否处理中断,但是readLock()
与writeLock()
中都是传的 false,所以并不会处理中断,也就是代码中if (interruptible && Thread.interrupted())
后面的Thread.interrupted()
不会执行,中断状态不会被清除,那么这样一来就会导致线程在调用park()
函数挂起时遇到中断会再次陷入循环,并且中断状态没有清除,后面的循环中调用park()
都将立即返回,导致无法将线程挂起,所以当退出条件得不到满足时,线程会一直空转,导致 CPU 暴涨。
下面演示程序模拟了 CPU 暴涨的情况:
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.StampedLock;
/**
 * Demo for the CPU spike described in the article: one thread holds the
 * write lock for a long time, three reader threads block in readLock(),
 * and the main thread then interrupts the readers.
 */
public class StampedLockTest {
    public static void main(String[] args) throws InterruptedException {
        Thread[] readers = new Thread[3];
        StampedLock lock = new StampedLock();

        // Writer thread: take the write lock and hold it for a very long time.
        Thread writer = new Thread(() -> {
            long writeStamp = lock.writeLock();
            LockSupport.parkNanos(60000000000000L);
            lock.unlock(writeStamp);
        });
        writer.start();
        Thread.sleep(100);

        // Every reader runs the same task: acquire the read lock, report, release.
        Runnable readTask = () -> {
            long readStamp = lock.readLock();
            System.out.println(Thread.currentThread().getName() + "获得锁.");
            lock.unlock(readStamp);
        };

        // Start the three reader threads.
        for (int idx = 0; idx < 3; idx++) {
            Thread reader = new Thread(readTask);
            readers[idx] = reader;
            reader.start();
        }

        // Wait 5 seconds so the readers are blocked behind the held write lock,
        // then interrupt each of them.
        Thread.sleep(5000);
        for (Thread reader : readers) {
            reader.interrupt();
        }
    }
}