并发编程系列-AQS

发布时间 2023-09-21 11:51:30作者: 小年轻在奋斗

AbstractQueuedSynchronizer(AQS)是一个抽象队列同步器,它用于构建依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器的框架。该类的目的在于提供基本功能的封装,适用于大多数需要使用单个原子int值表示同步状态的同步器。举例来说,ReentrantLock、Semaphore以及FutureTask等都是基于AQS实现的,我们也可以通过继承AQS来实现自定义的同步器。

核心思想

网络上常见的解释是:

如果所请求的共享资源是空闲的,那么当前请求资源的线程将被设置为有效的工作线程,并且共享资源将被锁定。然而,如果所请求的共享资源已被占用,则需要一套机制来阻塞等待线程并在适当时分配锁。AQS采用了CLH队列锁的实现方式来处理这种情况,即将无法立即获取到锁的线程添加到队列中进行排队等待。

我理解,可以把AQS视为一种锁,并将其用于管理共享资源的状态和线程请求。当有线程请求资源时,AQS会记录该线程作为当前工作线程,并将自身设置为锁定状态。如果其他线程请求AQS,则它们将被记录到等待队列中,无法获取到锁的线程进入阻塞等待状态。

为什么需要 AQS

在深入研究AQS之前,我们可能会问为什么需要AQS? 其中synchronized关键字和CAS原子类已经提供了丰富的同步方案。

然而,在实际需求中,我们对同步的需求各不相同。例如,如果我们需要对锁设置超时时间,单独依靠synchronized关键字或CAS是无法实现的,需要进行二次封装。而JDK中提供了丰富的同步方案,比如使用基于AQS实现的ReentrantLock。

用法

要将AQS用作同步器的基础,请根据具体使用情况重新定义以下方法,可以使用getState、setState和/或compareAndSetState来检查和/或修改同步状态:

  • tryAcquire

  • tryRelease

  • tryAcquireShared

  • tryReleaseShared

  • isHeldExclusively

这些方法的默认实现会抛出UnsupportedOperationException。这些方法的实现必须是线程安全的,并且通常应该是短暂而不是阻塞的。定义这些方法是使用此类的唯一受支持的方式。所有其他方法都被声明为最终方法,因为它们不应该被修改。

你可能还会发现从AbstractOwnableSynchronizer继承的方法对于跟踪拥有独占同步器的线程非常有用。我们鼓励您使用这些方法,这样监控和诊断工具可以帮助用户确定哪些线程持有锁。

尽管这个类基于内部FIFO队列,但它并不自动执行FIFO调度策略。独占同步的核心形式为:

   Acquire:
       while (!tryAcquire(arg)) {
          enqueue thread if it is not already queued;
          possibly block current thread;
       }
  
   Release:
       if (tryRelease(arg))
          unblock the first queued thread;

AQS是一种底层技术,用于实现锁机制。它提供了一种通过队列的方式,将争用锁的线程进行排队等待的机制。与此类似的是,共享模式也是通过类似的机制来实现。然而,共享模式可能涉及到级联信号的问题。

在获取操作之前,新获取的线程有可能会在其他被阻塞和排队的线程之前获得锁资源。如果需要的话,可以通过定义tryAcquire和/或tryAcquireShared方法来禁止插队,从而提供公平的FIFO获取顺序。具体来说,大多数公平同步器可以在tryAcquire返回false时,通过调用hasQueuedPredecessors方法返回true,以保证公平性。还有其他修改的可能性。

默认的插入策略通常是贪婪、放弃和避免护送策略,这种策略可以提供最高的吞吐量和可扩展性。尽管这不能保证公平性或无饥饿,但它允许较早排队的线程在较晚排队的线程之前重新竞争,并且每次重新竞争都有公平机会与传入线程成功竞争。此外,虽然获取操作不是传统意义上的自旋,但它们可能会在阻塞之前多次调用tryAcquire,并在其中插入其他计算。这样做可以提供自旋带来的许多好处,而不会增加太多的负担。如果需要的话,可以通过预先调用具有"快速路径"检查的方法来增加这一点,例如预先检查hasContended和/或hasQueuedThreads,以便仅在同步器可能不会争用的情况下执行自旋操作。

AQS提供了高效且可扩展的同步基础,适用于依赖于整数状态、获取和释放参数以及内部FIFO等待队列的同步器。如果这还不够,可以使用原子类、自定义java.util.Queue类和LockSupport阻塞支持从较低级别构建同步器。

用法示例

这是一个不可重入互斥锁类,它使用值 0 表示未锁定状态,使用值 1 表示锁定状态。虽然不可重入锁并不严格要求记录当前所有者线程,但无论如何,此类都会这样做以使使用情况更易于监控。它还支持条件并公开一些检测方法:

class Mutex implements Lock, java.io.Serializable {

   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Acquires the lock if state is zero
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // Releases the lock by setting state to zero
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (!isHeldExclusively())
         throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // Reports whether in locked state
     public boolean isLocked() {
       return getState() != 0;
     }

     public boolean isHeldExclusively() {
       // a data race, but safe due to out-of-thin-air guarantees
       return getExclusiveOwnerThread() == Thread.currentThread();
     }

