JAVA 并发编程 - StampedLock 可重入性与 CPU 暴涨问题分析 时间: 2018-09-29 16:01 分类: JDK,JAVA 前面一篇文章学习了`StampedLock`的简单使用,后来搜索资料时发现有人说`StampedLock`是不可重入的锁,于是随便写了个 Demo 验证了一把发现的确如此: ```java import java.util.concurrent.locks.StampedLock; public class StampedLockTest { public static void main(String[] args) { StampedLock sl = new StampedLock(); long stamp1 = sl.readLock(); long stamp2 = sl.readLock(); System.out.println(stamp1 + ".................." + stamp2); sl.unlock(stamp1); sl.unlock(stamp2); stamp1 = sl.writeLock(); stamp2 = sl.writeLock(); System.out.println(stamp1 + ".................." + stamp2); sl.unlock(stamp1); sl.unlock(stamp2); } } ``` 运行以上程序,发现只打印了一次`stamp1`和`stamp2`,那是因为`StampedLock`读锁之间是没有冲突的,所以不会阻塞,而写锁之间是冲突的,但是在同一个线程中,写锁未释放之前再次申请锁是不会成功的,所以`stamp2 = sl.writeLock();`将会一直等待导致死锁,即`StampedLock`的读写锁是不可重入的(读锁之间并不冲突),上面程序只验证了`writeLock`和`writeLock`是不可重入的,实际上`writeLock`和`readLock`也是不可重入的。 还有一个问题就是`StampedLock`导致 CPU 爆满的问题,有人说是 BUG,查看源码后还不如说它设计就是如此,首先,查看两个关键的方法: ```java public long readLock() { long s = state, next; // bypass acquireRead on common uncontended case return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); } public long writeLock() { long s, next; // bypass acquireWrite in fully unlocked case only return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L)); } ``` 可以看到申请锁的操作在进行一次 CAS 操作失败后都会调用`acquireRead(false, 0L)`或者`acquireWrite(false, 0L)`方法去申请锁,继续查看这两个方法的源码: ```java /** * See above for explanation. 
* * @param interruptible true if should check interrupts and if so * return INTERRUPTED * @param deadline if nonzero, the System.nanoTime value to timeout * at (and return zero) * @return next state, or INTERRUPTED */ private long acquireWrite(boolean interruptible, long deadline) { StampedLock.WNode node = null, p; for (int spins = -1;;) { // spin while enqueuing long m, s, ns; if ((m = (s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } else if (spins < 0) spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { // initialize queue StampedLock.WNode hd = new StampedLock.WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) node = new StampedLock.WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } } for (int spins = -1;;) { StampedLock.WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long s, ns; if (((s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { // help release stale waiters StampedLock.WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; // 0 argument to park means no timeout if (deadline == 0L) time = 0L; else 
if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); // emulate LockSupport.park node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } } /** * See above for explanation. * * @param interruptible true if should check interrupts and if so * return INTERRUPTED * @param deadline if nonzero, the System.nanoTime value to timeout * at (and return zero) * @return next state, or INTERRUPTED */ private long acquireRead(boolean interruptible, long deadline) { StampedLock.WNode node = null, p; for (int spins = -1;;) { StampedLock.WNode h; if ((h = whead) == (p = wtail)) { for (long m, s, ns;;) { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; else if (m >= WBIT) { if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else { if (spins == 0) { StampedLock.WNode nh = whead, np = wtail; if ((nh == h && np == p) || (h = nh) != (p = np)) break; } spins = SPINS; } } } } if (p == null) { // initialize queue StampedLock.WNode hd = new StampedLock.WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) node = new StampedLock.WNode(RMODE, p); else if (h == p || p.mode != RMODE) { if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } } else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) node.cowait = null; else { for (;;) { StampedLock.WNode pp, c; Thread w; if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) // help release U.unpark(w); 
if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT); } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } } } } for (int spins = -1;;) { StampedLock.WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? 
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { StampedLock.WNode c; Thread w; whead = node; node.prev = null; while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { StampedLock.WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } } ``` 可以看到,其内部实现都是两个死循环,在死循环中,在它挂起线程时,使用的是`Unsafe.park()`函数,而`park()`函数在遇到线程中断时会直接返回,而不是像`Thread.sleep()`那样会抛出异常。虽然这两个申请锁的方法第一个参数`interruptible`表示是否处理中断,但是`readLock()`与`writeLock()`中都是传的 false,所以并不会处理中断,也就是代码中`if (interruptible && Thread.interrupted())`后面的`Thread.interrupted()`不会执行,中断状态不会清除,那么这样一来就会导致线程在调用`park()`函数挂起时遇到中断会再次陷入循环,并且中断状态没有清除,后面的循环中调用`park()`都将立即返回导致无法将线程挂起,所以当退出条件得不到满足时,将导致 CPU 暴涨。 下面演示程序模拟了 CPU 暴涨的情况: ```java import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.StampedLock; public class StampedLockTest { public static void main(String[] args) throws InterruptedException { Thread[] threads = new Thread[3]; 
StampedLock sl = new StampedLock(); //启动一个写线程长时间占用锁 new Thread(() -> { long stamp = sl.writeLock(); LockSupport.parkNanos(60000000000000L); sl.unlock(stamp); }).start(); Thread.sleep(100); //启动三个读线程 for (int i = 0; i < 3; i++) { threads[i] = new Thread(() -> { long stamp = sl.readLock(); System.out.println(Thread.currentThread().getName() + "获得锁."); sl.unlock(stamp); }); threads[i].start(); } //5秒后等到三个线程因写锁被占用导致挂起后进行中断 Thread.sleep(5000); for (int i = 0 ; i < 3; i++) { threads[i].interrupt(); } } } ``` 标签: JAVA 并发编程