`
liuluo129
  • 浏览: 114995 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

(转)支持生产阻塞的线程池

    博客分类:
  • java
阅读更多

在各种并发编程模型中,生产者-消费者模式大概是最常用的了。在实际工作中,对于生产消费的速度,通常需要做一下权衡。通常来说,生产任务的速度要大于消费的速度。一个细节问题是,队列长度,以及如何匹配生产和消费的速度。

一个典型的生产者-消费者模型如下:

producer-consumer

在并发环境下利用J.U.C提供的Queue实现可以很方便地保证生产和消费过程中的线程安全。这里需要注意的是,Queue必须设置初始容量,防止生产者生产过快导致队列长度暴涨,最终触发OutOfMemory。

对于一般的生产快于消费的情况。当队列已满时,我们并不希望有任何任务被忽略或得不到执行,此时生产者可以等待片刻再提交任务,更好的做法是,把生产者阻塞在提交任务的方法上,待队列未满时继续提交任务,这样就没有浪费的空转时间了。阻塞这一点也很容易,BlockingQueue就是为此打造的,ArrayBlockingQueue和LinkedBlockingQueue在构造时都可以提供容量做限制,其中LinkedBlockingQueue是在实际操作队列时在每次拿到锁以后判断容量。

更进一步,当队列为空时,消费者拿不到任务,可以等一会儿再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,当有任务时便可以立即获得执行,建议调用take的带超时参数的重载方法,超时后线程退出。这样当生产者事实上已经停止生产时,不至于让消费者无限等待。

于是一个高效的支持阻塞的生产消费模型就实现了。

等一下,既然J.U.C已经帮我们实现了线程池,为什么还要采用这一套东西?直接用ExecutorService不是更方便?

我们来看一下ThreadPoolExecutor的基本结构:

threadpoolexecutor

可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已经帮我们实现好了,并且直接采用线程池的实现还有很多优势,例如线程数的动态调整等。

但问题在于,即便你在构造ThreadPoolExecutor时手动指定了一个BlockingQueue作为队列实现,事实上当队列满时,execute方法并不会阻塞,原因在于ThreadPoolExecutor调用的是BlockingQueue非阻塞的offer方法:

public void execute(Runnable command){
    if(command ==null)
        thrownewNullPointerException();
    if(poolSize >= corePoolSize ||!addIfUnderCorePoolSize(command)){
        if(runState == RUNNING && workQueue.offer(command)){
            if(runState != RUNNING || poolSize ==0)
                ensureQueuedTaskHandled(command);
        }
        elseif(!addIfUnderMaximumPoolSize(command))
            reject(command);// is shutdown or saturated
    }
}

 这时候就需要做一些事情来达成一个结果:当生产者提交任务,而队列已满时,能够让生产者阻塞住,等待任务被消费。

关键在于,在并发环境下,队列满不能由生产者去判断,不能调用ThreadPoolExecutor.getQueue().size()来判断队列是否满。

线程池的实现中,当队列满时会调用构造时传入的RejectedExecutionHandler去拒绝任务的处理。默认的实现是AbortPolicy,直接抛出一个RejectedExecutionException。

几种拒绝策略在这里就不赘述了,这里和我们的需求比较接近的是CallerRunsPolicy,这种策略会在队列满时,让提交任务的线程去执行任务,相当于让生产者临时去干了消费者干的活儿,这样生产者虽然没有被阻塞,但提交任务也会被暂停。

public static class CallerRunsPolicy implements RejectedExecutionHandler{
    /**
     * Creates a <tt>CallerRunsPolicy</tt>.
     */
    public CallerRunsPolicy(){}

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r,ThreadPoolExecutor e){
        if(!e.isShutdown()){
            r.run();
        }
    }
}

 但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

new RejectedExecutionHandler(){
        @Override
        public void rejectedExecution(Runnable r,ThreadPoolExecutor executor){
                if(!executor.isShutdown()){
                        try{
                                executor.getQueue().put(r);
                        }catch(InterruptedException e){
                                // should not be interrupted
                        }
                }
        }
};

 这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。

 

转自:http://blog.hesey.net/2013/04/blocking-threadpool-executor.html

分享到:
评论

相关推荐

    支持生产阻塞的Java线程池

    在各种并发编程模型中,生产者-消费者模式大概是最常用的了。在实际工作中,对于生产消费的速度,通常需要做一下权衡

    米哈游笔试题目-Java方向.docx

    线程池类:需要设计一个线程池类,能够动态管理线程数量,支持可配置的线程池大小,并能够正确处理任务队列为空和线程池关闭的情况。 带超时功能的缓存类:需要设计一个缓存类,能够设置缓存项的过期时间,并能够...

    异步HttpClient

    主要解决生产环境中同步httpclient造成的IO阻塞问题。同步http请求将导致 tomcat 的业务线程被阻塞。一旦某接口网络出现问题,可能会阻塞tomcat业务线程,从而无法处理正常业务。很多公司使用另开线程池的方式进行...

    kafka-on-actors:用于 Scala 的基于 Akka Actors 的 Kafka 消费者生产者

    Kafka 的消费者本质上是阻塞的,其想法是让消费者在自己的线程池中运行,并将消息委托给在自己的线程池中运行的 worker-actor,同时实现以防止邮箱溢出且不阻塞。 val (zookepers, brokers, groupId) = ( " zk1:...

    Java并发编程实践 PDF 高清版

    5.3 阻塞队列和生产者一消费者模式 5.4 阻塞和可中断的方法 5.5 Synchronizer 5.6 为计算结果建立高效、可伸缩的高速缓存 第2部分 构建并发应用程序 第6章 任务执行 6.1 在线程中执行任务 6.2 Executor 框架 6.3 ...

    Java并发编程原理与实战

    Spring对并发的支持:Spring的异步任务.mp4 使用jdk8提供的lambda进行并行计算.mp4 了解多线程所带来的安全风险.mp4 从线程的优先级看饥饿问题.mp4 从Java字节码的角度看线程安全性问题.mp4 synchronized保证线程...

    Java并发编程(学习笔记).xmind

    生产者-消费者模式 中断的处理策略 传递InterruptedException 恢复中断,让更高层的代码处理 PriorityQueue(非并发) ConcurrentSkipListMap 替代同步的SortedMap ...

    Java并发编程实战

    5.3 阻塞队列和生产者-消费者模式73 5.3.1 示例:桌面搜索75 5.3.2 串行线程封闭76 5.3.3 双端队列与工作密取77 5.4 阻塞方法与中断方法77 5.5 同步工具类78 5.5.1 闭锁79 5.5.2 FutureTask80 5.5.3 信号量...

    龙果 java并发编程原理实战

    第9节Spring对并发的支持:Spring的异步任务00:11:10分钟 | 第10节使用jdk8提供的lambda进行并行计算00:14:22分钟 | 第11节了解多线程所带来的安全风险00:13:16分钟 | 第12节从线程的优先级看饥饿问题00:18:42...

    Java 并发编程原理与实战视频

    第9节Spring对并发的支持:Spring的异步任务00:11:10分钟 | 第10节使用jdk8提供的lambda进行并行计算00:14:22分钟 | 第11节了解多线程所带来的安全风险00:13:16分钟 | 第12节从线程的优先级看饥饿问题00:18:42...

    龙果java并发编程完整视频

    第9节Spring对并发的支持:Spring的异步任务00:11:10分钟 | 第10节使用jdk8提供的lambda进行并行计算00:14:22分钟 | 第11节了解多线程所带来的安全风险00:13:16分钟 | 第12节从线程的优先级看饥饿问题00:18:42...

    java并发编程

    第9节Spring对并发的支持:Spring的异步任务00:11:10分钟 | 第10节使用jdk8提供的lambda进行并行计算00:14:22分钟 | 第11节了解多线程所带来的安全风险00:13:16分钟 | 第12节从线程的优先级看饥饿问题00:18:42...

    一种基于Java企业内部及时通讯软件设计.doc

    IM 0引言 由于企业内管理、生产、销售等各个环节的信息流动与传递已成为企业正常生产与 运转的重要条件,搭建一个基于企业内部网络的即时通信平台的重要性不言而喻。然而 ,通用的商业IM软件依赖于互联网接入技术,...

    Java优化编程(第2版)

    9.5 优化消息目的地destination及消息生产者与消费者 9.6 优化消息对象及合理使用事务机制 9.7 影响jms性能的其他因素 小结 …… 第12章 java多线程技术与应用性能优化 12.1 java多线程技术 12.1.1 进程与线程 ...

    Java SE实践教程 源代码 下载

    6.5.2 生产者、消费者问题.. 137 6.6 小结 140 第7章 我要彩色照片——SWING的基本概念 141 7.1 讲解 142 7.1.1 Swing的基本概念 142 7.1.2 Swing组件继承关系 142 7.1.3 Swing组件一览 143 7.1.4 Swing和MVC...

    Java SE实践教程 pdf格式电子书 下载(一) 更新

    6.5.2 生产者、消费者问题.. 137 6.6 小结 140 第7章 我要彩色照片——SWING的基本概念 141 7.1 讲解 142 7.1.1 Swing的基本概念 142 7.1.2 Swing组件继承关系 142 7.1.3 Swing组件一览 143 7.1.4 Swing和MVC...

    Java SE实践教程 pdf格式电子书 下载(四) 更新

    6.5.2 生产者、消费者问题.. 137 6.6 小结 140 第7章 我要彩色照片——SWING的基本概念 141 7.1 讲解 142 7.1.1 Swing的基本概念 142 7.1.2 Swing组件继承关系 142 7.1.3 Swing组件一览 143 7.1.4 Swing和MVC...

    javaSE代码实例

    17.6 阻塞的栈操作 401 17.6.1 BlockingDeque接口与LinkedBlockingDeque类简介 401 17.6.2 LinkedBlockingDeque类的具体使用 402 17.7 线程安全的单变量操作 403 17.7.1 atomic包简介 403 17.7.2 atomic...

Global site tag (gtag.js) - Google Analytics