Analysis of Blocking Principle of JAVA Blocking Queue

Analysis of Blocking Principle of JAVA Blocking Queue

Sharing is the transfer of value, just like it


JDK-8u261

1 Introduction

Because the threads in the CLH queue, which thread acquires the lock, which thread enters the queue, and which thread releases the lock, are beyond our control. Therefore, the emergence of conditional queues provides us with a way to actively block and wake up threads only after the specified conditions are met. For conditional queues, we first need to explain some concepts: conditional queues are another kind of queues in AQS besides CLH queues. Each creation of a Condition actually creates a conditional queue, and each call to the await method is actually to the conditional queue. In the queue, each call to the signal method is actually dequeuing from the conditional queue. Unlike the status of multiple nodes on the CLH queue, there is only one status of the node on the conditional queue: CONDITION. So if a node on the condition queue is no longer in the CONDITION state, it means that this node should be dequeued. It should be noted that the conditional queue can only be run in exclusive mode .

Generally, when using a conditional queue as a blocking queue, two conditional queues are created: notFull and notEmpty . notFull means that when the condition queue is full, the put method will be in a waiting state until the queue is not full; notEmpty means that when the condition queue is empty, the take method will be in a waiting state until there is data in the queue.

The notFull.signal method and notEmpty.signal method will move the nodes on the conditional queue to the CLH queue (only one at a time). In other words, there is a situation where a node is transferred from the conditional queue to the CLH queue . It also means that there will be no lock resource contention on the conditional queue, and all lock contention occurs on the CLH queue. .

Some other differences between the conditional queue and the CLH queue are as follows:

  • The conditional queue uses the nextWaiter pointer to point to the next node, which is a singly linked list structure, which is different from the doubly linked list structure of the CLH queue;
  • The conditional queue uses firstWaiter and lastWaiter to point to the head and tail pointer, which is different from the head and tail of the CLH queue;
  • The first node in the conditional queue will not be a special empty node like the CLH queue;
  • Unlike the CLH queue that uses a lot of CAS operations to control concurrency, the prerequisite for the conditional queue to enter the queue is that the exclusive lock resource has been obtained, so there is no need to consider concurrency in many places.

The following is the specific source code analysis. The conditional queue uses ArrayBlockingQueue as an example:


2 Constructor

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  public ArrayBlockingQueue(int capacity) {
 5    this(capacity, false);
 6}
 7
 8  public ArrayBlockingQueue(int capacity, boolean fair) {
 9    if (capacity <= 0)
10        throw new IllegalArgumentException();
11    //
12    this.items = new Object[capacity];
13    //ReentrantLock fair 
14    lock = new ReentrantLock(fair);
15    //notEmpty 
16    notEmpty = lock.newCondition();
17    //notFull 
18    notFull = lock.newCondition();
19  }
 

3 put method

  1  /**
  2   * ArrayBlockingQueue:
  3   */
  4  public void put(E e) throws InterruptedException {
  5    //
  6    checkNotNull(e);
  7    final ReentrantLock lock = this.lock;
  8    /*
  9     lock Semaphore acquire 
 10     
 11     */
 12    lock.lockInterruptibly();
 13    try {
 14        while (count == items.length)
 15            //notFull 
 16            notFull.await();
 17        //notEmpty
 18        enqueue(e);
 19    } finally {
 20        //
 21        lock.unlock();
 22    }
 23  }
 

4 await method

If the array is found to be full during put, or the array is found to be empty during take, the await method will be called to put the current node into the conditional queue:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  public final void await() throws InterruptedException {
 5    //
 6    if (Thread.interrupted())
 7        throw new InterruptedException();
 8    //
 9    Node node = addConditionWaiter();
10    //
11    int savedState = fullyRelease(node);
12    int interruptMode = 0;
13    //CLH unpark 
14    while (!isOnSyncQueue(node)) {
15        LockSupport.park(this);
16        /*
17         signal CLH 
18         spurious wakeup 
19         while 
20         */
21        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
22            break;
23    }
24    //CLH signal CLH 
25    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
26        /*
27        <<<THROW_IE REINTERRUPT transferAfterCancelledWait >>>
28
29         acquireQueued true 
30         true acquireQueued 
31         15 acquireQueued interruptMode=0 
32         acquireQueued interruptMode REINTERRUPT 
33         THROW_IE 15 signal 
34         CLH signal 25 
35         InterruptedException
36         */
37        interruptMode = REINTERRUPT;
38    /*
39     acquireQueued 
40     interruptMode=REINTERRUPT signal 
41    nextWaiter unlinkCancelledWaiters 
42     interruptMode=THROW_IE signal 
43    unlinkCancelledWaiters transferAfterCancelledWait 
44     0 CONDITION 
45     signal 
46     */
47    if (node.nextWaiter != null)
48        unlinkCancelledWaiters();
49    //
50    if (interruptMode != 0)
51        reportInterruptAfterWait(interruptMode);
52  }
 

5 addConditionWaiter method

The logic of adding a node to the conditional queue:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  private Node addConditionWaiter() {
 5    Node t = lastWaiter;
 6    /*
 7     CONDITION CONDITION 
 8     CONDITION 
 9     */        
10    if (t != null && t.waitStatus != Node.CONDITION) {
11        //CONDITION 
12        unlinkCancelledWaiters();
13        t = lastWaiter;
14    }
15    //CONDITION 
16    Node node = new Node(Thread.currentThread(), Node.CONDITION);
17    if (t == null)
18        //t null 
19        firstWaiter = node;
20    else
21        //t null 
22        t.nextWaiter = node;
23    //
24    lastWaiter = node;
25    /*
26     CLH enq 
27     
28     */
29    return node;
30  }
31
32  /**
33   *  12 
34   *  CONDITION 
35   */
36  private void unlinkCancelledWaiters() {
37    Node t = firstWaiter;
38    /*
39     trail CONDITION 
40     CONDITION CONDITION 
41     trail.nextWaiter = next 
42     */
43    Node trail = null;
44    while (t != null) {
45        Node next = t.nextWaiter;
46        if (t.waitStatus != Node.CONDITION) {
47            t.nextWaiter = null;
48            if (trail == null)
49                firstWaiter = next;
50            else
51                trail.nextWaiter = next;
52            if (next == null)
53                lastWaiter = trail;
54        } else
55            trail = t;
56        t = next;
57    }
58  }
 

6 fullyRelease method

Release lock resources, including all lock resources that can be reentered:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  final int fullyRelease(Node node) {
 5    boolean failed = true;
 6    try {
 7        int savedState = getState();
 8        /*
 9         
10         savedState fully 
11         */
12        if (release(savedState)) {
13            failed = false;
14            return savedState;
15        } else {
16            /*
17             state 
18             166 
19             */
20            throw new IllegalMonitorStateException();
21        }
22    } finally {
23        /*
24         CANCELLED addConditionWaiter 10 
25         CONDITION CONDITION 
26         
27         CANCELLED CONDITION 
28         addConditionWaiter 12 CONDITION 
29         CONDITION 
30         */
31        if (failed)
32            node.waitStatus = Node.CANCELLED;
33    }
34  }
 

7 isOnSyncQueue method

Determine whether the node is in the CLH queue to determine whether the signal method is completed when waking up. Of course, this method will also be called in the transferAfterCancelledWait method:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   *  CLH 
 4   */
 5  final boolean isOnSyncQueue(Node node) {
 6    /*
 7     CONDITION prev prev CLH 
 8     prev false
 9     */
10    if (node.waitStatus == Node.CONDITION || node.prev == null)
11        return false;
12    //next next CLH nextWaiter true
13    if (node.next != null)
14        return true;
15    //CLH 
16    return findNodeFromTail(node);
17  }
18
19  /**
20   *  CLH 
21   */
22  private boolean findNodeFromTail(Node node) {
23    Node t = tail;
24    for (; ; ) {
25        if (t == node)
26            return true;
27        if (t == null)
28            return false;
29        t = t.prev;
30    }
31  }
 

8 checkInterruptWhileWaiting method

Determine the state of wake-up (0/THROW_IE/REINTERRUPT):

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   *  0
 4   *  signal THROW_IE
 5   *  signal REINTERRUPT
 6   */
 7  private int checkInterruptWhileWaiting(Node node) {
 8    return Thread.interrupted() ?
 9            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
10            0;
11  }
12
13  /**
14   *  signal THROW_IE REINTERRUPT 
15   *  signal CONDITION CLH transferForSignal 
16   * <p>
17   * THROW_IE signal CLH 
18   *  CLH InterruptedException
19   * <p>
20   * REINTERRUPT signal 
21   *  CLH signal 
22   *  CLH CLH 
23   *  InterruptedException
24   */
25  final boolean transferAfterCancelledWait(Node node) {
26    //CONDITION
27    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
28        /*
29         CAS CONDITION interruptMode THROW_IE
30         CLH 
31         */
32        enq(node);
33        return true;
34    }
35    /*
36     CAS CONDITION signal 
37     signal transferForSignal CONDITION
38     CLH CONDITION 
39     CLH 
40     signal CLH 
41     */
42    while (!isOnSyncQueue(node))
43        Thread.yield();
44    return false;
45  }
 

9 reportInterruptAfterWait method

The final processing of interrupt wake-up:

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  private void reportInterruptAfterWait(int interruptMode)
 5        throws InterruptedException {
 6    if (interruptMode == THROW_IE)
 7        //THROW_IE InterruptedException 
 8        throw new InterruptedException();
 9    else if (interruptMode == REINTERRUPT)
10        //REINTERRUPT true 
11        selfInterrupt();
12  }
 

10 enqueue method

The enqueue logic of ArrayBlockingQueue:

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  private void enqueue(E x) {
 5    final Object[] items = this.items;
 6    //
 7    items[putIndex] = x;
 8    //putIndex putIndex 0 
 9    if (++putIndex == items.length)
10        putIndex = 0;
11    //+1
12    count++;
13    /*
14     notEmpty notEmpty CLH 
15     notEmpty signal take 
16     */
17    notEmpty.signal();
18  }
 

11 signal method

Check whether you need to wake up the nodes in the condition queue, and wake up as needed (transfer the node from the condition queue to the CLH queue):

 1  /**
 2   * AbstractQueuedSynchronizer:
 3   */
 4  public final void signal() {
 5    //
 6    if (!isHeldExclusively())
 7        throw new IllegalMonitorStateException();
 8    Node first = firstWaiter;
 9    if (first != null)
10        //notEmpty CLH 
11        doSignal(first);
12  }
13
14  private void doSignal(Node first) {
15    do {
16        if ((firstWaiter = first.nextWaiter) == null)
17            //null lastWaiter null
18            lastWaiter = null;
19        //notEmpty nextWaiter GC
20        first.nextWaiter = null;
21    } while (!transferForSignal(first) &&
22            //CONDITION CLH 
23            (first = firstWaiter) != null);
24  }
25
26  /**
27   *  notEmpty CLH 
28   *  21 
29   */
30  final boolean transferForSignal(Node node) {
31    /*
32     notEmpty CONDITION false 
33     
34     */
35    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
36        return false;
37
38    //0 CLH 
39    Node p = enq(node);
40    int ws = p.waitStatus;
41    /*
42     SIGNAL SIGNAL notEmpty 
43     CLH SIGNAL
44     CAS 
45     acquireQueued 
46     SIGNAL acquireQueued CAS 
47     CANCELLED acquireQueued 
48     CANCELLED 
49     take 
50     2 
51     1 2 acquireQueued 1 put 
52     unlock 
53     3 3 unlock 
54     2 CLH 
55     */
56    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
57        LockSupport.unpark(node.thread);
58    return true;
59  }
 

12 take method

The take method of ArrayBlockingQueue:

 1  /**
 2   * ArrayBlockingQueue:
 3   */
 4  public E take() throws InterruptedException {
 5    final ReentrantLock lock = this.lock;
 6    //
 7    lock.lockInterruptibly();
 8    try {
 9        while (count == 0)
10            //notEmpty 
11            notEmpty.await();
12        //notFull
13        return dequeue();
14    } finally {
15        //
16        lock.unlock();
17    }
18  }
19
20  /**
21   *  13 
22   */
23  private E dequeue() {
24    final Object[] items = this.items;
25    //
26    @SuppressWarnings("unchecked")
27    E x = (E) items[takeIndex];
28    //
29    items[takeIndex] = null;
30    //takeIndex takeIndex 0
31    if (++takeIndex == items.length)
32        takeIndex = 0;
33    //-1
34    count--;
35    //elementDequeued Itrs 
36    if (itrs != null)
37        itrs.elementDequeued();
38    /*
39     notFull notFull CLH 
40     notFull signal put 
41     */
42    notFull.signal();
43    return x;
44  }
 

The longer I have been in this industry, the more I feel: This must be a truth if you rise from a high building on the ground! Many low-level things are often overlooked in the application business for too long. The plan at the beginning of this year is to summarize the commonly used JDK source code tools. As the end of the year is approaching, I can quickly make up for it when I have time.

  1. Do you really understand ArrayList? Talk about the difference between foreach and iterator when remove
  2. Have you ever wondered why Internet companies always ask about collections? Talk about the classic data structure HashMap
  3. Exclusive mode for in-depth analysis of AQS source code-detailed explanation of ReentrantLock lock characteristics
  4. AQS source code in-depth analysis of the sharing mode-why the PROPAGATE status in AQS?
  5. AQS source code in-depth analysis of the conditional queue-how is the blocking queue in Java implemented? (Current article)
  6. CountDownLatch, an application tool for in-depth analysis of AQS source code (under creation)
  7. CyclicBarrier, an application tool for in-depth analysis of AQS source code (under creation)
  8. ConcurrentHashMap source code analysis-ConcurrentHashMap implementation in Java 8 is there any bug? And there is more than one place! This pit is still relatively large, and I will focus on it later! (finished)
  9. ThreadPoolExecutor source code analysis-ask the rotten Java thread pool execution process, in fact, if you ask for details, many people are still confused? (finished)
  10. ScheduledThreadPoolExecutor source code analysis-focus on how to achieve delayed execution and periodic execution in the timed thread pool!
  11. ThreadLocal source code analysis-key summary, memory leaks, soft references, weak references, false references, interviews often like to ask, I also like to ask other people
  12. Red-black tree TreeMap, LinkedHashMap (not sure if you want to write, if you have time to write, it depends on the project situation)
  13. Ordered and threaded Map container ConcurrentSkipListMap (jump table) in-depth understanding
  14. LinkedList (not sure if you want to write, if you have time to write, it depends on the project situation)
  15. Quick sorting of 1T data! Summary of ten classic sorting algorithms

Each summary is a review of the mastery of the knowledge points. The technique is not easy, so I am a little diligent every day and encourage everyone.

In addition, the author s public account: Geek Time, there are more wonderful articles, interested students can follow