RabbitMQ消费端单线程与多线程案例讲解

2025-07-26 20:50

本文主要是介绍RabbitMQ消费端单线程与多线程案例讲解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe...

一、基础概念

模型消费者数量每个消费者内部线程数顺序性场景说明
单消费者单线程11✅ 保序处理逻辑简单,保证顺序的常见场景
单消费者多线程1>1❌ 不保序提升处理能力,放弃顺序要求
多消费者单线程>11❌ 不保序多个队列/分区消费,提升并发
多消费者多线程>1>1❌ 不保序高并发场景下批量处理,放弃顺序
concurrenphpcy# 初始消费者线程数
max-concurrency# 最大消费者线程数
prefetch# 每个消费者预取的消息数
  • concurrency: 2
    • 表示初始创建的消费者线程数量
    • 系统启动时会立即创建 2 个消费者线程
    • 这些线程会持续监听消息队列
  • max-concurrency: 2
    • 表示允许的最大消费者线程数量
    • 这里设置为 2(与 concurrency 相同),表示线程数不会动态扩展
    • 如果设置 max-concurrency > concurrency,系统会在负载高时动态增加消费者

详细解释:

       concurrency和max-concurrency不会影响每个消费者是否是多线程执行,只会导致有多个消费者线程,只有用线程池才会导致每个消费者多线程消费

        而没有用线程池,也设置prefetch是因为消息被大量预取,单线程处理不过来时堆积等待,单线程并不会影响消息的顺序性,只有使用了线程池才会影响

        使用了线程池一定会导致消息顺序性问题这与设不设置prefetch无关,因为使用线程池后,任务交个线程池就返回了属于异步

举个例子:

                1. RabbitMQ 给消费者推送消息1,消费者收到,提交给线程池任务A(耗时长)。

                2. 消费者马上ACK消息1(因为业务交给线程池了,自己处理完毕的感觉

                3.  RabbitMQ 再给消费者推送消息2,消费者收到,提交给线程池任务B(耗时短)。

                4. R线程池调度先跑完任务B,后跑任务A。

✅ 单消费者 + 单线程消费

  • 保证顺序:消费者内部串行执行。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 1
        max-concurrency: 1
        prefetch: 1

消费者代码

@Component
public class MultiConsumerSingleThread {
    @RabbitListener(queues = "order_queue", concurrency = "2")
    public void receive(String message) {
        System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {javascript
            e.printStackTrace();
        }
    }
}

❌ 单消费者 + 多线程消费

  • 不保顺序:一个消费者使用线程池异步处理消息。
  • 配置关键:默认配置 + 手动异js步处理
  • 消费者代码
@Component
public class MultiThreadConsumer {
    private final ExecutorService executor = Executors.newFixedThreadPool(5);
    @RabbitListener(queues = "order_queue")
    public void receive(String message) {
        executor.subhttp://www.chinasem.cnmit(() -> {
            System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
            try {
                Thread.sleep(500); // 模拟耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

说明:消息提交到线程池,先到的不一定先处理完成,顺序可能乱。

❌ 多消费者 + 单线程消费

  • 不保顺序:多个消费者实例轮询分配消息,各自顺序保留,但整体顺序错乱。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 2
        max-concurrency: 2
        prefetch: 1

消费者代码(共享类,也可拆成多个类模拟多实例)

@Component
public class MultiConsumerSingleThread {
    //concurrency = "2":它和配置文件中的 concurrency: 2 作用一致,但优先级更高。
    @RabbitListener(queues = "order_queue", concurrency = "2")
    public void receive(String message) {
        System.out.println(" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

❌ 多消费者 + 多线程消费

  • 不保顺序:每个消费者又使用线程池异步处理消息,最大吞吐量模式。
  • 适合场景:数据导入、日志收集、发送通知等对顺序无要求的批量处理。
  • 配置关键
spring:
  rabbitmq:
    listener:
      simple:
        concurrency: 3
        max-concurrency: 3
        prefetch: 10

消费者代码

@Component
public class MultiConsumerMultiThread {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    @RabbitListener(queues = "order_queue", concurrency = "3")
    public void receive(String message) {
        executor.submit(() -> {
            System.out.println(www.chinasem.cn" [线程:" + Thread.currentThread().getName() + "] 收到消息:" + message);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

补充说明

  • concurrency: 控制并发消费者数量,等于消费者数。
  • prefetch: 控制每个消费者本地最多拉取多少条消息(如 1 表示严格串行处理)。
  • 每个 @RabbitListener 本质上是一个容器,可以通过 concurrency 配置“实例个数”。

到此这篇关于RabbitMQ消费端单线程与多线程的文章就介绍到这了,更多相关RabbitMQ单线程与多线程内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于RabbitMQ消费端单线程与多线程案例讲解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155491

相关文章

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

Java 正则表达式的使用实战案例

《Java正则表达式的使用实战案例》本文详细介绍了Java正则表达式的使用方法,涵盖语法细节、核心类方法、高级特性及实战案例,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、正则表达式语法详解1. 基础字符匹配2. 字符类([]定义)3. 量词(控制匹配次数)4. 边

Python Counter 函数使用案例

《PythonCounter函数使用案例》Counter是collections模块中的一个类,专门用于对可迭代对象中的元素进行计数,接下来通过本文给大家介绍PythonCounter函数使用案例... 目录一、Counter函数概述二、基本使用案例(一)列表元素计数(二)字符串字符计数(三)元组计数三、C

Python多线程实现大文件快速下载的代码实现

《Python多线程实现大文件快速下载的代码实现》在互联网时代,文件下载是日常操作之一,尤其是大文件,然而,网络条件不稳定或带宽有限时,下载速度会变得很慢,本文将介绍如何使用Python实现多线程下载... 目录引言一、多线程下载原理二、python实现多线程下载代码说明:三、实战案例四、注意事项五、总结引

Unity新手入门学习殿堂级知识详细讲解(图文)

《Unity新手入门学习殿堂级知识详细讲解(图文)》Unity是一款跨平台游戏引擎,支持2D/3D及VR/AR开发,核心功能模块包括图形、音频、物理等,通过可视化编辑器与脚本扩展实现开发,项目结构含A... 目录入门概述什么是 UnityUnity引擎基础认知编辑器核心操作Unity 编辑器项目模式分类工程

spring AMQP代码生成rabbitmq的exchange and queue教程

《springAMQP代码生成rabbitmq的exchangeandqueue教程》使用SpringAMQP代码直接创建RabbitMQexchange和queue,并确保绑定关系自动成立,简... 目录spring AMQP代码生成rabbitmq的exchange and 编程queue执行结果总结s

Spring Boot 整合 SSE(Server-Sent Events)实战案例(全网最全)

《SpringBoot整合SSE(Server-SentEvents)实战案例(全网最全)》本文通过实战案例讲解SpringBoot整合SSE技术,涵盖实现原理、代码配置、异常处理及前端交互,... 目录Spring Boot 整合 SSE(Server-Sent Events)1、简述SSE与其他技术的对

Python多线程应用中的卡死问题优化方案指南

《Python多线程应用中的卡死问题优化方案指南》在利用Python语言开发某查询软件时,遇到了点击搜索按钮后软件卡死的问题,本文将简单分析一下出现的原因以及对应的优化方案,希望对大家有所帮助... 目录问题描述优化方案1. 网络请求优化2. 多线程架构优化3. 全局异常处理4. 配置管理优化优化效果1.

MySQL 临时表与复制表操作全流程案例

《MySQL临时表与复制表操作全流程案例》本文介绍MySQL临时表与复制表的区别与使用,涵盖生命周期、存储机制、操作限制、创建方法及常见问题,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随小... 目录一、mysql 临时表(一)核心特性拓展(二)操作全流程案例1. 复杂查询中的临时表应用2. 临时