JAVA 并发编程 - StampedLock 可重入性与 CPU 暴涨问题分析

前面一篇文章学习了StampedLock的简单使用,后来搜索资料时发现有人说StampedLock是不可重入的锁,于是随便写了个 Demo 验证了一把发现的确如此:

  1. import java.util.concurrent.locks.StampedLock;
  2. public class StampedLockTest {
  3. public static void main(String[] args) {
  4. StampedLock sl = new StampedLock();
  5. long stamp1 = sl.readLock();
  6. long stamp2 = sl.readLock();
  7. System.out.println(stamp1 + ".................." + stamp2);
  8. sl.unlock(stamp1);
  9. sl.unlock(stamp2);
  10. stamp1 = sl.writeLock();
  11. stamp2 = sl.writeLock();
  12. System.out.println(stamp1 + ".................." + stamp2);
  13. sl.unlock(stamp1);
  14. sl.unlock(stamp2);
  15. }
  16. }

运行以上程序,发现只打印了一次stamp1stamp2,那是因为StampedLock读锁之间是没有冲突的,所以不回阻塞,而写锁之间是冲突的,但是在同一个线程中,写锁未释放之前再次申请锁是不会成功的,所以stamp2 = sl.writeLock();将会一直等待导致死锁,即StampedLock的读写锁是不可重入的(读锁之间并不冲突),上面程序只验证了writeLockwriteLock是不可重入的,实际上writeLockreadLock也是不可重入的。

