第十八章-消息重推-客户端发起

2024-05-16 01:28

本文主要是介绍第十八章-消息重推-客户端发起,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

消息重推主要是针对消费端消费失败,需要下次再消费的情况,RocketMq中提供了多个重推的入口,这里不一一介绍,只延续章节16.5中的重推逻辑继续讲。

ConsumeMessageConcurrentlyService.sendMessageBack

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {/*** 消息消费的重试策略* -1,不重试,直接放到DLQ(死信队列)* 0,由broker控制重试频率,默认是这个* >0,由客户自己控制重试频率*/int delayLevel = context.getDelayLevelWhenNextConsume();// 在消费重推前,将 namespace包裹topicmsg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));try {// 进一步调用this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;
}

DefaultMQPushConsumerImpl.sendMessageBack

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {try {// 根据brokerName查找对应的broker 地址String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName): RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());// 进一步调用this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());} catch (Exception e) {log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);// 调用失败,那就重新生成一条新的消息,走正常发送消息的口径 send 方法来发送消息Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());String originMsgId = MessageAccessor.getOriginMessageId(msg);MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);newMsg.setFlag(msg.getFlag());MessageAccessor.setProperties(newMsg, msg.getProperties());MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());// 正常消息发送口径this.mQClientFactory.getDefaultMQProducer().send(newMsg);} finally {// 将topic卸掉namespace的包装msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));}
}

MQClientAPIImpl.consumerSendMessageBack

public void consumerSendMessageBack(final String addr, // broker 地址final MessageExt msg, // 要重推的消息对象final String consumerGroup, // 消费者组名final int delayLevel,  // 消息重试策略final long timeoutMillis, // 超时时间,默认5000msfinal int maxConsumeRetryTimes // 最大重试次数,默认16次
) throws RemotingException, MQBrokerException, InterruptedException {// 组装发送到Broker的请求ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();// 消息重推的请求码是 RequestCode.CONSUMER_SEND_MSG_BACK,Broker端也根据这个码来处理的RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);// 设置消费者组名requestHeader.setGroup(consumerGroup);// 保留原始topic,因为消息重推,要修改这个topicrequestHeader.setOriginTopic(msg.getTopic());// 该消息的物理偏移,也就是在commitlog文件中的偏移requestHeader.setOffset(msg.getCommitLogOffset());// 消息重试策略requestHeader.setDelayLevel(delayLevel);// 保留消息的原始msgIdrequestHeader.setOriginMsgId(msg.getMsgId());// 设置最大重试次数requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);// 通过vip通道发送重试消息,vip通道就是原broker的端口-2,进一步调用RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;// 调用发送成功,直接返回上一层switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}

NettyRemotingClient.invokeSync

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {// 记录开始时间long beginStartTime = System.currentTimeMillis();// netty channelfinal Channel channel = this.getAndCreateChannel(addr);// 通道是激活状态if (channel != null && channel.isActive()) {try {// 执行前钩子方法,可以自定义doBeforeRpcHooks(addr, request);// 计算所花费时间,是否超过超时时间long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {// 超时,则抛出超时异常throw new RemotingTimeoutException("invokeSync call timeout");}// 再进一步调用RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);// 执行后钩子方法,可以自定义doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);// 返回调用结果return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {// channel 非激活状态,则关闭,并抛出连接异常this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}
}

NettyRemotingClient.invokeSyncImpl

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque(); // 唯一标识此次请求的idtry {// 将响应处理封装成Future,并存放到 responseTable map 中,等待消息拉取成功后再处理final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);final SocketAddress addr = channel.remoteAddress();// 客户端channel(这里就是消费端),发送请求到服务端broker,并注册拉取后的监听channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Override// operationComplete 方法由谁调用呢,这个问题在`章节16.4`中讲过,可以看 ResponseFuture.executeInvokeCallback 方法,由channel收到拉取的消息后,再调用public void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {// 拉取成功,设置发送状态成功标志responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}// 请求失败处理responseTable.remove(opaque);responseFuture.setCause(f.cause()); //失败原因responseFuture.putResponse(null); // 返回为nulllog.warn("send a request command to channel <" + addr + "> failed.");}});// 等待发送完成RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);// null ,表示超时了if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {// 表示发送失败了,抛出失败原因throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}// 返回结果return responseCommand;} finally {this.responseTable.remove(opaque);}
}

至此,客户端的操作完成了,重点工作在Broker端的处理,将在下一章介绍。

这篇关于第十八章-消息重推-客户端发起的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/993478

相关文章

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

C#使用MQTTnet实现服务端与客户端的通讯的示例

《C#使用MQTTnet实现服务端与客户端的通讯的示例》本文主要介绍了C#使用MQTTnet实现服务端与客户端的通讯的示例,包括协议特性、连接管理、QoS机制和安全策略,具有一定的参考价值,感兴趣的可... 目录一、MQTT 协议简介二、MQTT 协议核心特性三、MQTTNET 库的核心功能四、服务端(BR

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

SpringBoot快速搭建TCP服务端和客户端全过程

《SpringBoot快速搭建TCP服务端和客户端全过程》:本文主要介绍SpringBoot快速搭建TCP服务端和客户端全过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录TCPServerTCPClient总结由于工作需要,研究了SpringBoot搭建TCP通信的过程

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

Redis客户端工具之RedisInsight的下载方式

《Redis客户端工具之RedisInsight的下载方式》RedisInsight是Redis官方提供的图形化客户端工具,下载步骤包括访问Redis官网、选择RedisInsight、下载链接、注册... 目录Redis客户端工具RedisInsight的下载一、点击进入Redis官网二、点击RedisI