Java并发学习笔记(八)-LinkedBlockingQueue
LinkedBlockingQueue是由链表组成的阻塞队列,先来看demo
public class LinkedBlockingQueueDemo { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); BlockingQueue<Bread> queue = new LinkedBlockingQueue<Bread>(10); for(int i = 0; i < 2; i++){ es.execute(new Baker(queue)); } for(int i = 0; i < 10; i++){ es.execute(new BreadConsumer(queue)); } es.shutdown(); } } class Baker implements Runnable{ private static int no = 0; private int id = ++no; private int count = 0; private BlockingQueue<Bread> queue; public Baker(BlockingQueue<Bread> queue){ this.queue = queue; } @Override public void run() { for(int i = 0; i < 10; i++){ System.out.println("面包师" + id + "正准备做第" + ++count + "面包"); Bread bread = new Bread(); // 满队列情况下,阻塞 try { queue.put(bread); System.out.println("面包师" + id + "做的第" + count + "面包是面包"+ bread.getId()); } catch (InterruptedException e) { } } } } class BreadConsumer implements Runnable{ private static int no = 0; private int id = ++no; private int count = 0; private BlockingQueue<Bread> queue; public BreadConsumer(BlockingQueue<Bread> queue){ this.queue = queue; } @Override public void run() { for(int i = 0; i < 2; i++){ System.out.println("顾客 " + id + "准备买第" + ++count +"个面包" ); Bread bread = null; // 空队列情况下,阻塞 try { bread = queue.take(); } catch (InterruptedException e) { } <span style="white-space:pre"> </span>System.out.println("顾客" + id + "买到的第" +count+"个面包是面包" + bread.getId()); } } } class Bread { private static int count = 0; private int id = ++count; public int getId() { return id; } }
默认情况下,其容量为Integer.MAX_VALUE
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }也可以指定一个容量以免链表过度扩张
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }在LinkedBlockingQueue的链表里,有两个指针,分别指向队列头和队列尾。与ArrayBlockingQueue不同的是,在LinkedBlockingQueue里,有两把锁,分别锁队列头和队列尾。这样做是有好处的,可以同时入队和出队,比ArrayBlockingQueue性能高
/** 链表头 */ private transient Node<E> head; /** 链表尾 */ private transient Node<E> last; /** 出队的锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** 入队的锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();我们来看put阻塞方法,如果队列满,会一直阻塞,直到队列不满
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }往队列里插入数据时,首先会持有putLock锁,如果当前队列元素个数跟容量相等,阻塞,调用notFull.await。不然,入队。入队后如果元素个数小于队列容量,会唤醒其它的阻塞的插入线程。最后一句,不明白,元素为0,为什么会去执行唤醒空条件?求指教
队列里,插入元素,会插入队尾
private void enqueue(E x) { // assert putLock.isHeldByCurrentThread(); last = last.next = new Node<E>(x); }再来看出队操作take,也是阻塞方法
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }出队获取的是takeLock,类似的,如果当前队列元素个数为0,阻塞,调用notEmpty.await。不然,出队。出队后如果元素个数大于0,会唤醒其它的阻塞的出队线程。出队从队头出队
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。