BlockingQueue接口
BlockingQueue接口继承自Queue接口:
public interface BlockingQueue<E>extends Queue<E>
与Queue队列相比,它是线程安全的。添加和移除元素有四类方式,其中add()、remove()、offer()、poll()、element()、peek()方法继承自Queue。
Throws exception | Special value | Blocks | Times out | |
Insert | boolean add(e) |
boolean offer(e) |
void put(e) |
boolean offer(e, time, unit) |
Remove | E remove() |
E poll() |
E take() |
E poll(time, unit) |
Examine | E element() |
E peek() |
not applicable | not applicable |
通俗的说:
- 不能搞时抛出异常;
- 不能搞时返回null或false
- 不能搞时一直等,直到能搞且搞到为止
- 一段时间内尝试去搞,实在搞不了返回null或false
除此之外,还有以下方法:
int remainingCapacity(); 队列剩余存储空间
boolean remove(Object o); 删除队列中的特定元素
public boolean contains(Object o); 队列中是否包含特定元素
int drainTo(Collection<? super E> c); 把队列中的元素删除并添加到指定的集合中
int drainTo(Collection<? super E> c, int maxElements); 把队列中的maxElements个元素删除添加到指定集合
ArrayBlockingQueue
底层以数组来存储元素,且构造函数必须至少指定队列大小,只使用一个可重入锁来来控制线程访问:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 底层存储元素数组 */ private final E[] items; private int takeIndex; private int putIndex; private int count; // 使用经典的two-condition算法进行并发控制 private final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; // 指定队列容器,默认使用非公平锁 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); if (capacity < c.size()) throw new IllegalArgumentException(); for (Iterator<? extends E> it = c.iterator(); it.hasNext();) add(it.next()); } ... }
put()和take()方法是经典的生产者-消费者算法:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); //使用while自旋锁 } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); //插入元素,释放notEmpty信号 } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
take()方法如下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
LinkedBlockingQueue
LinkedBlockingQueue与ArrayBlockingQueue相比有以下不同:
- 底层使用链表而非数组存储元素;
- 使用两个锁来控制线程访问,这样队列可以同时进行put和take的操作,因此吞吐量相对ArrayBlockingQueue就高。
- 可以不指定队列大小,此时默认大小为Integer.MAX_VALUE
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 队列边界大小,不指定为默认为Integer.MAX_VALUE */ private final int capacity; private final AtomicInteger count = new AtomicInteger(0); private transient Node<E> head; private transient Node<E> last; /** 控制take、poll的锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** 控制put、offer的锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(e); ++n; } count.set(n); } finally { putLock.unlock(); } } }
看下put操作源码:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 当队列大小等于队列边界大小时,使用自旋锁等待 while (count.get() == capacity) { notFull.await(); } enqueue(e); // 队列最后插入元素 c = count.getAndIncrement(); // 队列不满,则发送notFull信号量,其他线程可以进行put if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) //队列为空,发送notEmpty信号量,通知线程可以进行take signalNotEmpty(); } private void enqueue(E x) { // assert putLock.isHeldByCurrentThread(); last = last.next = new Node<E>(x); }
take()方法与put类似:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
使用Executors.newFixedThreadPool()方法,使用的就是LinkedBlockingQueue存储的等待线程队列。
相关推荐
在前面的的文章,写了一个带有缓冲区的队列,是用JAVA的Lock下的... 使用ArrayBlockingQueue的一个子类BlockingQueue实现一个可阻塞队列,一个线程put另一个线程take,当队列为空时take等待,当线程满时put等待
14-阻塞队列BlockingQueue实战及其原理分析二.pdf
首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。 接下来,我将创建一个 LinkedBlockingQueue ,它最多可以包含100个元素: BlockingQueue<Integer> ...
6.6 阻塞队列BlockingQueue 实战及其原 理分析一副本.mp4
2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. ...
6.6 阻塞队列BlockingQueue 实战及其原 理分析一副本副本.mp4
10、阻塞队列BlockingQueue实战及其原理分析_
在新增的Concurrent包中,BlockingQueue...通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
6.6 阻塞队列BlockingQueue 实战及其原 理分析一副本副本副本.mp4
2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...
阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...
BlockingQueue接口 – 阻塞队列2.1 ArrayBlockingQueue类(有界阻塞队列)2.2 LinkedBlockingQueue类(无界阻塞队列)3. 源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public ...
定义全局线程池,将用户的请求放入自定义队列中,排队等候线程调用,等待超时则自动取消该任务,实现超时可取消的异步任务
2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...
6.7 阻塞队列BlockingQueue 实战及其原 理分析二副本.mp4
6.JUC并发工具类在大厂的应用场景详解 (1).pdf 7、深入理解 AQS 独占锁之 Reentrantlock 源码分析 (1).pdf 8、读写锁ReentrantReadWriteLock&StampLock详解.pdf ...10、阻塞队列BlockingQueue 实战及其原理分析.pdf
20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解 21.线程池ThreadPoolExecutor实现原理 22.线程池之ScheduledThreadPoolExecutor 23.FutureTask基本操作总结 24.Java中atomic包中的原子操作类...
6.7 阻塞队列BlockingQueue 实战及其原 理分析二副本副本.mp4
2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...
线程----BlockingQueue 的介绍说明