还有一个问题就是StampedLock导致 CPU 爆满的问题,有人说是 BUG,查看源码后还不如说它设计就是如此,首先,查看两个关键的方法:

  1. public long readLock() {
  2. long s = state, next; // bypass acquireRead on common uncontended case
  3. return ((whead == wtail && (s & ABITS) < RFULL &&
  4. U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
  5. next : acquireRead(false, 0L));
  6. }
  7. public long writeLock() {
  8. long s, next; // bypass acquireWrite in fully unlocked case only
  9. return ((((s = state) & ABITS) == 0L &&
  10. U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
  11. next : acquireWrite(false, 0L));
  12. }

可以看到申请锁的操作在进行一次 CAS 操作失败后都会调用acquireRead(false, 0L)或者acquireWrite(false, 0L)方法去申请锁,继续查看这两个方法的源码:

  1. /**
  2. * See above for explanation.
  3. *
  4. * @param interruptible true if should check interrupts and if so
  5. * return INTERRUPTED
  6. * @param deadline if nonzero, the System.nanoTime value to timeout
  7. * at (and return zero)
  8. * @return next state, or INTERRUPTED
  9. */
  10. private long acquireWrite(boolean interruptible, long deadline) {
  11. StampedLock.WNode node = null, p;
  12. for (int spins = -1;;) { // spin while enqueuing
  13. long m, s, ns;
  14. if ((m = (s = state) & ABITS) == 0L) {
  15. if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
  16. return ns;
  17. }
  18. else if (spins < 0)
  19. spins = (m == WBIT && wtail == whead) ? SPINS : 0;
  20. else if (spins > 0) {
  21. if (LockSupport.nextSecondarySeed() >= 0)
  22. --spins;
  23. }
  24. else if ((p = wtail) == null) { // initialize queue
  25. StampedLock.WNode hd = new StampedLock.WNode(WMODE, null);
  26. if (U.compareAndSwapObject(this, WHEAD, null, hd))
  27. wtail = hd;
  28. }
  29. else if (node == null)
  30. node = new StampedLock.WNode(WMODE, p);
  31. else if (node.prev != p)
  32. node.prev = p;
  33. else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
  34. p.next = node;
  35. break;
  36. }
  37. }
  38. for (int spins = -1;;) {
  39. StampedLock.WNode h, np, pp; int ps;
  40. if ((h = whead) == p) {
  41. if (spins < 0)
  42. spins = HEAD_SPINS;
  43. else if (spins < MAX_HEAD_SPINS)
  44. spins <<= 1;
  45. for (int k = spins;;) { // spin at head
  46. long s, ns;
  47. if (((s = state) & ABITS) == 0L) {
  48. if (U.compareAndSwapLong(this, STATE, s,
  49. ns = s + WBIT)) {
  50. whead = node;
  51. node.prev = null;
  52. return ns;
  53. }
  54. }
  55. else if (LockSupport.nextSecondarySeed() >= 0 &&
  56. --k <= 0)
  57. break;
  58. }
  59. }
  60. else if (h != null) { // help release stale waiters
  61. StampedLock.WNode c; Thread w;
  62. while ((c = h.cowait) != null) {
  63. if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
  64. (w = c.thread) != null)
  65. U.unpark(w);
  66. }
  67. }
  68. if (whead == h) {
  69. if ((np = node.prev) != p) {
  70. if (np != null)
  71. (p = np).next = node; // stale
  72. }
  73. else if ((ps = p.status) == 0)
  74. U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
  75. else if (ps == CANCELLED) {
  76. if ((pp = p.prev) != null) {
  77. node.prev = pp;
  78. pp.next = node;
  79. }
  80. }
  81. else {
  82. long time; // 0 argument to park means no timeout
  83. if (deadline == 0L)
  84. time = 0L;
  85. else if ((time = deadline - System.nanoTime()) <= 0L)
  86. return cancelWaiter(node, node, false);
  87. Thread wt = Thread.currentThread();
  88. U.putObject(wt, PARKBLOCKER, this);
  89. node.thread = wt;
  90. if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
  91. whead == h && node.prev == p)
  92. U.park(false, time); // emulate LockSupport.park
  93. node.thread = null;
  94. U.putObject(wt, PARKBLOCKER, null);
  95. if (interruptible && Thread.interrupted())
  96. return cancelWaiter(node, node, true);
  97. }
  98. }
  99. }
  100. }
  101. /**
  102. * See above for explanation.
  103. *
  104. * @param interruptible true if should check interrupts and if so
  105. * return INTERRUPTED
  106. * @param deadline if nonzero, the System.nanoTime value to timeout
  107. * at (and return zero)
  108. * @return next state, or INTERRUPTED
  109. */
  110. private long acquireRead(boolean interruptible, long deadline) {
  111. StampedLock.WNode node = null, p;
  112. for (int spins = -1;;) {
  113. StampedLock.WNode h;
  114. if ((h = whead) == (p = wtail)) {
  115. for (long m, s, ns;;) {
  116. if ((m = (s = state) & ABITS) < RFULL ?
  117. U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
  118. (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
  119. return ns;
  120. else if (m >= WBIT) {
  121. if (spins > 0) {
  122. if (LockSupport.nextSecondarySeed() >= 0)
  123. --spins;
  124. }
  125. else {
  126. if (spins == 0) {
  127. StampedLock.WNode nh = whead, np = wtail;
  128. if ((nh == h && np == p) || (h = nh) != (p = np))
  129. break;
  130. }
  131. spins = SPINS;
  132. }
  133. }
  134. }
  135. }
  136. if (p == null) { // initialize queue
  137. StampedLock.WNode hd = new StampedLock.WNode(WMODE, null);
  138. if (U.compareAndSwapObject(this, WHEAD, null, hd))
  139. wtail = hd;
  140. }
  141. else if (node == null)
  142. node = new StampedLock.WNode(RMODE, p);
  143. else if (h == p || p.mode != RMODE) {
  144. if (node.prev != p)
  145. node.prev = p;
  146. else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
  147. p.next = node;
  148. break;
  149. }
  150. }
  151. else if (!U.compareAndSwapObject(p, WCOWAIT,
  152. node.cowait = p.cowait, node))
  153. node.cowait = null;
  154. else {
  155. for (;;) {
  156. StampedLock.WNode pp, c; Thread w;
  157. if ((h = whead) != null && (c = h.cowait) != null &&
  158. U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
  159. (w = c.thread) != null) // help release
  160. U.unpark(w);
  161. if (h == (pp = p.prev) || h == p || pp == null) {
  162. long m, s, ns;
  163. do {
  164. if ((m = (s = state) & ABITS) < RFULL ?
  165. U.compareAndSwapLong(this, STATE, s,
  166. ns = s + RUNIT) :
  167. (m < WBIT &&
  168. (ns = tryIncReaderOverflow(s)) != 0L))
  169. return ns;
  170. } while (m < WBIT);
  171. }
  172. if (whead == h && p.prev == pp) {
  173. long time;
  174. if (pp == null || h == p || p.status > 0) {
  175. node = null; // throw away
  176. break;
  177. }
  178. if (deadline == 0L)
  179. time = 0L;
  180. else if ((time = deadline - System.nanoTime()) <= 0L)
  181. return cancelWaiter(node, p, false);
  182. Thread wt = Thread.currentThread();
  183. U.putObject(wt, PARKBLOCKER, this);
  184. node.thread = wt;
  185. if ((h != pp || (state & ABITS) == WBIT) &&
  186. whead == h && p.prev == pp)
  187. U.park(false, time);
  188. node.thread = null;
  189. U.putObject(wt, PARKBLOCKER, null);
  190. if (interruptible && Thread.interrupted())
  191. return cancelWaiter(node, p, true);
  192. }
  193. }
  194. }
  195. }
  196. for (int spins = -1;;) {
  197. StampedLock.WNode h, np, pp; int ps;
  198. if ((h = whead) == p) {
  199. if (spins < 0)
  200. spins = HEAD_SPINS;
  201. else if (spins < MAX_HEAD_SPINS)
  202. spins <<= 1;
  203. for (int k = spins;;) { // spin at head
  204. long m, s, ns;
  205. if ((m = (s = state) & ABITS) < RFULL ?
  206. U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
  207. (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
  208. StampedLock.WNode c; Thread w;
  209. whead = node;
  210. node.prev = null;
  211. while ((c = node.cowait) != null) {
  212. if (U.compareAndSwapObject(node, WCOWAIT,
  213. c, c.cowait) &&
  214. (w = c.thread) != null)
  215. U.unpark(w);
  216. }
  217. return ns;
  218. }
  219. else if (m >= WBIT &&
  220. LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
  221. break;
  222. }
  223. }
  224. else if (h != null) {
  225. StampedLock.WNode c; Thread w;
  226. while ((c = h.cowait) != null) {
  227. if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
  228. (w = c.thread) != null)
  229. U.unpark(w);
  230. }
  231. }
  232. if (whead == h) {
  233. if ((np = node.prev) != p) {
  234. if (np != null)
  235. (p = np).next = node; // stale
  236. }
  237. else if ((ps = p.status) == 0)
  238. U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
  239. else if (ps == CANCELLED) {
  240. if ((pp = p.prev) != null) {
  241. node.prev = pp;
  242. pp.next = node;
  243. }
  244. }
  245. else {
  246. long time;
  247. if (deadline == 0L)
  248. time = 0L;
  249. else if ((time = deadline - System.nanoTime()) <= 0L)
  250. return cancelWaiter(node, node, false);
  251. Thread wt = Thread.currentThread();
  252. U.putObject(wt, PARKBLOCKER, this);
  253. node.thread = wt;
  254. if (p.status < 0 &&
  255. (p != h || (state & ABITS) == WBIT) &&
  256. whead == h && node.prev == p)
  257. U.park(false, time);
  258. node.thread = null;
  259. U.putObject(wt, PARKBLOCKER, null);
  260. if (interruptible && Thread.interrupted())
  261. return cancelWaiter(node, node, true);
  262. }
  263. }
  264. }
  265. }

可以看到,其内部实现都是两个死循环,在死循环中,在它挂起线程时,使用的是Unsafe.park()函数,而park()函数在遇到线程中断时会直接返回,而不是像Thread.sleep()那样会抛出异常。虽然这两个申请锁的方法第一个参数interruptible表示是否处理中断,但是readLock()writeLock()中都是传的 false,所以并不会处理中断,也就是代码中if (interruptible && Thread.interrupted())后面的Thread.interrupted()不会执行,中断转态不会清除,那么这样一来就会导致线程在调用park()函数挂起时遇到中断会再次陷入循环,并且中断转态没有清除,后面的循环中调用park()都将立即返回导致无法将线程挂起,所以当退出条件得不到满足条件时,将导致 CPU 暴涨。

下面演示程序模拟了 CPU 暴涨的情况:

  1. import java.util.concurrent.locks.LockSupport;
  2. import java.util.concurrent.locks.StampedLock;
  3. public class StampedLockTest {
  4. public static void main(String[] args) throws InterruptedException {
  5. Thread[] threads = new Thread[3];
  6. StampedLock sl = new StampedLock();
  7. //启动一个写线程长时间占用锁
  8. new Thread(() -> {
  9. long stamp = sl.writeLock();
  10. LockSupport.parkNanos(60000000000000L);
  11. sl.unlock(stamp);
  12. }).start();
  13. Thread.sleep(100);
  14. //启动三个读线程
  15. for (int i = 0; i < 3; i++) {
  16. threads[i] = new Thread(() -> {
  17. long stamp = sl.readLock();
  18. System.out.println(Thread.currentThread().getName() + "获得锁.");
  19. sl.unlock(stamp);
  20. });
  21. threads[i].start();
  22. }
  23. //5秒后等到三个线程因写锁被占用导致挂起后进行中断
  24. Thread.sleep(5000);
  25. for (int i = 0 ; i < 3; i++) {
  26. threads[i].interrupt();
  27. }
  28. }
  29. }

标签: JAVA 并发编程

发表评论:

icon_mrgreen.gificon_neutral.gificon_twisted.gificon_arrow.gificon_eek.gificon_smile.gificon_confused.gificon_cool.gificon_evil.gificon_biggrin.gificon_idea.gificon_redface.gificon_razz.gificon_rolleyes.gificon_wink.gificon_cry.gificon_surprised.gificon_lol.gificon_mad.gificon_sad.gificon_exclaim.gificon_question.gif