`
cloudeagle_bupt
  • 浏览: 536297 次
文章分类
社区版块
存档分类
最新评论

Java并发包中的同步队列SynchronousQueue实现原理

 
阅读更多

转自: http://hugozhu.myalert.info/2013/03/05/java-SynchronousQueue-notes.html

介绍

Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

实现原理

同步队列的实现方法有许多:

阻塞算法实现

阻塞算法实现通常在内部采用一个锁来保证多个线程中的put()和take()方法是串行执行的。采用锁的开销是比较大的,还会存在一种情况是线程A持有线程B需要的锁,B必须一直等待A释放锁,即使A可能一段时间内因为B的优先级比较高而得不到时间片运行。所以在高性能的应用中我们常常希望规避锁的使用。

  1. public class NativeSynchronousQueue<E> {
  2. boolean putting = false;
  3. E item = null;
  4. public synchronized E take() throws InterruptedException {
  5. while (item == null)
  6. wait();
  7. E e = item;
  8. item = null;
  9. notifyAll();
  10. return e;
  11. }
  12. public synchronized void put(E e) throws InterruptedException {
  13. if (e==null) return;
  14. while (putting)
  15. wait();
  16. putting = true;
  17. item = e;
  18. notifyAll();
  19. while (item!=null)
  20. wait();
  21. putting = false;
  22. notifyAll();
  23. }
  24. }

信号量实现

经典同步队列实现采用了三个信号量,代码很简单,比较容易理解:

  1. public class SemaphoreSynchronousQueue<E> {
  2. E item = null;
  3. Semaphore sync = new Semaphore(0);
  4. Semaphore send = new Semaphore(1);
  5. Semaphore recv = new Semaphore(0);
  6. public E take() throws InterruptedException {
  7. recv.acquire();
  8. E x = item;
  9. sync.release();
  10. send.release();
  11. return x;
  12. }
  13. public void put (E x) throws InterruptedException{
  14. send.acquire();
  15. item = x;
  16. recv.release();
  17. sync.acquire();
  18. }
  19. }

在多核机器上,上面方法的同步代价仍然较高,操作系统调度器需要上千个时间片来阻塞或唤醒线程,而上面的实现即使在生产者put()时已经有一个消费者在等待的情况下,阻塞和唤醒的调用仍然需要。

Java 5实现

  1. public class Java5SynchronousQueue<E> {
  2. ReentrantLock qlock = new ReentrantLock();
  3. Queue waitingProducers = new Queue();
  4. Queue waitingConsumers = new Queue();
  5. static class Node extends AbstractQueuedSynchronizer {
  6. E item;
  7. Node next;
  8. Node(Object x) { item = x; }
  9. void waitForTake() { /* (uses AQS) */ }
  10. E waitForPut() { /* (uses AQS) */ }
  11. }
  12. public E take() {
  13. Node node;
  14. boolean mustWait;
  15. qlock.lock();
  16. node = waitingProducers.pop();
  17. if(mustWait = (node == null))
  18. node = waitingConsumers.push(null);
  19. qlock.unlock();
  20. if (mustWait)
  21. return node.waitForPut();
  22. else
  23. return node.item;
  24. }
  25. public void put(E e) {
  26. Node node;
  27. boolean mustWait;
  28. qlock.lock();
  29. node = waitingConsumers.pop();
  30. if (mustWait = (node == null))
  31. node = waitingProducers.push(e);
  32. qlock.unlock();
  33. if (mustWait)
  34. node.waitForTake();
  35. else
  36. node.item = e;
  37. }
  38. }

Java 5的实现相对来说做了一些优化,只使用了一个锁,使用队列代替信号量也可以允许发布者直接发布数据,而不是要首先从阻塞在信号量处被唤醒。

Java6实现

Java 6的SynchronousQueue的实现采用了一种性能更好的无锁算法 – 扩展的“Dual stack and Dual queue”算法。性能比Java5的实现有较大提升。竞争机制支持公平和非公平两种:非公平竞争模式使用的数据结构是后进先出栈(Lifo Stack);公平竞争模式则使用先进先出队列(Fifo Queue),性能上两者是相当的,一般情况下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持线程的本地化。

代码实现里的Dual Queue或Stack内部是用链表(LinkedList)来实现的,其节点状态为以下三种情况:

  1. 持有数据 - put()方法的元素
  2. 持有请求 - take()方法

这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。

其核心接口是Transfer,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。

  1. /**
  2. * Shared internal API for dual stacks and queues.
  3. */
  4. static abstract class Transferer {
  5. /**
  6. * Performs a put or take.
  7. *
  8. * @param e if non-null, the item to be handed to a consumer;
  9. * if null, requests that transfer return an item
  10. * offered by producer.
  11. * @param timed if this operation should timeout
  12. * @param nanos the timeout, in nanoseconds
  13. * @return if non-null, the item provided or received; if null,
  14. * the operation failed due to timeout or interrupt --
  15. * the caller can distinguish which of these occurred
  16. * by checking Thread.interrupted.
  17. */
  18. abstract Object transfer(Object e, boolean timed, long nanos);
  19. }

TransferQueue实现如下(摘自Java 6源代码),入列和出列都基于Spin和CAS方法:

  1. /**
  2. * Puts or takes an item.
  3. */
  4. Object transfer(Object e, boolean timed, long nanos) {
  5. /* Basic algorithm is to loop trying to take either of
  6. * two actions:
  7. *
  8. * 1. If queue apparently empty or holding same-mode nodes,
  9. * try to add node to queue of waiters, wait to be
  10. * fulfilled (or cancelled) and return matching item.
  11. *
  12. * 2. If queue apparently contains waiting items, and this
  13. * call is of complementary mode, try to fulfill by CAS'ing
  14. * item field of waiting node and dequeuing it, and then
  15. * returning matching item.
  16. *
  17. * In each case, along the way, check for and try to help
  18. * advance head and tail on behalf of other stalled/slow
  19. * threads.
  20. *
  21. * The loop starts off with a null check guarding against
  22. * seeing uninitialized head or tail values. This never
  23. * happens in current SynchronousQueue, but could if
  24. * callers held non-volatile/final ref to the
  25. * transferer. The check is here anyway because it places
  26. * null checks at top of loop, which is usually faster
  27. * than having them implicitly interspersed.
  28. */
  29. QNode s = null; // constructed/reused as needed
  30. boolean isData = (e != null);
  31. for (;;) {
  32. QNode t = tail;
  33. QNode h = head;
  34. if (t == null || h == null) // saw uninitialized value
  35. continue; // spin
  36. if (h == t || t.isData == isData) { // empty or same-mode
  37. QNode tn = t.next;
  38. if (t != tail) // inconsistent read
  39. continue;
  40. if (tn != null) { // lagging tail
  41. advanceTail(t, tn);
  42. continue;
  43. }
  44. if (timed && nanos <= 0) // can't wait
  45. return null;
  46. if (s == null)
  47. s = new QNode(e, isData);
  48. if (!t.casNext(null, s)) // failed to link in
  49. continue;
  50. advanceTail(t, s); // swing tail and wait
  51. Object x = awaitFulfill(s, e, timed, nanos);
  52. if (x == s) { // wait was cancelled
  53. clean(t, s);
  54. return null;
  55. }
  56. if (!s.isOffList()) { // not already unlinked
  57. advanceHead(t, s); // unlink if head
  58. if (x != null) // and forget fields
  59. s.item = s;
  60. s.waiter = null;
  61. }
  62. return (x != null)? x : e;
  63. } else { // complementary-mode
  64. QNode m = h.next; // node to fulfill
  65. if (t != tail || m == null || h != head)
  66. continue; // inconsistent read
  67. Object x = m.item;
  68. if (isData == (x != null) || // m already fulfilled
  69. x == m || // m cancelled
  70. !m.casItem(x, e)) { // lost CAS
  71. advanceHead(h, m); // dequeue and retry
  72. continue;
  73. }
  74. advanceHead(h, m); // successfully fulfilled
  75. LockSupport.unpark(m.waiter);
  76. return (x != null)? x : e;
  77. }
  78. }
  79. }

参考文章

  1. Javadoc of SynchronousQueue
  2. Scalable Synchronous Queues
  3. Nonblocking Concurrent Data Structures with Condition Synchronization
分享到:
评论

相关推荐

    SynchronousQueue实现原理.pdf

    转载的一篇博客资源

    java 同步器SynchronousQueue详解及实例

    主要介绍了java 同步器SynchronousQueue详解及实例的相关资料,需要的朋友可以参考下

    java并发包资源

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, 并发导航映射 ConcurrentNavigableMap,交换机 Exchanger, 信号量 Semaphore,执行器...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    SynchronousQueue详解

    SynchronousQueue是BlockingQueue的一种,所以SynchronousQueue是线程安全的。SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存储任何元素。 也就是说...

    SynchronousQueue核心属性和方法源码的分析

    SynchronousQueue核心属性和方法源码的分析的代码

    常见的Java笔试题-JVM-JUC-Core:JUCJVM核心知识点

    常见的Java笔试题 JUC、JMM核心知识点笔记 尚硅谷周阳老师课程——笔记。 / / JUC知识点 JMM volatile关键字 可见性 原子性 有序性 哪些地方用到过volatile? 单例模式的安全问题 CAS CAS底层原理 CAS缺点 ABA问题 ...

    java核心知识点整理.pdf

    25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................

    java线程池概念.txt

    但是如果调用了allowCoreThreadTimeOut(boolean)方法并设置了参数为true,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的阻塞队列大小为0;(这部分通过查看...

    JAVA高并发_学习笔记

    JAVA学习高并发的学习笔记。同步非阻塞式IO:Buffere Channel Selector Concurrent包:Blocking:Queue\Concurrent Hash Map \ExcutorService\lock\原子性 BlockingQueue:ArrayBlockingQueue , DelayQueue , ...

    JAVA核心知识点整理(有效)

    25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................

    第7章-JUC多线程v1.1.pdf

    以SynchronousQueue作为等待队列, 从而每次往队列中插入一个元素, 必须等待另一个线程从这个队列删除一个元素. 定时调度 4.定时调度的线程池 ExecutorService newCachedThreadPool = Executors....

    2004_DISC_dual_DS.pdf

    SynchronousQueue 底层算法相关实现论文

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第一阶段03讲、创建并启动线程.mp4 │ 高并发编程第一阶段04讲、线程生命周期以及start方法源码剖析.mp4 │ 高并发编程第一阶段05讲、采用多线程方式模拟银行排队叫号.mp4 │ 高并发编程第一阶段06...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第一阶段03讲、创建并启动线程.mp4 │ 高并发编程第一阶段04讲、线程生命周期以及start方法源码剖析.mp4 │ 高并发编程第一阶段05讲、采用多线程方式模拟银行排队叫号.mp4 │ 高并发编程第一阶段06...

Global site tag (gtag.js) - Google Analytics