1. 与ArrayBlockingQueue进行类比学习,加深各种数据结构的理解
  2. 了解底层实现,能够更好地理解每一种阻塞队列对线程池性能的影响,做到真正的知其然,且知其所以然
  • 源码分析LinkedBlockingQueue的实现
  • 与ArrayBlockingQueue进行比较
  • 说明为什么选择LinkedBlockingQueue作为固定大小的线程池的阻塞队列

LinkedBlockingQueue,见名之意,它是由一个基于链表的阻塞队列,首先看一下的核心组成:

通过上面的分析,我们可以发现LinkedBlockingQueue在入队列和出队列时使用的不是同一个Lock,这也意味着它们之间的操作不会存在互斥操作。在多个CPU的情况下,它们可以做到真正的在同一时刻既消费、又生产,能够做到并行处理。

下面让我们看下LinkedBlockingQueue的构造方法:

  1. * 如果用户没有显示指定capacity的值,默认使用int的最大值
  2. */
  3. public LinkedBlockingQueue() {
  4. this(Integer.MAX_VALUE);
  5. }
  6. /**
  7. 可以看到,当队列中没有任何元素的时候,此时队列的头部就等于队列的尾部,
  8. 指向的是同一个节点,并且元素的内容为null
  9. */
  10. public LinkedBlockingQueue(int capacity) {
  11. if (capacity <= 0) throw new IllegalArgumentException();
  12. this.capacity = capacity;
  13. last = head = new Node<E>(null);
  14. }
  15. /*
  16. 在初始化LinkedBlockingQueue的时候,还可以直接将一个集合
  17. 中的元素全部入队列,此时队列最大容量依然是int的最大值。
  18. */
  19. public LinkedBlockingQueue(Collection<? extends E> c) {
  20. this(Integer.MAX_VALUE);
  21. final ReentrantLock putLock = this.putLock;
  22. //获取锁
  23. putLock.lock(); // Never contended, but necessary for visibility
  24. try {
  25. //迭代集合中的每一个元素,让其入队列,并且更新一下当前队列中的元素数量
  26. int n = 0;
  27. for (E e : c) {
  28. if (e == null)
  29. throw new NullPointerException();
  30. if (n == capacity)
  31. throw new IllegalStateException("Queue full");
  32. //参考下面的enqueue分析
  33. enqueue(new Node<E>(e));
  34. ++n;
  35. }
  36. count.set(n);
  37. } finally {
  38. //释放锁
  39. putLock.unlock();
  40. }
  41. }
  42. /**
  43. * 其实下面的代码等价于如下内容:
  44. * last.next=node;
  45. * last = node;
  46. * 其实也没有什么花样:
  47. 就是让新入队列的元素成为原来的last的next,让进入的元素称为last
  48. */
  49. private void enqueue(Node<E> node) {
  50. // assert putLock.isHeldByCurrentThread();
  51. // assert last.next == null;
  52. last = last.next = node;
  53. }

在分析完LinkedBlockingQueue的核心组成之后,下面让我们再看下核心的几个操作方法,首先分析一下元素入队列的过程:

  1. /**
  2. 在BlockingQueue接口中除了定义put方法外(当队列元素满了之后就会阻塞,
  3. 直到队列有新的空间可以方法线程才会继续执行),还定义一个offer方法,
  4. 该方法会返回一个boolean值,当入队列成功返回true,入队列失败返回false。
  5. 该方法与put方法基本操作基本一致,只是有细微的差异。
  6. */
  7. public boolean offer(E e) {
  8. if (e == null) throw new NullPointerException();
  9. final AtomicInteger count = this.count;
  10. /*
  11. 当队列已经满了,它不会继续等待,而是直接返回。
  12. 因此该方法是非阻塞的。
  13. */
  14. if (count.get() == capacity)
  15. return false;
  16. int c = -1;
  17. Node<E> node = new Node(e);
  18. final ReentrantLock putLock = this.putLock;
  19. putLock.lock();
  20. try {
  21. /*
  22. 当获取到锁时,需要进行二次的检查,因为可能当队列的大小为capacity-1时,
  23. 两个线程同时去抢占锁,而只有一个线程抢占成功,那么此时
  24. 当线程将元素入队列后,释放锁,后面的线程抢占锁之后,此时队列
  25. 大小已经达到capacity,所以将它无法让元素入队列。
  26. 下面的其余操作和put都一样,此处不再详述
  27. */
  28. if (count.get() < capacity) {
  29. enqueue(node);
  30. c = count.getAndIncrement();
  31. if (c + 1 < capacity)
  32. notFull.signal();
  33. }
  34. } finally {
  35. putLock.unlock();
  36. }
  37. if (c == 0)
  38. signalNotEmpty();
  39. return c >= 0;
  40. }