     // Provides a Condition
     public Condition newCondition() {
       return new ConditionObject();
     }

     // Deserializes properly
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }

   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();

   public void lock()              { sync.acquire(1); }
   public boolean tryLock()        { return sync.tryAcquire(1); }
   public void unlock()            { sync.release(1); }
   public Condition newCondition() { return sync.newCondition(); }
   public boolean isLocked()       { return sync.isLocked(); }
   public boolean isHeldByCurrentThread() {
     return sync.isHeldExclusively();
   }
   public boolean hasQueuedThreads() {
     return sync.hasQueuedThreads();
   }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

这是一个类似于 CountDownLatch 的锁存器类,只是它只需要一个信号即可触发。因为锁存器是非独占的,所以它使用共享的获取和释放方法。

 class BooleanLatch {

   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }

     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }

     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }

   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }

AQS 底层原理

父类 AbstractOwnableSynchronizer

AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer ,后者逻辑十分简单:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;
  // 设置当前持有锁的线程
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractOwnableSynchronizer 只是定义了设置持有锁的线程的能力。

CLH 队列

AQS(AbstractQueuedSynchronizer)的等待队列是CLH(Craig, Landin, and Hagersten)锁定队列的一种变体。CLH锁通常用于自旋锁。AQS通过将每个请求共享资源的线程封装为一个CLH节点来实现。这个节点的定义如下:

   /** CLH Nodes */
    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others

        // methods for atomic operations
        final boolean casPrev(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, PREV, c, v); // 通过 CAS 确保同步设置 prev 的值
        }
        final boolean casNext(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, NEXT, c, v);
        }
        final int getAndUnsetStatus(int v) {     // for signalling
            return U.getAndBitwiseAndInt(this, STATUS, ~v);
        }
        final void setPrevRelaxed(Node p) {      // for off-queue assignment
            U.putReference(this, PREV, p);
        }
        final void setStatusRelaxed(int s) {     // for off-queue assignment
            U.putInt(this, STATUS, s);
        }
        final void clearStatus() {               // for reducing unneeded signals
            U.putIntOpaque(this, STATUS, 0);
        }
        private static final long STATUS = U.objectFieldOffset(Node.class, "status");
        private static final long NEXT = U.objectFieldOffset(Node.class, "next");
        private static final long PREV = U.objectFieldOffset(Node.class, "prev");
    }

CLH节点的数据结构是一个双向链表的节点,其中每个操作都经过CAS(Compare and Swap)来确保线程安全。要将其加入CLH锁队列,您可以将其自动拼接为新的尾节点;要出队,则需要设置头节点,以便下一个满足条件的等待节点成为新的头节点:

  +------+  prev +-------+  prev +------+
 |      | <---- |       | <---- |      |
 | head | next  | first | next  | tail |
 |      | ----> |       | ----> |      |
 +------+       +-------+       +------+

Node 中的 status 字段表示当前节点代表的线程的状态。status 存在三种状态:

     static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative 
    static final int COND      = 2;          // in a condition wait
  • WAITING:表示等待状态,值为 1。

  • CANCELLED:表示当前线程被取消,为 0x80000000。

  • COND:表示当前节点在等待条件,也就是在条件等待队列中,值为 2。

在上面的 COND 中,提到了一个条件等待队列的概念。首先,Node 是一个静态抽象类,它在 AQS 中存在三种实现类:

  • ExclusiveNode

  • SharedNode

  • ConditionNode

前两者都是空实现:

     static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

而最后的 ConditionNode 多了些内容:

   static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter; 
        // 检查线程是否中断或当前线程的状态已取消等待。
        public final boolean isReleasable() {
            return status <= 1 || Thread.currentThread().isInterrupted();
        }

        public final boolean block() {
            while (!isReleasable()) LockSupport.park();
            return true;
        }
    }

ConditionNode 拓展了两个方法:

  • 检查线程状态是否处于等待。

  • 阻塞当前线程:当前线程正在等待执行,通过 LockSupport.park() 阻塞当前线程。这里通过 while 循环持续重试,尝试阻塞线程。

而到这一步,所有的信息都指向了一个相关的类 Condition 。

总结

AQS是锁机制实现的底层技术,它通过队列的方式将争用锁的线程进行排队等待。与此不同的是,Condition代替了Object中的同步方法能力。

当Condition进入等待状态时,它会在等待队列的队尾插入新的节点,并且通过release方法释放锁资源。在释放锁资源后,它会通过acquire开启一个自旋尝试重新获取锁资源。当自旋一定时间后,如果未成功获取到锁资源,就会通过LockSupport的API进入阻塞状态。

对于依赖单个原子int值来表示同步状态的解读,Node通过status属性来控制与之关联的线程的等待状态,而AQS的state用于控制同步状态。

顶尖架构师栈

关注回复关键字

【C01】超10G后端学习面试资源

【IDEA】最新IDEA激活工具和码及教程

【JetBrains软件名】 最新软件激活工具和码及教程

工具&码&教程

转载于:https://mp.weixin.qq.com/s/rM7eJ48_FuYzquJgSyrpjA