RabbitMQ消费端单线程与多线程案例讲解

2025-07-26 20:50

本文主要是介绍RabbitMQ消费端单线程与多线程案例讲解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe...

一、基础概念

模型消费者数量每个消费者内部线程数顺序性场景说明
单消费者单线程11✅ 保序处理逻辑简单,保证顺序的常见场景
单消费者多线程1>1❌ 不保序提升处理能力,放弃顺序要求
多消费者单线程>11❌ 不保序多个队列/分区消费,提升并发
多消费者多线程>1>1❌ 不保序高并发场景下批量处理,放弃顺序
concurrenphpcy# 初始消费者线程数
max-concurrency# 最大消费者线程数
prefetch# 每个消费者预取的消息数
  • concurrency: 2
    • 表示初始创建的消费者线程数量
    • 系统启动时会立即创建 2 个消费者线程
    • 这些线程会持续监听消息队列
  • max-concurrency: 2
    • 表示允许的最大消费者线程数量
    • 这里设置为 2(与 concurrency 相同),表示线程数不会动态扩展
    • 如果设置 max-concurrency > concurrency,系统会在负载高时动态增加消费者

详细解释:

       concurrency和max-concurrency不会影响每个消费者是否是多线程执行,只会导致有多个消费者线程,只有用线程池才会导致每个消费者多线程消费

        而没有用线程池,也设置prefetch是因为消息被大量预取,单线程处理不过来时堆积等待,单线程并不会影响消息的顺序性,只有使用了线程池才会影响

        使用了线程池一定会导致消息顺序性问题这与设不设置prefetch无关,因为使用线程池后,任务交个线程池就返回了属于异步

举个例子:

                1. RabbitMQ 给消费者推送消息1,消费者收到,提交给线程池任务A(耗时长)。

                2. 消费者马上ACK消息1(因为业务交给线程池了,自己处理完毕的感觉

                3.  RabbitMQ 再给消费者推送消息2,消费者收到,提交给线程池任务B(耗时短)。

                4. R线程池调度先跑完任务B,后跑任务A。

✅ 单消费者 + 单线程消费

  • 保证顺序:消费者内部串行执行。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 1
        max-concurrency: 1
        prefetch: 1

消费者代码

@Component
public class MultiConsumerSingleThread {
    @RabbitListener(queues = "order_queue", concurrency = "2")
    public void receive(String message) {
        System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {javascript
            e.printStackTrace();
        }
    }
}

❌ 单消费者 + 多线程消费

  • 不保顺序:一个消费者使用线程池异步处理消息。
  • 配置关键:默认配置 + 手动异js步处理
  • 消费者代码
@Component
public class MultiThreadConsumer {
    private final ExecutorService executor = Executors.newFixedThreadPool(5);
    @RabbitListener(queues = "order_queue")
    public void receive(String message) {
        executor.subhttp://www.chinasem.cnmit(() -> {
            System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
            try {
                Thread.sleep(500); // 模拟耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

说明:消息提交到线程池,先到的不一定先处理完成,顺序可能乱。

❌ 多消费者 + 单线程消费

  • 不保顺序:多个消费者实例轮询分配消息,各自顺序保留,但整体顺序错乱。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 2
        max-concurrency: 2
        prefetch: 1

消费者代码(共享类,也可拆成多个类模拟多实例)

@Component
public class MultiConsumerSingleThread {
    //concurrency = "2":它和配置文件中的 concurrency: 2 作用一致,但优先级更高。
    @RabbitListener(queues = "order_queue", concurrency = "2")
    public void receive(String message) {
        System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

❌ 多消费者 + 多线程消费

  • 不保顺序:每个消费者又使用线程池异步处理消息,最大吞吐量模式。
  • 适合场景:数据导入、日志收集、发送通知等对顺序无要求的批量处理。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 3
        max-concurrency: 3
        prefetch: 10

消费者代码

@Component
public class MultiConsumerMultiThread {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    @RabbitListener(queues = "order_queue", concurrency = "3")
    public void receive(String message) {
        executor.submit(() -> {
            System.out.println(www.chinasem.cn" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

补充说明

  • concurrency: 控制并发消费者数量,等于消费者数。
  • prefetch: 控制每个消费者本地最多拉取多少条消息(如 1 表示严格串行处理)。
  • 每个 @RabbitListener 本质上是一个容器,可以通过 concurrency 配置“实例个数”。

到此这篇关于RabbitMQ消费端单线程与多线程的文章就介绍到这了,更多相关RabbitMQ单线程与多线程内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于RabbitMQ消费端单线程与多线程案例讲解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155491

相关文章

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

从入门到进阶讲解Python自动化Playwright实战指南

《从入门到进阶讲解Python自动化Playwright实战指南》Playwright是针对Python语言的纯自动化工具,它可以通过单个API自动执行Chromium,Firefox和WebKit... 目录Playwright 简介核心优势安装步骤观点与案例结合Playwright 核心功能从零开始学习

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

Javaee多线程之进程和线程之间的区别和联系(最新整理)

《Javaee多线程之进程和线程之间的区别和联系(最新整理)》进程是资源分配单位,线程是调度执行单位,共享资源更高效,创建线程五种方式:继承Thread、Runnable接口、匿名类、lambda,r... 目录进程和线程进程线程进程和线程的区别创建线程的五种写法继承Thread,重写run实现Runnab

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

Python中re模块结合正则表达式的实际应用案例

《Python中re模块结合正则表达式的实际应用案例》Python中的re模块是用于处理正则表达式的强大工具,正则表达式是一种用来匹配字符串的模式,它可以在文本中搜索和匹配特定的字符串模式,这篇文章主... 目录前言re模块常用函数一、查看文本中是否包含 A 或 B 字符串二、替换多个关键词为统一格式三、提

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

嵌入式数据库SQLite 3配置使用讲解

《嵌入式数据库SQLite3配置使用讲解》本文强调嵌入式项目中SQLite3数据库的重要性,因其零配置、轻量级、跨平台及事务处理特性,可保障数据溯源与责任明确,详细讲解安装配置、基础语法及SQLit... 目录0、惨痛教训1、SQLite3环境配置(1)、下载安装SQLite库(2)、解压下载的文件(3)、