BlockingQueue
About 3 min
使用
如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。同样如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作。
下面列举一个例子:
public class BlockingQueueMain {
static BlockingQueue<String> queue = new LinkedBlockingDeque<>();
public static void main(String[] args) throws InterruptedException {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new Consumer()).start();
}
static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 4; i++) {
try {
System.out.println(Thread.currentThread().getName() + " start to put, time: " + LocalDateTime.now());
var value = "producer" + i;
queue.put(value);
System.out.println(Thread.currentThread().getName() + " put the value " + value + ", time: " + LocalDateTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 2; i++) {
try {
System.out.println(Thread.currentThread().getName() + " start to take, time: " + LocalDateTime.now());
System.out.println(Thread.currentThread().getName() + " take the value " + queue.take() + ", time: " + LocalDateTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
输出结果如下:
Thread-1 start to take, time: 2023-09-08T15:22:09.366806
Thread-0 start to put, time: 2023-09-08T15:22:09.366780
Thread-2 start to take, time: 2023-09-08T15:22:09.366770
Thread-0 put the value producer0, time: 2023-09-08T15:22:09.384014
Thread-1 take the value producer0, time: 2023-09-08T15:22:09.384057
Thread-0 start to put, time: 2023-09-08T15:22:09.390801
Thread-1 start to take, time: 2023-09-08T15:22:09.390858
Thread-0 put the value producer1, time: 2023-09-08T15:22:09.390923
Thread-1 take the value producer1, time: 2023-09-08T15:22:09.390930
Thread-0 start to put, time: 2023-09-08T15:22:09.390977
Thread-0 put the value producer2, time: 2023-09-08T15:22:09.391050
Thread-2 take the value producer2, time: 2023-09-08T15:22:09.391060
Thread-0 start to put, time: 2023-09-08T15:22:09.391116
Thread-2 start to take, time: 2023-09-08T15:22:09.391151
Thread-0 put the value producer3, time: 2023-09-08T15:22:09.391177
Thread-2 take the value producer3, time: 2023-09-08T15:22:09.391239
分析
BlockingQueue
是一个接口,它的实现有如下这几种:
其中比较常用的就是ArrayBlockingQueue
和LinkedBiockingQueue
。
ArrayBlockingQueue
是基于数组实现的,适合做有界队列。因为是数组,效率查询优于修改LinkedBlockingQueue
是基于链表实现的,适合做无界队列或者比较大的队列。因为是链表,效率修改优于查询
我们分析就使用ArrayBlockingQueue
。
final Object[] items;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
ArrayBlockingQueue
的初始化:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ArrayBlockingQueue
的元素由items数组来维护。
在put
方法中
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
与put
相似的方法offer
不会阻塞,如果当前队列己经满了,它就会立即返回false
。如果没有满,则执行正常的入队操作。
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
而在take
方法中
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}
与take
相似的方法poll
不会阻塞,如果当前队列为空,它就会立即返回null
。如果不是空,则执行正常的出队操作。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
FAQ
总结
BlockingQueue方法的总结
操作 | 异常 | 返回值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
删除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | \ | \ |
贴出add
的方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
有关的BlockingQueue的实现
SynchronousQueue
BlockingQueue的使用场景
BlockingQueue的使用场景