本文主要是介绍rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
-
业务描述
由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个broker组成的集群):

-
producer如何实现
producer只需发送消息时调用如下方法即可
/*** 发送有序消息** @param messageMap 消息数据* @param selector 队列选择器,发送时会回调* @param order 回调队列选择器时,此参数会传入队列选择方法,提供配需规则* @return 发送结果*/
public Result<SendResult> send(Message msg, MessageQueueSelector selector, Object arg)
关键是如何实现MessageQueueSelector:
class IDHashMessageQueueSelector implements MessageQueueSelector{public MessageQueue select(List<MessageQueue> mqs, Message msg,Object arg) {int id = Integer.parseInt(arg.toString());int size = mqs.size();int index = id%size;return mqs.get(index);}
}
这样,所有的消息会根据消息的尾号,轮询的落到相应的queue上。参考图2,假设id=10001231,由于一共有20个queue,所以10001231%20=11,故消息会落到broker-b queue-1上。

-
consumer端如何实现
针对consumer由于没有限制是顺序消费,故可以采用集群消费模式的DefaultMQPushConsumer,由于一个消费者消费一类queue,故需要10个consumer group,比如consumer group0需要消费的queue为broker-a queue-0和broker-b queue-0,如下图的概示:

那么需要自己实现一个AllocateMessageQueueStrategy进行queue的分配,我们假设consumer group的名字格式需要提前定好,如xxx{queueid}ConsumerGroup,那么实现如下:
public class AllocateMessageQueueByHashAveragely extends AllocateMessageQueueAveragely{
private final Logger log = ClientLogger.getLog();
@Override
public String getName() {return super.getName()+"ByIDHash";
}@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID,List<MessageQueue> mqAll, List<String> cidAll) {char idChar = consumerGroup.charAt(consumerGroup.length() - "ConsumerGroup".length() - 1);int id = Integer.parseInt(idChar+"");List<MessageQueue> submq = new ArrayList<MessageQueue>();for(MessageQueue mq : mqAll) {if(mq.getQueueId() == idChar || mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {submq.add(mq);}}if(submq.size() == 0) {log.warn("allocate err:"+consumerGroup+","+currentCID+","+cidAll+","+mqAll);}return super.allocate(consumerGroup, currentCID, submq, cidAll);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
借助AllocateMessageQueueAveragely来实现,以便有多个jvm的消费者时,能够进行集群消费,但是针对上面这个例子,消费者jvm实例不能超过2个,至于为什么,参照下图:

这篇关于rocketmq问题汇总-如何将特定消息发送至特定queue,消费者从特定queue消费的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!