BlockingQueue还定义了一个限时等待插入操作,即在等待一定的时间内,如果队列有空间可以插入,那么就将元素入队列,然后返回true,如果在过完指定的时间后依旧没有空间可以插入,那么就返回false,下面是限时等待操作的分析:

通过上面的分析,我们应该比较清楚地知道了LinkedBlockingQueue的入队列的操作,其主要是通过获取到putLock锁来完成,当队列的数量达到最大值,此时会导致线程处于阻塞状态或者返回false(根据具体的方法来看);如果队列还有剩余的空间,那么此时会新创建出一个Node对象,将其设置到队列的尾部,作为LinkedBlockingQueue的last元素。

在分析完入队列的过程之后,我们接下来看看LinkedBlockingQueue出队列的过程;由于BlockingQueue的方法都具有对称性,此处就只分析take方法的实现,其余方法的实现都如出一辙:

  1. public E take() throws InterruptedException {
  2. int c = -1;
  3. final AtomicInteger count = this.count;
  4. //通过takeLock获取锁,并且支持线程中断
  5. takeLock.lockInterruptibly();
  6. try {
  7. //当队列为空时,则让当前线程处于等待
  8. while (count.get() == 0) {
  9. notEmpty.await();
  10. }
  11. //完成元素的出队列
  12. x = dequeue();
  13. /*
  14. 队列元素个数完成原子化操作-1,可以看到count元素会
  15. 在插入元素的线程和获取元素的线程进行并发修改操作。
  16. */
  17. c = count.getAndDecrement();
  18. /*
  19. 当一个元素出队列之后,队列的大小依旧大于1时
  20. 当前线程会唤醒其他执行元素出队列的线程,让它们也
  21. 可以执行元素的获取
  22. */
  23. if (c > 1)
  24. notEmpty.signal();
  25. } finally {
  26. //完成锁的释放
  27. takeLock.unlock();
  28. }
  29. /*
  30. 当c==capaitcy时,即在获取当前元素之前,
  31. 队列已经满了,而此时获取元素之后,队列就会
  32. 空出一个位置,故当前线程会唤醒执行插入操作的线
  33. 程通知其他中的一个可以进行插入操作。
  34. */
  35. if (c == capacity)
  36. signalNotFull();
  37. return x;
  38. }
  39. /**
  40. * 让头部元素出队列的过程
  41. * 其最终的目的是让原来的head被GC回收,让其的next成为head
  42. * 并且新的head的item为null.
  43. * 因为LinkedBlockingQueue的头部具有一致性:即元素为null。
  44. */
  45. private E dequeue() {
  46. Node<E> h = head;
  47. Node<E> first = h.next;
  48. h.next = h; // help GC
  49. head = first;
  50. E x = first.item;
  51. first.item = null;
  52. }

ArrayBlockingQueue由于其底层基于数组,并且在创建时指定存储的大小,在完成后就会立即在内存分配固定大小容量的数组元素,因此其存储通常有限,故其是一个“有界“的阻塞队列;

而LinkedBlockingQueue可以由用户指定最大存储容量,也可以无需指定,如果不指定则最大存储容量将是Integer.MAX_VALUE,即可以看作是一个“无界”的阻塞队列,由于其节点的创建都是动态创建,并且在节点出队列后可以被GC所回收,因此其具有灵活的伸缩性。但是由于ArrayBlockingQueue的有界性,因此其能够更好的对于性能进行预测,而LinkedBlockingQueue由于没有限制大小,当任务非常多的时候,不停地向队列中存储,就有可能导致内存溢出的情况发生。

其次,ArrayBlockingQueue中在入队列和出队列操作过程中,使用的是同一个lock,所以即使在多核CPU的情况下,其读取和操作的都无法做到并行,而LinkedBlockingQueue的读取和插入操作所使用的锁是两个不同的lock,它们之间的操作互相不受干扰,因此两种操作可以并行完成,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。

JDK中选用LinkedBlockingQueue作为阻塞队列的原因就在于其无界性。因为线程大小固定的线程池,其线程的数量是不具备伸缩性的,当任务非常繁忙的时候,就势必会导致所有的线程都处于工作状态,如果使用一个有界的阻塞队列来进行处理,那么就非常有可能很快导致队列满的情况发生,从而导致任务无法提交而抛出RejectedExecutionException,而使用无界队列由于其良好的存储容量的伸缩性,可以很好的去缓冲任务繁忙情况下场景,即使任务非常多,也可以进行动态扩容,当任务被处理完成之后,队列中的节点也会被随之被GC回收,非常灵活。