设计模式一:生产者消费者模式(及LinkedBlockingQueue的介绍使用)

本文主要是介绍设计模式一:生产者消费者模式(及LinkedBlockingQueue的介绍使用),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

一、为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

二、什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

三、生产者消费者模式实战

实例代码1:
package cn.thread;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;/*** 多线程模拟实现生产者/消费者模型*  * @author tkhoon*/
public class BlockingQueueTest2 {/*** * 定义装苹果的篮子* */public class Basket {// 篮子,能够容纳3个苹果BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);// 生产苹果,放入篮子public void produce() throws InterruptedException {// put方法放入一个苹果,若basket满了,等到basket有位置basket.put("An apple");}// 消费苹果,从篮子中取走public String consume() throws InterruptedException {// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)return basket.take();}}// 定义苹果生产者class Producer implements Runnable {private String instance;private Basket basket;public Producer(String instance, Basket basket) {this.instance = instance;this.basket = basket;}public void run() {try {while (true) {// 生产苹果System.out.println("生产者准备生产苹果:" + instance);basket.produce();System.out.println("!生产者生产苹果完毕:" + instance);// 休眠300msThread.sleep(300);}} catch (InterruptedException ex) {System.out.println("Producer Interrupted");}}}// 定义苹果消费者class Consumer implements Runnable {private String instance;private Basket basket;public Consumer(String instance, Basket basket) {this.instance = instance;this.basket = basket;}public void run() {try {while (true) {// 消费苹果System.out.println("消费者准备消费苹果:" + instance);System.out.println(basket.consume());System.out.println("!消费者消费苹果完毕:" + instance);// 休眠1000msThread.sleep(1000);}} catch (InterruptedException ex) {System.out.println("Consumer Interrupted");}}}public static void main(String[] args) {BlockingQueueTest2 test = new BlockingQueueTest2();// 建立一个装苹果的篮子Basket basket = test.new Basket();ExecutorService service = Executors.newCachedThreadPool();Producer producer = test.new Producer("生产者001", basket);Producer producer2 = test.new Producer("生产者002", basket);Consumer consumer = test.new Consumer("消费者001", basket);service.submit(producer);service.submit(producer2);service.submit(consumer);// 程序运行5s后,所有任务停止
//        try {
//            Thread.sleep(1000 * 5);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        service.shutdownNow();}}

代码中关于LinkedBlockingQueue的介绍会在稍后进行介绍。


实例代码2:
public class QuickEmailToWikiExtractor extends AbstractExtractor {private ThreadPoolExecutor      threadsPool;private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;public QuickEmailToWikiExtractor() {emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000));}
//执行的主方法
public void extract() {logger.debug("开始" + getExtractorName() + "。。");long start = System.currentTimeMillis();//抽取所有邮件放到队列里(生产者)new ExtractEmailTask().start();// 把队列里的文章插入到Wiki(消费者)insertToWiki();long end = System.currentTimeMillis();double cost = (end - start) / 1000;logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒");}/*** 把队列里的文章插入到Wiki*/private void insertToWiki() {//登录wiki,每间隔一段时间需要登录一次confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);while (true) {//2秒内取不到就退出ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);if (email == null) {break;}threadsPool.submit(new insertToWikiTask(email));}}protected List<Article> extractEmail() {List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();if (allEmails == null) {return null;}for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {emailQueue.offer(exchangeEmailShallowDTO);}return null;}/*** 抽取邮件任务(生产者类)** @author tengfei.fangtf*/public class ExtractEmailTask extends Thread {public void run() {extractEmail();}}
}


四、LinkedBlockingQueue介绍使用
在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。

注:什么叫线程安全?这个首先要明确。线程安全就是说多线程访问同一代码,不会产生不确定的结果。

由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

常用API:

offer

将元素插入队列,成功返回true,如果当前没有可用的空间,则返回false

offer(E e, long timeout, TimeUnit unit) 

将元素插入队列,在到达指定的等待时间前等待可用的空间

E poll(long timeout, TimeUnit unit) 

获取并移除队列的头部,在指定的等待时间前等待可用的元素

void put(E e) 

将元素插入队列,将等待可用的空间(堵塞)

take() 

获取并移除队列的头部,在元素变得可用之前一直等待(堵塞)
  1.     //offer方法为非堵塞的  
  2.    //queue.offer(rnd.nextInt(100), 1, TimeUnit.SECONDS); //等待1秒后还不能加入队列则返回失败,放弃加入  
  3.  //queue.offer(rnd.nextInt(100));  
  1.     //poll方法为非堵塞的  
  2.    //Integer value = queue.poll(1, TimeUnit.SECONDS); //等待1秒后还没有数据可取则返回失败,放弃获取  
  3.   //Integer value = queue.poll();  


总结: 在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。

另外他们都是线程安全的,不用考虑线程同步问题。


这篇关于设计模式一:生产者消费者模式(及LinkedBlockingQueue的介绍使用)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/807813

相关文章

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

MyBatis ParameterHandler的具体使用

《MyBatisParameterHandler的具体使用》本文主要介绍了MyBatisParameterHandler的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参... 目录一、概述二、源码1 关键属性2.setParameters3.TypeHandler1.TypeHa

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

使用docker搭建嵌入式Linux开发环境

《使用docker搭建嵌入式Linux开发环境》本文主要介绍了使用docker搭建嵌入式Linux开发环境,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录1、前言2、安装docker3、编写容器管理脚本4、创建容器1、前言在日常开发全志、rk等不同

使用Python实现Word文档的自动化对比方案

《使用Python实现Word文档的自动化对比方案》我们经常需要比较两个Word文档的版本差异,无论是合同修订、论文修改还是代码文档更新,人工比对不仅效率低下,还容易遗漏关键改动,下面通过一个实际案例... 目录引言一、使用python-docx库解析文档结构二、使用difflib进行差异比对三、高级对比方

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca