rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费

本文主要是介绍rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  1. 业务描述

    由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个broker组成的集群): 
    图1

  2. producer如何实现

    producer只需发送消息时调用如下方法即可

    /*** 发送有序消息** @param messageMap 消息数据* @param selector   队列选择器,发送时会回调* @param order      回调队列选择器时,此参数会传入队列选择方法,提供配需规则* @return 发送结果*/
    public Result<SendResult> send(Message msg, MessageQueueSelector selector, Object arg)
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    关键是如何实现MessageQueueSelector:

    class IDHashMessageQueueSelector implements MessageQueueSelector{public MessageQueue select(List<MessageQueue> mqs, Message msg,Object arg) {int id = Integer.parseInt(arg.toString());int size = mqs.size();int index = id%size;return mqs.get(index);}
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这样,所有的消息会根据消息的尾号,轮询的落到相应的queue上。参考图2,假设id=10001231,由于一共有20个queue,所以10001231%20=11,故消息会落到broker-b queue-1上。 
    图2

  3. consumer端如何实现

    针对consumer由于没有限制是顺序消费,故可以采用集群消费模式的DefaultMQPushConsumer,由于一个消费者消费一类queue,故需要10个consumer group,比如consumer group0需要消费的queue为broker-a queue-0和broker-b queue-0,如下图的概示: 
    这里写图片描述
    那么需要自己实现一个AllocateMessageQueueStrategy进行queue的分配,我们假设consumer group的名字格式需要提前定好,如xxx{queueid}ConsumerGroup,那么实现如下:

    public class AllocateMessageQueueByHashAveragely extends AllocateMessageQueueAveragely{
    private final Logger log = ClientLogger.getLog();
    @Override
    public String getName() {return super.getName()+"ByIDHash";
    }@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID,List<MessageQueue> mqAll, List<String> cidAll) {//解析queue idchar idChar = consumerGroup.charAt(consumerGroup.length() - "ConsumerGroup".length() - 1);int id = Integer.parseInt(idChar+"");List<MessageQueue> submq = new ArrayList<MessageQueue>();//根据queue id分配相应的MessageQueuefor(MessageQueue mq : mqAll) {if(mq.getQueueId() == idChar || mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {submq.add(mq);}}if(submq.size() == 0) {log.warn("allocate err:"+consumerGroup+","+currentCID+","+cidAll+","+mqAll);}return super.allocate(consumerGroup, currentCID, submq, cidAll);
    }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    借助AllocateMessageQueueAveragely来实现,以便有多个jvm的消费者时,能够进行集群消费,但是针对上面这个例子,消费者jvm实例不能超过2个,至于为什么,参照下图: 
    这里写图片描述

这篇关于rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1089600

相关文章

解决tomcat启动时报Junit相关错误java.lang.ClassNotFoundException: org.junit.Test问题

《解决tomcat启动时报Junit相关错误java.lang.ClassNotFoundException:org.junit.Test问题》:本文主要介绍解决tomcat启动时报Junit相... 目录tomcat启动时报Junit相关错误Java.lang.ClassNotFoundException

解决Maven项目报错:failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.13.0的问题

《解决Maven项目报错:failedtoexecutegoalorg.apache.maven.plugins:maven-compiler-plugin:3.13.0的问题》这篇文章主要介... 目录Maven项目报错:failed to execute goal org.apache.maven.pl

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

MySQL主从同步延迟问题的全面解决方案

《MySQL主从同步延迟问题的全面解决方案》MySQL主从同步延迟是分布式数据库系统中的常见问题,会导致从库读取到过期数据,影响业务一致性,下面我将深入分析延迟原因并提供多层次的解决方案,需要的朋友可... 目录一、同步延迟原因深度分析1.1 主从复制原理回顾1.2 延迟产生的关键环节二、实时监控与诊断方案

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

SQLyog中DELIMITER执行存储过程时出现前置缩进问题的解决方法

《SQLyog中DELIMITER执行存储过程时出现前置缩进问题的解决方法》在SQLyog中执行存储过程时出现的前置缩进问题,实际上反映了SQLyog对SQL语句解析的一个特殊行为,本文给大家介绍了详... 目录问题根源正确写法示例永久解决方案为什么命令行不受影响?最佳实践建议问题根源SQLyog的语句分

Pandas中统计汇总可视化函数plot()的使用

《Pandas中统计汇总可视化函数plot()的使用》Pandas提供了许多强大的数据处理和分析功能,其中plot()函数就是其可视化功能的一个重要组成部分,本文主要介绍了Pandas中统计汇总可视化... 目录一、plot()函数简介二、plot()函数的基本用法三、plot()函数的参数详解四、使用pl

解决IDEA报错:编码GBK的不可映射字符问题

《解决IDEA报错:编码GBK的不可映射字符问题》:本文主要介绍解决IDEA报错:编码GBK的不可映射字符问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录IDEA报错:编码GBK的不可映射字符终端软件问题描述原因分析解决方案方法1:将命令改为方法2:右下jav

MyBatis模糊查询报错:ParserException: not supported.pos 问题解决

《MyBatis模糊查询报错:ParserException:notsupported.pos问题解决》本文主要介绍了MyBatis模糊查询报错:ParserException:notsuppo... 目录问题描述问题根源错误SQL解析逻辑深层原因分析三种解决方案方案一:使用CONCAT函数(推荐)方案二:

Redis 热 key 和大 key 问题小结

《Redis热key和大key问题小结》:本文主要介绍Redis热key和大key问题小结,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、什么是 Redis 热 key?热 key(Hot Key)定义: 热 key 常见表现:热 key 的风险:二、