微服务:Rabbitmq的WorkQueue模型的使用、默认消费方式(消息队列中间件)

本文主要是介绍微服务:Rabbitmq的WorkQueue模型的使用、默认消费方式(消息队列中间件),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • WorkQueue模型
      • 控制预取消息个数

WorkQueue模型

当然,一个队列,可以由多个消费者去监听。

来实现一下.

生产者:

    @Testpublic void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message-";for (int i = 0; i < 50; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(30); // 每次}}

消费者(这里我们弄两个):

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:" + msg);}@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:" + msg);}
}

启动看一下结果:

先启动消费者,再发送大量消息:

在这里插入图片描述

这里是因为mq有预分配,一人一半,消费能力一样,所以看起来像是轮流一人执行了一次一样,其实不是,后面会说到。

先发送大量消息,再启动消费者:

在这里插入图片描述

这里是因为消费者1先启动了,2还没启动呢,就被1消费完了。

所以我们改造一下测试代码,让消费者消费能力不同,同时让消费者先都启动,然后再送大量消息:

    @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:" + msg);Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:" + msg);Thread.sleep(200);}
消费者2接收到消息:hello, message-0
消费者1接收到消息:hello, message-1
消费者1接收到消息:hello, message-3
消费者1接收到消息:hello, message-5
消费者2接收到消息:hello, message-2
消费者1接收到消息:hello, message-7
消费者1接收到消息:hello, message-9
消费者1接收到消息:hello, message-11
消费者2接收到消息:hello, message-4
消费者1接收到消息:hello, message-13
消费者1接收到消息:hello, message-15
消费者1接收到消息:hello, message-17
消费者1接收到消息:hello, message-19
消费者2接收到消息:hello, message-6
消费者1接收到消息:hello, message-21
消费者1接收到消息:hello, message-23
消费者1接收到消息:hello, message-25
消费者2接收到消息:hello, message-8
消费者1接收到消息:hello, message-27
消费者1接收到消息:hello, message-29
消费者1接收到消息:hello, message-31
消费者2接收到消息:hello, message-10
消费者1接收到消息:hello, message-33
消费者1接收到消息:hello, message-35
消费者1接收到消息:hello, message-37
消费者2接收到消息:hello, message-12
消费者1接收到消息:hello, message-39
消费者1接收到消息:hello, message-41
消费者1接收到消息:hello, message-43
消费者2接收到消息:hello, message-14
消费者1接收到消息:hello, message-45
消费者1接收到消息:hello, message-47
消费者1接收到消息:hello, message-49
消费者2接收到消息:hello, message-16
消费者2接收到消息:hello, message-18
消费者2接收到消息:hello, message-20
消费者2接收到消息:hello, message-22
消费者2接收到消息:hello, message-24
消费者2接收到消息:hello, message-26
消费者2接收到消息:hello, message-28
消费者2接收到消息:hello, message-30
消费者2接收到消息:hello, message-32
消费者2接收到消息:hello, message-34
消费者2接收到消息:hello, message-36
消费者2接收到消息:hello, message-38
消费者2接收到消息:hello, message-40
消费者2接收到消息:hello, message-42
消费者2接收到消息:hello, message-44
消费者2接收到消息:hello, message-46
消费者2接收到消息:hello, message-48

可以看到,其实rabbitmq默认有预分配(预取,每个消费者和队列中有一个通道,存放预取的消息),平均分消息,然后各自独立消费,所以消费者2要比消费者1消费完25条(50/2)消息时间长。

显然是不合理的,我们可以改造一下:

控制预取消息个数

配置中prefetch设置为1,每次消费完消息才取下一个。(能力越大,责任越大,消费快的,消费越多)

spring:rabbitmq:host: ip # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: guest # 用户名password: guest # 密码listener:simple:prefetch: 1

重新试一下:

消费者2接收到消息:hello, message-0
消费者1接收到消息:hello, message-1
消费者1接收到消息:hello, message-2
消费者1接收到消息:hello, message-3
消费者1接收到消息:hello, message-4
消费者2接收到消息:hello, message-5
消费者1接收到消息:hello, message-6
消费者1接收到消息:hello, message-7
消费者1接收到消息:hello, message-8
消费者1接收到消息:hello, message-9
消费者2接收到消息:hello, message-10
消费者1接收到消息:hello, message-11
消费者1接收到消息:hello, message-12
消费者1接收到消息:hello, message-13
消费者1接收到消息:hello, message-14
消费者2接收到消息:hello, message-15
消费者1接收到消息:hello, message-16
消费者1接收到消息:hello, message-17
消费者1接收到消息:hello, message-18
消费者2接收到消息:hello, message-19
消费者1接收到消息:hello, message-20
消费者1接收到消息:hello, message-21
消费者1接收到消息:hello, message-22
消费者1接收到消息:hello, message-23
消费者2接收到消息:hello, message-24
消费者1接收到消息:hello, message-25
消费者1接收到消息:hello, message-26
消费者1接收到消息:hello, message-27
消费者1接收到消息:hello, message-28
消费者2接收到消息:hello, message-29
消费者1接收到消息:hello, message-30
消费者1接收到消息:hello, message-31
消费者1接收到消息:hello, message-32
消费者1接收到消息:hello, message-33
消费者2接收到消息:hello, message-34
消费者1接收到消息:hello, message-35
消费者1接收到消息:hello, message-36
消费者1接收到消息:hello, message-37
消费者1接收到消息:hello, message-38
消费者2接收到消息:hello, message-39
消费者1接收到消息:hello, message-40
消费者1接收到消息:hello, message-41
消费者1接收到消息:hello, message-42
消费者1接收到消息:hello, message-43
消费者2接收到消息:hello, message-44
消费者1接收到消息:hello, message-45
消费者1接收到消息:hello, message-46
消费者1接收到消息:hello, message-47
消费者1接收到消息:hello, message-48
消费者2接收到消息:hello, message-49

这样,消费能力大的(消费者1),消费的越多。

这篇关于微服务:Rabbitmq的WorkQueue模型的使用、默认消费方式(消息队列中间件)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1026672

相关文章

Python使用Spire.PDF实现为PDF添加水印

《Python使用Spire.PDF实现为PDF添加水印》在现代数字化办公环境中,PDF已成为一种广泛使用的文件格式,尤其是在需要保持文档格式时,下面我们就来看看如何使用Python为PDF文件添加水... 目录一、准备工作二、实现步骤1. 导入必要的库2. 创建 PdfDocument 对象3. 设置水印

python在word中插入目录和更新目录实现方式

《python在word中插入目录和更新目录实现方式》文章主要介绍了如何在Word文档中插入和更新目录,并提供了具体的代码示例,插入目录时,需要使用`TablesOfContents`对象,并设置使用... 目录1、插入目录2、更新目录总结1、插入目录需要用到对象:TablesOfContents目录的

Java中的ConcurrentBitSet使用小结

《Java中的ConcurrentBitSet使用小结》本文主要介绍了Java中的ConcurrentBitSet使用小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、核心澄清:Java标准库无内置ConcurrentBitSet二、推荐方案:Eclipse

Go语言结构体标签(Tag)的使用小结

《Go语言结构体标签(Tag)的使用小结》结构体标签Tag是Go语言中附加在结构体字段后的元数据字符串,用于提供额外的属性信息,这些信息可以通过反射在运行时读取和解析,下面就来详细的介绍一下Tag的使... 目录什么是结构体标签?基本语法常见的标签用途1.jsON 序列化/反序列化(最常用)2.数据库操作(

Java中ScopeValue的使用小结

《Java中ScopeValue的使用小结》Java21引入的ScopedValue是一种作用域内共享不可变数据的预览API,本文就来详细介绍一下Java中ScopeValue的使用小结,感兴趣的可以... 目录一、Java ScopedValue(作用域值)详解1. 定义与背景2. 核心特性3. 使用方法

spring中Interceptor的使用小结

《spring中Interceptor的使用小结》SpringInterceptor是SpringMVC提供的一种机制,用于在请求处理的不同阶段插入自定义逻辑,通过实现HandlerIntercept... 目录一、Interceptor 的核心概念二、Interceptor 的创建与配置三、拦截器的执行顺

Java中Map的五种遍历方式实现与对比

《Java中Map的五种遍历方式实现与对比》其实Map遍历藏着多种玩法,有的优雅简洁,有的性能拉满,今天咱们盘一盘这些进阶偏基础的遍历方式,告别重复又臃肿的代码,感兴趣的小伙伴可以了解下... 目录一、先搞懂:Map遍历的核心目标二、几种遍历方式的对比1. 传统EntrySet遍历(最通用)2. Lambd

Spring Boot 处理带文件表单的方式汇总

《SpringBoot处理带文件表单的方式汇总》本文详细介绍了六种处理文件上传的方式,包括@RequestParam、@RequestPart、@ModelAttribute、@ModelAttr... 目录方式 1:@RequestParam接收文件后端代码前端代码特点方式 2:@RequestPart接

C#中checked关键字的使用小结

《C#中checked关键字的使用小结》本文主要介绍了C#中checked关键字的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录✅ 为什么需要checked? 问题:整数溢出是“静默China编程”的(默认)checked的三种用

C#中预处理器指令的使用小结

《C#中预处理器指令的使用小结》本文主要介绍了C#中预处理器指令的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录 第 1 名:#if/#else/#elif/#endif✅用途:条件编译(绝对最常用!) 典型场景: 示例