⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://cmsblogs.com/?p=2381 「小明哥」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 简介

ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用 FIFO 的原则对元素进行排序添加的。

ArrayBlockingQueue 为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。

ArrayBlockingQueue 支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。

2. 构造方法

先看看 java.util.concurrent.ArrayBlockingQueue 的构造方法,代码如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {

private static final long serialVersionUID = -817911632652898426L;

final Object[] items;
int takeIndex;
int putIndex;
int count;
// 重入锁
final ReentrantLock lock;
// notEmpty condition
private final Condition notEmpty;
// notFull condition
private final Condition notFull;
transient ArrayBlockingQueue.Itrs itrs;

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

}

  • 可以清楚地看到 ArrayBlockingQueue 继承 java.util.AbstractQueue ,实现 java.util.concurrent.BlockingQueue 接口。看过 java.util 包源码的同学应该都认识AbstractQueue,该类在 java.util.Queue 接口中扮演着非常重要的作用,该类提供了对queue 操作的骨干实现(具体内容移驾其源码)。

  • java.util.concurrent.BlockingQueue 继承 java.util.Queue 接口,为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作。作为使用者,则不需要关心队列在什么时候阻塞线程,什么时候唤醒线程,所有一切均由 BlockingQueue 来完成。

  • ArrayBlockingQueue 内部使用可重入锁 ReentrantLock + Condition 来完成多线程环境的并发操作。

    • items 变量,一个定长数组,维护 ArrayBlockingQueue 的元素。
      • takeIndex 变量,int ,为 ArrayBlockingQueue 队首位置。
      • putIndex 变量,int ,ArrayBlockingQueue 队尾位置。
      • count 变量,元素个数。
    • lock 变量,ReentrantLock ,ArrayBlockingQueue 出列入列都必须获取该锁,两个步骤共用一个锁。
      • notEmpty 变量,非空,即出列条件。
      • notFull 变量,未满,即入列条件。

3. 入队

ArrayBlockingQueue 提供了诸多方法,可以将元素加入队列尾部。

  • #add(E e) 方法:将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true ,如果此队列已满,则抛出 IllegalStateException 异常。
  • #offer(E e) 方法:将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true ,如果此队列已满,则返回 false 。
  • #offer(E e, long timeout, TimeUnit unit) 方法:将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
  • #put(E e) 方法:将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。

3.1 add

// ArrayBlockingQueue.java
@Override
public boolean add(E e) {
return super.add(e);
}

// AbstractQueue.java
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

  • #add(E e)方法,调用 #offer(E e) 方法,如果返回false,则直接抛出 IllegalStateException 异常。

3.2 offer

@Override
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

  • 首先,检查是否为 null
  • 然后,获取 Lock 锁。获取锁成功后,如果队列已满则,直接返回 false 。
  • 最后,调用 #enqueue(E e) 方法,它为入列的核心方法,所有入列的方法最终都将调用该方法,在队列尾部插入元素。

3.2.1 enqueue

private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 添加元素
final Object[] items = this.items;
items[putIndex] = x;
// 到达队尾,回归队头
if (++putIndex == items.length)
putIndex = 0;
// 总数+1
count++;
// 通知阻塞在出列的线程
notEmpty.signal();
}

  • 该方法就是在 putIndex(对尾)位置处,添加元素,最后调用 notEmpty#signal() 方法,通知阻塞在出列的线程(如果队列为空,则进行出列操作是会阻塞)。

3.3 可超时的 offer

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 获得锁
lock.lockInterruptibly();
try {
// <1> 若队列已满,循环等待被通知,再次检查队列是否非空
while (count == items.length) {
// 可等待的时间小于等于零,直接返回失败
if (nanos <= 0)
return false;
// 等待,直到超时
nanos = notFull.awaitNanos(nanos); // 返回的为剩余可等待时间,相当于每次等待,都会扣除相应已经等待的时间。
}
// 入队
enqueue(e);
return true;
} finally {
// 解锁
lock.unlock();
}
}

  • 相比 #offer(E e) 方法,增加了 <1> 处:
    • 若队列已满,调用 notFull#awaitNanos(long nanos) 方法,等待被通知(元素出列时,会调用 notFull#signal() 方法,进行通知阻塞等待的入列线程)或者超时
    • 被通知后,再次检查队列是否非空。若非空,继续向下执行,否则继续等待被通知。

3.4 put

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 获得锁
lock.lockInterruptibly();
try {
// <1> 若队列已满,循环等待被通知,再次检查队列是否非空
while (count == items.length)
notFull.await();
// 入队
enqueue(e);
} finally {
// 解锁
lock.unlock();
}
}

  • 相比 #offer(E e) 方法,增加了 <1> 处:
    • 若队列已满,调用 notFull#await() 方法,等待被通知(元素出列时,会调用 notFull#await() 方法,进行通知阻塞等待的入列线程)。
    • 被通知后,再次检查队列是否非空。若非空,继续向下执行,否则继续等待被通知。

4. 出队

ArrayBlockingQueue 提供的出队方法如下:

  • #poll() 方法:获取并移除此队列的头,如果此队列为空,则返回 null
  • #poll(long timeout, TimeUnit unit) 方法:获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
  • #take() 方法:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
  • #remove(Object o) 方法:从此队列中移除指定元素的单个实例(如果存在)。

4.1 poll

public E poll() {
// 获得锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获得头元素
return (count == 0) ? null : dequeue();
} finally {
// 释放锁
lock.unlock();
}
}

  • 如果队列为空,则返回 null,否则,调用 #dequeue() 方法,获取列头元素。

3.1.1 dequeue

 private E dequeue() {
final Object[] items = this.items;
// 去除队首元素
E x = (E) items[takeIndex];
items[takeIndex] = null; // 置空
// 到达队尾,回归队头
if (++takeIndex == items.length)
takeIndex = 0;
// 总数 - 1
count--;
// 维护下迭代器
if (itrs != null)
itrs.elementDequeued();
// 通知阻塞在入列的线程
notFull.signal();
return x;
}

  • 该方法主要是从列头(takeIndex 位置)取出元素,同时如果迭代器 itrs 不为 null ,则需要维护下该迭代器。最后,调用 notFull#signal() 方法,唤醒阻塞在入列线程。

4.2 可超时的 poll

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 获得锁
lock.lockInterruptibly();
try {
// <1> 若队列已空,循环等待被通知,再次检查队列是否非空
while (count == 0) {
// 可等待的时间小于等于零,直接返回 null
if (nanos <= 0)
return null;
// 等待,直到超时
nanos = notEmpty.awaitNanos(nanos); // 返回的为剩余可等待时间,相当于每次等待,都会扣除相应已经等待的时间。
}
// 出队
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}

  • 相比 #poll() 方法,增加了 <1> 处:
    • 若队列已空,调用 notEmpty#awaitNanos(long nanos) 方法,等待被通知(元素入列时,会调用 notEmpty#signal() 方法,进行通知阻塞等待的出列线程)或者超时返回 null
    • 被通知后,再次检查队列是否为空。若非空,继续向下执行,否则继续等待被通知。

4.3 take

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获得锁
lock.lockInterruptibly();
try {
// <1> 若队列已空,循环等待被通知,再次检查队列是否非空
while (count == 0)
notEmpty.await();
// 出列
return dequeue();
} finally {
// 解锁
lock.unlock();
}
}

  • 相比 #poll() 方法,增加了 <1> 处:
    • 若队列已空,调用 notEmpty#await() 方法,等待被通知(元素入列时,会调用 notEmpty#signal() 方法,进行通知阻塞等待的出列线程)。
    • 被通知后,再次检查队列是否为空。若非空,继续向下执行,否则继续等待被通知。

4.4 remove

public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
// 获得锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 循环向下查找,若匹配,则进行移除。
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
// 释放锁
lock.unlock();
}
}

  • 详细解析,见代码注释。

  • #removeAt(int removeIndex) 方法,移除指定位置的元素。代码如下:

    void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    // 移除的为队头,直接移除即可
    if (removeIndex == takeIndex) {
    // removing front item; just advance
    items[takeIndex] = null;
    if (++takeIndex == items.length)
    takeIndex = 0;
    count--;
    if (itrs != null)
    itrs.elementDequeued();
    // 移除非队头,移除的同时,需要向前复制,填补这个空缺。
    } else {
    // an "interior" remove

    // slide over all others up through putIndex.
    final int putIndex = this.putIndex;
    for (int i = removeIndex;;) {
    int next = i + 1;
    if (next == items.length)
    next = 0;
    if (next != putIndex) {
    items[i] = items[next];
    i = next;
    } else {
    items[i] = null;
    this.putIndex = i;
    break;
    }
    }
    count--;
    if (itrs != null)
    itrs.removedAt(removeIndex);
    }
    // 通知
    notFull.signal();
    }

5. 补充说明

老艿艿:因为本文的重心在 ArrayBlockingQueue 的入队和出队,所以其他方法,例如迭代器等等,并未解析。所以,胖友,你懂的,自己研究哈。

666. 彩蛋

如果你对 Java 并发感兴趣,欢迎加入我的知识星球一起交流。

知识星球

文章目录
  1. 1. 1. 简介
  2. 2. 2. 构造方法
  3. 3. 3. 入队
    1. 3.1. 3.1 add
    2. 3.2. 3.2 offer
      1. 3.2.1. 3.2.1 enqueue
    3. 3.3. 3.3 可超时的 offer
    4. 3.4. 3.4 put
  4. 4. 4. 出队
    1. 4.1. 4.1 poll
      1. 4.1.1. 3.1.1 dequeue
    2. 4.2. 4.2 可超时的 poll
    3. 4.3. 4.3 take
    4. 4.4. 4.4 remove
  5. 5. 5. 补充说明
  6. 6. 666. 彩蛋