上面我们己经提到了 ConcurrentLinkedQueue
作为高性能的非阻塞队列。下面我们要讲到的是阻塞队列——BlockingQueue
。阻塞队列(BlockingQueue
)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue
提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
下面主要介绍一下 3 个常见的 BlockingQueue
的实现类:ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
。
ArrayBlockingQueue
一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock
,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
ArrayBlockingQueue
默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue
。而非公平性则是指访问 ArrayBlockingQueue
的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue
可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue
。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的 ArrayBlockingQueue
,可采用如下代码:
private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
LinkedBlockingQueue
底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue
相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue
容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue
对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE
。
相关构造方法:
/**
*某种意义上的无界队列
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
*有界队列
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo()
方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator
来指定排序规则。
PriorityBlockingQueue
并发控制采用的是可重入锁 ReentrantLock
,队列为无界队列(ArrayBlockingQueue
是有界队列,LinkedBlockingQueue
也可以通过在构造函数中传入 capacity
指定队列最大的容量,但是 PriorityBlockingQueue
只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue
的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException
异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。