锁是控制多个线程访问共享资源的方式,是实现多线程同步的重要思想。在 java.util.concurrent
包中有很多相关的 API 和组件。
一般来说,锁能够防止多个线程同时访问一个共享资源;个别锁允许并发访问共享资源,如读写锁等。 悲观锁(包括独占锁和共享锁)估计很多人都很熟悉。在开始之前,先简要说一下乐观锁的内容。
乐观锁与 CAS 乐观锁从严格的意义来看,并不是锁;它的每一次操作,是在假设数据没有冲突的情况下进行的,如果因为数据冲突导致失败,就开始重试,直至成功为止。 因此乐观锁是无锁编程 的范畴。
CAS 是一种实现乐观锁的方式,全称 Compare And Set,使用到三个操作数:变量内存地址 ,预期值 ,新值 。 这种算法让 CPU 去比较内存中某个值是否和预期的值相同,如果相同则将这个值更新为新的值,不相同则不做更新。 CAS 是 CPU 指令,其读写都具有原子性,其实现方式是通过 C/C++ 调用 CPU 指令完成的:我们只能通过 JNI 与操作系统交互。
因此,CAS 可以保证并发安全,但并不会保证线程同步 ,而是“乐观”地认为数据在更新期间没有收到其它线程的影响。
ABA 问题 正是因为 CAS 的这份“乐观”,会导致一个数据同步的过程问题 。
比如此时有两个线程同时对同一个值进行 CAS 操作,从初始值 A 更新到 B:
线程 1 抢先获得时间片,线程 2 被阻塞;线程 A 发现期望值的确是 A,将其替换为 B;
在线程 2 恢复之前,线程 3(或者继续是线程 1)过来进行 CAS 操作:期望值为 B,更新到 A;线程 3 抢先获得了时间片,因为条件刚好符合,所以值被更新回 A;
线程 2 恢复后,发现初始值和期望值相同,又将值更新到 B。
由此:虽然线程 2 完成了更新的操作,但是它并不知道值从 A->B->A 的变化过程。
ABA 场景表面上不会造成问题,但是在对过程敏感的业务中,我们需要解决 ABA 问题。
讲完了 CAS,我们就来简单来介绍一下这些 API 和组件的实现细节和使用方式。
Lock 接口 在 Lock 接口之前,Java 使用 synchronized
关键字隐式地获取和释放对象同步锁。 在随后的 Lock 接口中,虽然使用的时候需要显式地获取和释放锁,没有 synchronized 来得便捷;但是 Lock 增加了获取和释放锁的可操作性 、可中断 地获取锁、超时获取锁 等功能,使用更灵活,扩展性更强:
特性
描述
尝试非阻塞地获得锁
当前线程尝试获取锁,如果此刻锁没被其它线程获得,则成功获取并持有锁
能被中断地获取锁
与 synchronized 不同:获得锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常会被抛出,同时锁会被释放
超时获取锁
在指定时间内获得锁,如果截止时间到了依旧无法获得锁,则返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package java.util.concurrent.locks;public interface Lock { void lock () void lockInterruptibly () throws InterruptedException ; boolean tryLock () boolean tryLock (long time, TimeUnit unit) throws InterruptedException ; void unlock () Condition newCondition () }
使用:1 2 3 4 5 6 7 Lock lock = new ReentrantLock(); lock.lock(); try { doSomething(); } finally { lock.unlock(); }
以上虽然也是 synchronized 的原理解释代码,但是 synchronized 是上述这一套加锁解锁的固化,而单纯使用 Lock 接口能更灵活地实现业务。
基础:AQS AbstractQueuedSynchronizer
,抽象队列同步器,是用于构建锁或其它同步组件的基础框架。
AQS 使用一个整型(int
)成员变量 state
来表示同步状态,通过内置 FIFO 双向队列 完成资源获取线程的排队工作。
显然,AQS 采用了模版方法设计模式(抽象类),自身并没有实现任何同步接口,仅仅定义了若干个同步状态获取和释放的方法。 AQS 既可以支持独占式地获取同步状态,也可以支持共享式获取状态,方便不同类型同步组件的实现。 使用者必须要继承同步器 (某个 Sync
),重写指定的方法(如上图中的 #
方法),或者继承 AQS 的一些实体类来实现具体的锁。
自定义的 Lock 在使用时,一般被定义为静态内部类。 自定义的 Lock 在调用 lock() 时,会调用 AQS 的 acquire()
方法。
大概的区别是: AQS 面向锁的实现者 ,简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队、等待、唤醒等操作;锁面向锁的使用者 ,定义了使用者和锁的交互接口。
基本数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 abstract AbstractQueuedSynchronizer { static final class Node { ... static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; } ... private transient volatile Node head; private transient volatile Node tail; private volatile int state; ... private transient Thread exclusiveOwnerThread; ... protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); } ... protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); } protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException(); } protected boolean isHeldExclusively () { throw new UnsupportedOperationException(); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unpackSuccessor(h); return true ; } return false ; } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } ... public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } ... public final Collection<Thread> getQueuedThreads () { ArrayList<Thread> list = new ArrayList<>(); for (Node p = tail; p != null ; p = p.prev) { Thread t = p.thread; if (t != null ) list.add(t); } return list; } ... }
同步队列 AQS 通过同步队列完成对同步状态的管理。
同步队列里的每一个节点,都代表着一个获取同步状态失败的线程,用来保存该线程的引用、等待状态、前驱后驱节点等信息。
头节点永远是一个哑节点,不代表任何线程;但有的时候可以被看作代表当前持有锁的线程。 因此 head 指向的 Node 对象中的 thread 属性永远是 null
。
a. 加入队列
当前线程获取同步状态失败(AQS 的 state
< 0
)时:同步器将当前线程的等待状态等信息构造成一个独占式节点,随后添加到同步队列里面; 此时当前线程会被阻塞。
很多个线程进行操作的话,同步队列很有可能在一个时间点不只有一个线程的加入。 因此代表线程的节点需要添加进同步队列的时候,需要保证线程安全性 :所以才会使用 compareAndSetTail()
这样的 CAS 方法进行操作。
b. 离开队列
当同步状态被释放(state
> 0
)的时候:唤醒头节点代表的线程,该线程会尝试再次获取同步状态。
头节点在获取同步状态成功,释放同步状态的时候,会唤醒后面的节点。 后继节点被唤醒之后,意味着它是持有该同步锁的线程 ,它也就是同步队列中的新的头节点 。
相对于加入队列(设置队列尾节点),离开队列的永远都是头节点,只有一个线程:头节点的后继节点才能够成为下一个头节点;因此头节点的设置会比尾节点简单。
实现:独占式同步状态获取与释放 1. 获取同步状态 :调用 acquire()
该方法对中断不敏感 ;也就是说,线程获取同步状态失败后被添加到同步队列中,后续对该线程进行中断操作 的话,线程不会被移出同步队列 。
2. 释放同步状态
调用 tryRelease(arg)
释放同步状态,然后唤醒头节点非 CANCELLED 状态的后继节点。
相关的实现方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 abstract AbstractQueuedSynchronizer { ... protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); } ... public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } ... public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } ... private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } ... final boolean aquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } ... private Node enq (final Node node) { for (;;) { Node t = tail; if (t = null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ... private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } ... }
分析源码可知:
线程获取同步状态策略 :
同步器维护一个同步队列,队列中每一个节点代表一个线程;
每当节点的前驱节点出队 ,或者自身被中断、或从等待状态返回时:检查自己的前驱节点是否为头节点;如果是,则节点代表的线程会去尝试获取同步状态;
前驱节点不是头节点的线程并不能去获取同步状态;它们会保持在同步队列中;
所有获取同步状态失败了的线程都在队列中进行自旋 。各自在自旋的时候不会相互通信;
移出队列(停止自旋)的条件是前驱节点为头节点,且成功获取了同步状态。
如此符合 FIFO,并且便于对过早的通知进行处理,比如对一些前驱节点不是头节点的线程节点的通知。
线程释放同步状态策略 :
调用 tryRelease(arg)
释放同步状态,然后唤醒头节点非 CANCELLED 状态的后继节点。
实现:共享式同步状态获取和释放 共享式获取与独占式获取的区别在于:同一时刻能否允许多个线程同时获取到同步状态。 资源在被共享状态时,共享的线程都可以访问资源,独占式的不能访问;资源在被独占式访问时,其他所有类型的线程都不能访问资源。
共享式释放与独占式释放的区别在于:共享式释放必须要保证同步状态 / 资源被线程安全地释放 。
相关的实现方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 abstract AbstractQueuedSynchronizer { ... protected int tryAcquireShared (int arg) { throw new UnsupportedOperationException(); } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException(); } ... public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } ... public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } ... private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupted(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } ... private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } ... private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } ... }
通过比较模板方法可知,共享式获取同步状态会立刻相应中断,而独占式获取同步状态不会马上响应。
即使是共享式获取,AQS 也是严格按照同步队列的入队顺序唤醒自旋的线程。
实现:超时获取同步状态 也属于独占式的同步状态更新。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 abstract AbstractQueuedSynchronizer { ... protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); } ... public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } ... }
doAcquireNanos()
实现与 aquireQueued()
相差无几,只是多了一些超时的处理;同时会相应中断,线程被中断的时候会抛出异常。 因此 doAcquireNanos(int, long)
也是 acquireInterruptibly(int)
的升级版。
应用:自定义 有一个设计:只允许最多两个线程同时访问,超过两个线程的访问将被阻塞。
1. 确定访问模式
应该采用共享式访问模式,实现 tryAcquireShared()
和 tryReleaseShared()
等共享式方法。
2. 定义资源数
资源数定为 2
,即设置 state
为 2;每当一个线程获得资源时,state
减 1,直至减到 0 时,说明已经有两个线程访问到了资源,其他访问线程将会被阻塞。 因此 state
的值范围是 [0, 1, 2]。
可以使用 compareAndSetState()
等 CAS 设置方法保证原子性。
3. 组合自定义同步器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 class TwinLock implements Lock { private final Sync sync = new Sync(2 ); private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0 ) { throw new IllegalArgumentException("Count should large than 0." ); } setState(count); } public int tryAcquireShared (int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryReleaseShared (int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true ; } } } } @Override public void lock () { sync.acquireShared(1 ); } @Override public void unlock () { sync.releaseShared(1 ); } @Override public void lockInterruptibly () { } @Override public boolean tryLock () { return false ; } @Override public boolean tryLock (long time, TimeUnit unit) { return false ; } @Override public Condition newCondition () { return null ; } }
应用:ReentrantLock 可重入锁 顾名思义,这是能够重入的锁:即同一个线程可重复获得已经持有的锁(对资源重复加锁)。
可重入锁基于独占式的同步器进行设计。
加锁的时候,判断当前请求同步状态的线程和当前占有锁的线程是否为同一个;如果是的话就会再次获取;
另外维护了一个锁被持有的计数(hold count),用于跟踪对 lock()
的嵌套调用;因此被一个锁保护的代码块可调用另一个使用相同锁的方法。
如:Block A 中代码调用 method B,B 与 A 持有相同的锁对象,则锁的持有计数累加至 2
调用 B 执行完后,锁被释放一次,因此计数减为 1
可重入锁支持公平和非公平的选择。
公平性是针对于锁的获取顺序来说的,先进先出符合公平性
我们现在回过头来看前面提到的独占式获取同步状态:
如果一个已经占有锁的线程尝试再次获得同一个锁,那么按照代码逻辑,它自己会被自己阻塞;因为上述的独占式基本结构没有考虑这个场景。
可重入锁相关代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 package java.util.concurrent.locks;public class ReentrantLock implements Lock , java .io .Serializable { ... private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { ... abstract void lock () ; final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } ... } ... static final class FairSync extends Sync { ... protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } } ... public ReentrantLock () { sync = new NonFairSync(); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonFairSync(); } ... }
基本结构的使用:1 2 3 4 5 6 7 myLock.lock(); try { } finally { myLock.unlock(); }
小结:
确保任何时候只有一个线程进入 critical section
当其他线程调用 lock()
时将会被阻塞,直到第一个线程释放该锁对象
synchronized
也“隐式”地实现了可重入性,因为在每次的 monitorenter
和 monitorexit
之间维护了一个计数器。
注意:
锁不能使用在 try-with-resource
语句
首先,“解锁”的方法名不是 close
try-with-resource 在使用时希望声明新变量;而锁是多个线程共享的变量,不能随便新建
如在临界区之前抛出异常,finally 将会释放锁,但数据可能会受损
应用:读写锁 与独占锁和可重入锁不同,读写锁是共享式获取同步状态的实现:读操作可以并行执行,写操作将其它所有操作阻塞。 写操作完成之后,新状态要立即对读操作可见。
因此读写锁维护的是一对锁 。通过分离读锁和写锁,使得并发性相对于其他的排它锁有更大的提升;且简化了读写交互场景的编程方式。
在没有读写锁的实现的时候(before JDK 1.5),要实现读多写少的数据同步场景,我们需要:
当写操作开始时,所有晚于该写操作的操作都会进入等待状态;
写操作完成,释放同步状态并通知后续线程,等待的读操作才能继续执行;
写操作之间使用 synchronized
关键字进行同步。
这样读操作就不会发生脏读。
java.util.concurrent 包中的 ReentrantReadWriteLock
就是对读写锁的实现:
1 2 3 4 5 6 7 8 9 10 package java.util.concurrent.locks;public interface ReadWriteLock { Lock readLock () Lock writeLock () }
基本数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package java.util.concurrent.locks;public class ReentrantReadWriteLock implements ReadWriteLock , java .io .Serializable { ... private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock () { this (false ); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync() : new NonFairSync(); readerLock = new ReadLock(this ); writerLock = new WriterLock(this ); } public ReentrantReadWriteLock.WriteLock writeLock () { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock () { return readerLock; } ... public int getReadLockCount () { return sync.getReadLockCount(); } public boolean isWriteLocked () { return sync.isWriteLocked(); } ... public int getWriteHoldCount () { return sync.getWriteHoldCount(); } public int getReadHoldCount () { return sync.getReadHoldCount(); } ... }
使用方法和可重入锁的使用结构是一样的,读操作时获取读锁,写操作时获取写锁就可以了。
特性
说明
公平性选择
支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
重进入
支持重进入,以读写线程为例:读线程获取锁后,能再次获得读锁;写线程获得写锁后能再次获得写锁,也能获取读锁
锁降级
遵循获取写锁、获取读锁再释放写锁的次序,写锁能降级为读锁
常应用于大多数时间为读,少数时间为写的场景。这样的场景比较多见,此时读写锁比其他一般的独占锁性能要高,提供更好的并发性和吞吐量。
实现与设计(以 ReentrantReadWriteLock
为例) 1 . 读写状态的设计
万变不离其宗,还是以 state
来定义资源,记录同步器的状态:在 ReentrantReadWriteLock
中代表的是读写状态。 往细了说,就是读锁和写锁被获取的次数 ,记录着多个读线程的状态和一个写线程的状态。
单单一个 int
类型的变量,怎么能记录这么多状态? 只能说,ReentrantReadWriteLock
对于读写状态的设计的确是巧妙。
上图显然已经给出答案了:将 state 按位切割 ,高 16 位表示读状态,低 16 位表示写状态。 这样,读写状态的记录就变成了整型的位运算 :
当前状态 state = S 时,写状态取 S 低位 S & 0x0000FFFF
,读状态取 S 高位 S >>> 16;
写锁重入时,计数直接累加 S + 1;读锁被获取时,计数累加 S + (1 << 16) = S + 0x00010000
。
上图的示例表示,当前线程获得了写锁,且重进入了两次;该线程又获取了读锁,且重入了一次。 当写锁被获取时,如果读锁不为 0,那么读锁一定同时被获取了写锁的那个线程持有。
相关代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 package java.util.concurrent.locks;public class ReentrantReadWriteLock implements ReadWriteLock , java .io .Serializable { ... final Sync sync; ... abstract static class Sync extends AbstractQueuedSynchronizer { ... static final int SHARED_SHIFT = 16 ; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1 ; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 ; static int sharedCount (int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; } static final class HoldCounter { int count = 0 ; final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal <HoldCounter > { public HoldCounter initialValue () { return new HoldCounter(); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null ; private transient int firstReaderHoldCount; ... } ... public static class ReadLock implements Lock , java .io .Serializable { ... public void lock () { sync.acquireShared(1 ); } ... public void unlock () { sync.releaseShared(1 ); } ... } ... public static class WriteLock implements Lock , java .io .Serializable { ... public void lock () { sync.acquire(1 ); } ... public void unlock () { sync.release(1 ); } ... } ... }
2 . 写锁的获取与释放
写锁相关操作属于独占式 同步状态的获取和释放:
如果读锁数量不为 0;或写锁数量不为 0,并且不是可重入操作(当前持有读锁/写锁的线程不是请求写锁的线程),则获取失败;
如果当前锁数量为 0,线程有资格获得写锁
读锁被获取的时候不能获取写锁的原因:需要保证写操作对所有读操作可见; 如果在读锁被占用的时候写锁还能够被获取,那么该写操作就无法对这一次读操作可见了。
因此获取写锁需要实现 tryAcquire(arg)
,写锁的释放需要实现 tryRelease(arg)
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package java.util.concurrent.locks;public class ReentrantReadWriteLock implements ReadWriteLock , java .io .Serializable { ... final Sync sync; ... abstract static class Sync extends AbstractQueuedSynchronizer { ... protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; } protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; } ... } ... }
3 . 读锁的获取与释放
读锁相关操作则是属于共享式 同步状态的获取和释放。
因此获取读锁需要实现 tryAcquireShared(arg)
,读锁的释放需要实现 tryReleaseShared(arg)
。
相对于写锁,读锁操作的实现会复杂一些:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 package java.util.concurrent.locks;public class ReentrantReadWriteLock implements ReadWriteLock , java .io .Serializable { ... final Sync sync; ... abstract static class Sync extends AbstractQueuedSynchronizer { ... readHolds = new ThreadLocalHoldCounter(); ... protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); } ... final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null ) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHold.get(); if (rh.count == 0 ) readHolds.remove(); } } if (rh.count == 0 ) return -1 ; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null ) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1 ; } } } ... protected final boolean tryReleaseShared (int releases) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } ... } ... }
注 1:readerShouldBlock()
方法用来判定获取读锁的线程是否要被阻塞。
非公平状态下,写锁获取的优先级会更高,因此如果存在要读取写锁的线程,则读锁需要让步,readerShouldBlock() = true
;
公平状态下则按照先进先出的顺序:有线程在排队,你新来的不能直接获得锁
注 2:cachedHoldCounter
有什么用?其实用处不大,主要是用来提升性能。 将最后一次获取读锁的线程缓存到这里,比 ThreadLocal 的 map 结构性能好一些。
同理,设置 firstReader
和 firstReaderHoldCount
也一样,虽然用处不大,但是可以提升性能,使得在读锁不产生竞争的情况下,记录读锁的重入次数会非常方便快捷。 如果一个线程使用了 firstReader
,那么它就不需要占用 cachedHoldCounter
。
4 . 锁降级
并没有任何一篇文档中写明写锁比读锁更高级,不过从上述源码浅析中可以看到,在同步状态操作的时候,的确会对写锁一些照顾。(参照写锁的获取 )
读写锁中的“锁降级”,指的是线程持有锁的变化:持有写锁的线程,去获得读锁,随后再释放写锁。 感兴趣可以读一下 ReentrantReadWriteLock
的 javadoc 给出的示例。
由此定义反推的“升级”是不可以的,试想一下,获取读锁之后,线程再次获得写锁就会将自己阻塞掉了,而且可能就没有其他线程唤醒它了,很容易造成死锁。
Condition 接口 有的时候根据业务,某些线程进入临界区 critical section 后,还需要满足一定条件才能执行。 注:不能在多线程中简单使用判定后读写的方法 —— 有可能在 if 判断后被中断。
可以使用 Condition 的实现对象来管理已获得锁但还不能做有用工作的线程。
等待池是由 Condition 内部实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package java.util.concurrent.locks;interface Condition { void await () boolean await (long time, TimeUnit unit) void awaitUninterruptibly () void signalAll () void signal () }
看着上面的方法名,是不是觉得很熟悉?
没错,Condition 方法的命名(await()
、signal()
、signalAll()
)分别与 Object 同样功能的方法(wait()
、notify()
、notifyAll()
)区分开。
Object 监视器(Monitor)方法和 synchronized
关键字配合使用;而 Condition 接口和 Lock
配合使用。
对比项
Object Monitor Methods
Condition
前置条件
获取对象的锁
调用 Lock.lock() 获取锁 调用 Lock.newCondition() 获取 Condition 对象
调用方式
直接调用 如 object.wait()
直接调用 如 condition.await()
等待队列个数
一个(Object 只有一个 wait() / notify())
多个
当前线程释放锁并进入等待状态
支持
支持
当前线程释放锁并进入等待状态,在等待状态中不响应中断
不支持
支持
当前线程释放锁并进入超时等待状态
支持
支持
当前线程释放锁并进入等待状态到超时时间
不支持
支持
唤醒等待队列的一个线程
支持
支持
唤醒等待队列的全部线程
支持
支持
注:锁和条件都不是面向对象的设计
关键点:
锁用来保护代码片段,任何时刻只能有一个线程执行被保护的代码
锁可管理试图进入被保护代码段的线程
锁可拥有一个或多个相关的 Condition 条件对象
每个条件对象管理着已进入被保护代码段但未能运行的线程
实现 等待池的实现(结合同步队列)
等待(Condition.await()
)
通知(Condition.signal()
)
AQS 对 Condition 相关方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ... public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } ...