SpringBoot的RabbitMQ消息队列: 三、第二模式Work queues

本文主要是介绍SpringBoot的RabbitMQ消息队列: 三、第二模式Work queues,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  上一节的两个工程,一个负责发送,一个负责接收,也就是一一对于的关系。

     只要消息发出了,接收者就处理;当接收效率较低时,就会出现接收者处理不过来,我们就可能会处理不过来,于是我们就可能多配置接受者。这个模式就是"Work queues",它的结构如下


    多个接收者,它们会出现什么情况呢?是否像大锅饭,有的人撑死,有的人饿死。这个通过例子验证。

一、再建一个接收者工程 HelloReceiving2


1、把HelloReceiver工程中的HelloRabbitConfig、HelloReceiver、logback.xml依次拷贝过去

2、修改application.properties为

[html]  view plain copy
  1. #服务器配置  
  2. spring.application.name=rabbitmq-hello-receiving  
  3. server.port=9092   
  4. #rabbitmq连接参数  
  5. spring.rabbitmq.host=localhost  
  6. spring.rabbitmq.port=5672  
  7. spring.rabbitmq.username=test  
  8. spring.rabbitmq.password=123456  

二、运行工程

1、在工程HelloSending所在文件夹打开cmd,运行mvn spring-boot:run

2、在工程HelloReceiving所在文件夹打开cmd,运行mvn spring-boot:run

3、在工程HelloReceiving2所在文件夹打开cmd,运行mvn spring-boot:run
4、在浏览器中输入http://localhost:9080/send/上帝1,http://localhost:9080/send/上帝2,http://localhost:9080/send/上帝3
观察两个Receiving的日志.

查看出不均衡吧,为了突出这个不公平,我们修改发送代码如下

[html]  view plain copy
  1. package com.example;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import org.slf4j.Logger;  
  6. import org.slf4j.LoggerFactory;  
  7. import org.springframework.amqp.core.AmqpTemplate;  
  8. import org.springframework.beans.factory.annotation.Autowired;  
  9. import org.springframework.stereotype.Component;  
  10.   
  11. @Component  
  12. public class HelloSender {  
  13.   
  14.     protected static Logger logger=LoggerFactory.getLogger(HelloSender.class);   
  15.       
  16.     @Autowired  
  17.     private AmqpTemplate rabbitTemplate;  
  18.   
  19.     public String send(String name) {  
  20.         String context = "hello "+name+" --" + new Date();  
  21.         String sendStr;  
  22.         for(int i=1;i<=100;i++){  
  23.             sendStr="第["+i+"]个 hello "+name+" --" + new Date();   
  24.             logger.debug("HelloSender: " + sendStr);  
  25.             this.rabbitTemplate.convertAndSend("hello", sendStr);  
  26.         }  
  27.         return context;  
  28.     }  
  29. }  

再次http://localhost:9080/send/上帝,会发现更多的不公平。

三、Message acknowledgment 消息确认

1、默认情况下,RabbitMQ 会顺序的分发每个Message。当分发后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin

2、每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。

3、如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。

4、为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

5、在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。

6、如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

7、这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

    消息确认,对于spring-boot来说,就是一个开关,它就是spring.rabbitmq.listener.acknowledge-mode

    acknowledgeMode有三值:

    A、NONE = no acks will be sent (incompatible with channelTransacted=true).

          RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.

    B、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().

    C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.

Note that acknowledgeMode is complementary to channelTransacted - if the channel is transacted then the broker requires a commit notification in addition to the ack. This is the default mode. See also txSize.

    非常简单,在application.properties中增加spring.rabbitmq.listener.acknowledge-mode=AUTO

为了更好的演示异常,我们把生产者、消费者都做了sleep.代码如下:

sending

[html]  view plain copy
  1. package com.example;  
  2.   
  3. import java.util.Date;  
  4.   
  5. import org.slf4j.Logger;  
  6. import org.slf4j.LoggerFactory;  
  7. import org.springframework.amqp.core.AmqpTemplate;  
  8. import org.springframework.beans.factory.annotation.Autowired;  
  9. import org.springframework.stereotype.Component;  
  10.   
  11. @Component  
  12. public class HelloSender {  
  13.   
  14.     protected static Logger logger=LoggerFactory.getLogger(HelloSender.class);   
  15.       
  16.     @Autowired  
  17.     private AmqpTemplate rabbitTemplate;  
  18.   
  19.     public String send(String name) throws InterruptedException {  
  20.         String context = "hello "+name+" --" + new Date();  
  21.         String sendStr;  
  22.         for(int i=1;i<=100;i++){  
  23.             sendStr="第["+i+"]个 hello "+name+" --" + new Date();   
  24.             logger.debug("HelloSender: " + sendStr);  
  25.             this.rabbitTemplate.convertAndSend("hello", sendStr);  
  26.             Thread.sleep(1000);  
  27.         }  
  28.         return context;  
  29.     }  
  30. }  
Receiving

[html]  view plain copy
  1. package com.example;  
  2.   
  3. import org.slf4j.Logger;  
  4. import org.slf4j.LoggerFactory;  
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;  
  7. import org.springframework.stereotype.Component;  
  8.   
  9. @Component  
  10. @RabbitListener(queues = "hello")  
  11. public class HelloReceiver {  
  12.     protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);  
  13.   
  14.     @RabbitHandler  
  15.     public void process(String hello) {  
  16.         logger.debug("HelloReceiver : " + hello);  
  17.         try {  
  18.             Thread.sleep(2000);  
  19.         } catch (InterruptedException e) {  
  20.             // TODO Auto-generated catch block  
  21.             e.printStackTrace();  
  22.         }  
  23.     }  
  24. }  

Receiving2

[html]  view plain copy
  1. package com.example;  
  2.   
  3. import org.slf4j.Logger;  
  4. import org.slf4j.LoggerFactory;  
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;  
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;  
  7. import org.springframework.stereotype.Component;  
  8.   
  9. @Component  
  10. @RabbitListener(queues = "hello")  
  11. public class HelloReceiver {  
  12.     protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);  
  13.   
  14.     @RabbitHandler  
  15.     public void process(String hello) {  
  16.         logger.debug("HelloReceiver : " + hello);  
  17.         try {  
  18.             Thread.sleep(3000);  
  19.         } catch (InterruptedException e) {  
  20.             // TODO Auto-generated catch block  
  21.             e.printStackTrace();  
  22.         }  
  23.     }  
  24. }  
你会注意到两个消费者的sleep时间不一样,这是为了方便异常退出一个之后,查看另一个是否接收并处理。

四、消息持久化

在上一节中我们知道了即使Consumer异常退出,Message也不会丢失。但是如果RabbitMQ Server退出呢?软件都有bug,即使RabbitMQ Server是完美毫无bug的(当然这是不可能的,是软件就有bug,没有bug的那不叫软件),它还是有可能退出的:被其它软件影响,或者系统重启了,系统panic了。。。

为了保证在RabbitMQ退出或者crash了数据仍没有丢失,需要将queue和Message都要持久化。

queue持久化,就是在实例时调用具有参数durable的构造函数.

[java]  view plain copy
  1. package com.example;  
  2.   
  3. import org.springframework.amqp.core.Queue;  
  4. import org.springframework.context.annotation.Bean;  
  5. import org.springframework.context.annotation.Configuration;  
  6.   
  7. @Configuration  
  8. public class HelloRabbitConfig {  
  9.   
  10.     @Bean  
  11.     public Queue helloQueue() {  
  12.         return new Queue("hello",true);  
  13.     }  
  14. }  

五、单消费者,排他队列

个人理解: 这个是queue的重载构造方法,要使用单消费者,就得使用这个构造方法

消息只能一个消费者接收处理,其它消费者只能看着,这也是队列实例时调用具有参数exclusive 的构造函数。

    /**
     * Construct a new queue, given a name, durability, exclusive and auto-delete flags.
     * @param name the name of the queue.
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
     * connection)
     * @param autoDelete true if the server should delete the queue when it is no longer in use
     */
    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, null);
    } 

六、参数


个人理解: 这个应该是配置文件里面的相关配置.

  • spring.rabbitmq.addresses指定client连接到的server的地址,多个以逗号分隔.

  • spring.rabbitmq.dynamic是否创建AmqpAdmin bean. 默认为: true)

  • spring.rabbitmq.host指定RabbitMQ host.默认为: localhost)

  • spring.rabbitmq.listener.acknowledge-mode指定Acknowledge的模式.

  • spring.rabbitmq.listener.auto-startup是否在启动时就启动mq,默认: true)

  • spring.rabbitmq.listener.concurrency指定最小的消费者数量.

  • spring.rabbitmq.listener.max-concurrency指定最大的消费者数量.

  • spring.rabbitmq.listener.prefetch指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.

  • spring.rabbitmq.listener.transaction-size指定一个事务处理的消息数量,最好是小于等于prefetch的数量.

  • spring.rabbitmq.password指定broker的密码.

  • spring.rabbitmq.port指定RabbitMQ 的端口,默认: 5672)

  • spring.rabbitmq.requested-heartbeat指定心跳超时,0为不指定.

  • spring.rabbitmq.ssl.enabled是否开始SSL,默认: false)

  • spring.rabbitmq.ssl.key-store指定持有SSL certificate的key store的路径

  • spring.rabbitmq.ssl.key-store-password指定访问key store的密码.

  • spring.rabbitmq.ssl.trust-store指定持有SSL certificates的Trust store.

  • spring.rabbitmq.ssl.trust-store-password指定访问trust store的密码.

  • spring.rabbitmq.username指定登陆broker的用户名.

  • spring.rabbitmq.virtual-host指定连接到broker的Virtual host.

这篇关于SpringBoot的RabbitMQ消息队列: 三、第二模式Work queues的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1095398

相关文章

Java 实用工具类Spring 的 AnnotationUtils详解

《Java实用工具类Spring的AnnotationUtils详解》Spring框架提供了一个强大的注解工具类org.springframework.core.annotation.Annot... 目录前言一、AnnotationUtils 的常用方法二、常见应用场景三、与 JDK 原生注解 API 的

Java controller接口出入参时间序列化转换操作方法(两种)

《Javacontroller接口出入参时间序列化转换操作方法(两种)》:本文主要介绍Javacontroller接口出入参时间序列化转换操作方法,本文给大家列举两种简单方法,感兴趣的朋友一起看... 目录方式一、使用注解方式二、统一配置场景:在controller编写的接口,在前后端交互过程中一般都会涉及

Java中的StringBuilder之如何高效构建字符串

《Java中的StringBuilder之如何高效构建字符串》本文将深入浅出地介绍StringBuilder的使用方法、性能优势以及相关字符串处理技术,结合代码示例帮助读者更好地理解和应用,希望对大家... 目录关键点什么是 StringBuilder?为什么需要 StringBuilder?如何使用 St

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Maven中引入 springboot 相关依赖的方式(最新推荐)

《Maven中引入springboot相关依赖的方式(最新推荐)》:本文主要介绍Maven中引入springboot相关依赖的方式(最新推荐),本文给大家介绍的非常详细,对大家的学习或工作具有... 目录Maven中引入 springboot 相关依赖的方式1. 不使用版本管理(不推荐)2、使用版本管理(推

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

如何在 Spring Boot 中实现 FreeMarker 模板

《如何在SpringBoot中实现FreeMarker模板》FreeMarker是一种功能强大、轻量级的模板引擎,用于在Java应用中生成动态文本输出(如HTML、XML、邮件内容等),本文... 目录什么是 FreeMarker 模板?在 Spring Boot 中实现 FreeMarker 模板1. 环

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll