`
liuluo129
  • 浏览: 114948 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

BlockingQueue阻塞队列分析(一)-ArrayBlockingQueue和LinkedBlockingQueue

    博客分类:
  • java
阅读更多

BlockingQueue接口

BlockingQueue接口继承自Queue接口:

public interface BlockingQueue<E>extends Queue<E>

与Queue队列相比,它是线程安全的。添加和移除元素有四类方式,其中add()、remove()、offer()、poll()、element()、peek()方法继承自Queue。

  Throws exception Special value Blocks Times out
Insert boolean add(e) boolean offer(e) void put(e) boolean offer(e, time, unit)
Remove E remove() E poll() E take() E poll(time, unit)
Examine E element() E peek() not applicable not applicable

 

通俗的说:

  • 不能搞时抛出异常;
  • 不能搞时返回null或false
  • 不能搞时一直等,直到能搞且搞到为止
  • 一段时间内尝试去搞,实在搞不了返回null或false

除此之外,还有以下方法:

int remainingCapacity();                  队列剩余存储空间

boolean remove(Object o);             删除队列中的特定元素

public boolean contains(Object o); 队列中是否包含特定元素

int drainTo(Collection<? super E> c);     把队列中的元素删除并添加到指定的集合中

int drainTo(Collection<? super E> c, int maxElements);   把队列中的maxElements个元素删除添加到指定集合

ArrayBlockingQueue

底层以数组来存储元素,且构造函数必须至少指定队列大小,只使用一个可重入锁来来控制线程访问

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 底层存储元素数组  */
    private final E[] items;
    private int takeIndex;
    private int putIndex;
    private int count;

    // 使用经典的two-condition算法进行并发控制
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    // 指定队列容器,默认使用非公平锁
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }
...
}

put()和take()方法是经典的生产者-消费者算法:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();  //使用while自旋锁
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e); //插入元素,释放notEmpty信号
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

  take()方法如下:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

 

LinkedBlockingQueue

LinkedBlockingQueue与ArrayBlockingQueue相比有以下不同:

  • 底层使用链表而非数组存储元素;
  • 使用两个锁来控制线程访问,这样队列可以同时进行put和take的操作,因此吞吐量相对ArrayBlockingQueue就高。
  • 可以不指定队列大小,此时默认大小为Integer.MAX_VALUE
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** 队列边界大小,不指定为默认为Integer.MAX_VALUE */
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(0);
    private transient Node<E> head;
    private transient Node<E> last;

    /** 控制take、poll的锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** 控制put、offer的锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(e);
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
}

 看下put操作源码:

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 当队列大小等于队列边界大小时,使用自旋锁等待
            while (count.get() == capacity) { 
                    notFull.await();
            }
            enqueue(e);  // 队列最后插入元素
            c = count.getAndIncrement();
            // 队列不满,则发送notFull信号量,其他线程可以进行put
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            //队列为空,发送notEmpty信号量,通知线程可以进行take
            signalNotEmpty(); 
    }
    private void enqueue(E x) {
        // assert putLock.isHeldByCurrentThread();
        last = last.next = new Node<E>(x);
    }

 take()方法与put类似:

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

 使用Executors.newFixedThreadPool()方法,使用的就是LinkedBlockingQueue存储的等待线程队列。

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics