AQS简介 AQS(AbstractQueuedSynchronizer)是一个模板类,其提供一个先进先出(FIFO)的等待队列,旨在为大多数依赖单个原子int
值表示状态的同步器提供有用的基础。
AQS类支持默认的排他锁和共享锁之一或两者。在排他锁模式下获取时,其他线程尝试获取是不会成功的,共享模式由多个线程获取可能会(但不一定)成功。AQS类不会“理解”这些区别,只是锁意义上说,当成功获取共享模式时,下一个等待线程(如果存在)也必须确定它是否也能够获取。在不同模式下等待的线程共享相同的FIFO队列。
如果要通过AQS来实现自己的同步机制,那么需要根据实现方式(排他锁or共享锁)来重写它的protected
方法,如下所示:
1 2 3 4 5 6 7 8 9 10 tryAccquire(int ):boolean tryRelease(int ):boolean tryAcquireShared(int ):boolean tryReleaseShared():boolean isHeldExclusively():boolean
通常子类仅支持这些模式之一,当然也可以两者兼顾如ReadWriteLock
。仅支持排他或仅共享模式的子类不需要定义支持未使用模式的方法。鉴于这些情况,,该类中的其他方法将执行所有排队和阻塞机制,子类可以操作状态值,但前提是必须使用原子性操作且推荐使用getState
、setState
、compareAndSetState
等三个方法来操作状态。
在使用方面,子类应定义为非公共内部帮助器类 ,用于实现其封闭类的同步属性。AQS未实现任何同步接口,只定义了诸如可调用模板方法,所以子类需要用具体的锁和相关的同步器适当地调用这些模板方法来实现其公共方法。此外,AQS亦提供了默认Condition
的实现ConditionObject
供实现者使用。
AQS的使用 下面的例子来源于JDK源码注释部分。
排他锁模式 下面是一个不可重入的排他锁,0值代表解锁状态,1值代表锁住状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class Mutex implements Lock , Serializable { private static class Sync extends AbstractQueuedSynchronizer { protected boolean isHeldExclusively () { return getState() == 1 ; } public boolean tryAcquire (int acquires) { assert acquires == 1 ; if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int releases) { assert releases == 1 ; if (getState() == 0 ) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null ); setState(0 ); return true ; } Condition newCondition () { return new ConditionObject(); } private void readObject (ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0 ); } } private final Sync sync = new Sync(); public void lock () { sync.acquire(1 ); } public boolean tryLock () { return sync.tryAcquire(1 ); } public void unlock () { sync.release(1 ); } public Condition newCondition () { return sync.newCondition(); } public boolean isLocked () { return sync.isHeldExclusively(); } public boolean hasQueuedThreads () { return sync.hasQueuedThreads(); } public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(timeout)); } }
共享锁模式 下面是一个只需要1个信号量的共享模式的latch锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled () { return getState() != 0 ; } protected int tryAcquireShared (int ignore) { return isSignalled() ? 1 : -1 ; } protected boolean tryReleaseShared (int ignore) { setState(1 ); return true ; } } private final Sync sync = new Sync(); public boolean isSignalled () { return sync.isSignalled(); } public void signal () { sync.releaseShared(1 ); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); }
AQS的源码剖析 FIFO队列 AQS的等待队列是“ CLH”(Craig,Landin和 Hagersten)锁队列的变体。 CLH锁通常用于自旋锁,但是在AQS中将它用于阻塞同步器,并且使用相同的基本策略,即在其节点的前驱节点中保存有关线程的某些控制信息。每个节点中的“status”字段跟踪线程是否应阻塞,节点的前驱释放时会发出信号。队列的每个节点都充当特定通知样式的监视器,包含单个等待线程,但是状态字段不控制是否为线程授予锁等。如果是队列的第一个节点那么它包含的线程可能会尝试获取锁,但是不保证成功,仅授予竞争权,所以当前已释放的竞争者线程可能需要重新等待。
在AQS中以链表的形式来表示FIFO队列,示例如下所示:
1 2 3 +------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
下面是链表的节点Node
的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
排他锁获取 排他锁的获取方式如下所示:
1 2 3 4 5 6 acquire(int ):void acquireInterruptibly():void tryAcquireNanos(int ,long ):boolean
这里直接分析acquire
方法(acquireInterruptibly
和tryAcquireNanos
两种获取锁的方式同其流程大同小异),源码如下所示:
1 2 3 4 5 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
可以得到的信息是,先尝试获取tryAcquire
,如果获取成功则结束,如果获取失败则进入等待队列。而tryAcquire
的获取过程则是需要我们自己去实现,也正如前文说到的一样,用AQS实现排他锁需要重写相应的方法。下面AQS对tryAcquire
1 2 3 protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); }
当首次获取锁失败后,会调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
这一小块代码去让线程能持续的去获取锁。其中addWaiter
方法主要是讲创建队列节点,然后将其加入等待队列,设计CAS和自选等操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
acquireQueued
方法主要是采用自选的方式去获取锁,其中会涉及shouldParkAfterFailedAcquire
根据节点的status
状态值来得知是否挂起。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
对于shouldParkAfterFailedAcquire
方法,要求 pred == node.prev,源码部分如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
排他锁释放 对于排他锁的释放,AQS使用的release
方法来操作,其中会调用到需要子类重载的方法tryRelease
来进行对锁的释放。如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected boolean tryRelease (int arg) { throw new UnsupportedOperationException(); }
可以看出,在释放锁成功之后,便是一个通知操作unparkSuccessor
,该操作是唤醒下一个获取锁的线程。如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
共享锁获取 共享锁的获取方式如下所示:
1 2 3 4 5 6 acquireShared(int ):void acquireSharedInterruptibly(int ):void tryAcquireSharedNanos(int ,long ):boolean
这里直接分析acquireShared
方法(acquireSharedInterruptibly
和tryAcquireSharedNanos
两种获取锁的方式同其流程大同小异),源码如下所示:
1 2 3 4 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
这个方法比较简单,调用子类需要重载的方法tryAcquireShared
获取锁失败后会走后续的方法(doAcquireShared
)流程。其中主要做了如下两件事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
共享锁释放 对于共享锁的释放,AQS使用的releaseShared
方法来操作,其中会调用到需要子类重载的方法tryReleaseShared
来进行对锁的释放。如下所示:
1 2 3 4 5 6 7 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
可以看出,在尝试释放锁成功之后会调用doReleaseShared
方法继续传播行为。其源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }