Rocketmq源码分析(1)

2024-09-07 02:44
文章标签 分析 源码 rocketmq

本文主要是介绍Rocketmq源码分析(1),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

此次源码分析-rocketmq-spring-boot-starter,starter众所周知入口点就是AutoConfiguration.RocketMQAutoConfiguration.class
// 标识为配置类
@Configuration
//将RocketMQProperties识别为配置属性类,创建对象并注入到spring容器中
@EnableConfigurationProperties(RocketMQProperties.class)
// 当类路径中有MQAdmin.class 才启用本配置
@ConditionalOnClass({MQAdmin.class})
//条件启用配置,如果配置rocketmq.name-server不存在,默认会加载此配置,如果没有配置havingValue就意味着rocketmq.name-server 不是 false 就会加载此配置
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
// 把MessageConverterConfiguration、ListenerContainerConfiguration、ExtProducerResetConfiguration、RocketMQTransactionConfiguration 当做配置类加载进来
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
// 指定该配置类会在MessageConverterConfiguration配置类的后面加载
@AutoConfigureAfter({MessageConverterConfiguration.class})
// 指定该配置类会在RocketMQTransactionConfiguration配置类的前面加载
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})public class RocketMQAutoConfiguration {private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);public static final String ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME ="rocketMQTemplate";@Autowiredprivate Environment environment;@PostConstructpublic void checkProperties() {String nameServer = environment.getProperty("rocketmq.name-server", String.class);log.debug("rocketmq.nameServer = {}", nameServer);if (nameServer == null) {log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!");}}@Bean@ConditionalOnMissingBean(DefaultMQProducer.class)@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();String nameServer = rocketMQProperties.getNameServer();String groupName = producerConfig.getGroup();Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");String accessChannel = rocketMQProperties.getAccessChannel();String ak = rocketMQProperties.getProducer().getAccessKey();String sk = rocketMQProperties.getProducer().getSecretKey();boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);producer.setNamesrvAddr(nameServer);if (!StringUtils.isEmpty(accessChannel)) {producer.setAccessChannel(AccessChannel.valueOf(accessChannel));}producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());producer.setMaxMessageSize(producerConfig.getMaxMessageSize());producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());return producer;}@Bean(destroyMethod = "destroy")@ConditionalOnBean(DefaultMQProducer.class)@ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME)public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,RocketMQMessageConverter rocketMQMessageConverter) {RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();rocketMQTemplate.setProducer(mqProducer);rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());return rocketMQTemplate;}
}

重点 是import注解 引入的这几个配置类。 首先第一个RocketMQTransactionConfiguration配置类,它是来解析RocketMQMessageListener 注解修饰的业务逻辑类的。想要知道RocketMQMessageListener 具体实现方式,就要阅读此类的代码。

@Configuration
// 此处实现ApplicationContextAware是为了获取容器上下文, 实现SmartInitializingSingleton是为了在单例bean全部初始化后的回调中做一些处理
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {...public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,...}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = (ConfigurableApplicationContext) applicationContext;}/**** 此处是重点*/@Overridepublic void afterSingletonsInstantiated() {// 获取容器中所有RocketMQMessageListener注解修饰的bean,并且不是代理对象。Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));// 循环处理 beans.forEach(this::registerContainer);}private void registerContainer(String beanName, Object bean) {Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);...// 获取注解RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);// 解析占位符 获取最终的消费者组的名称String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());String topic = this.environment.resolvePlaceholders(annotation.topic());// 获取消费者监听器的状态 默认为trueboolean listenerEnabled =(boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP).getOrDefault(topic, true);...// bean的名字  加后缀区分String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;// 注册监听器genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);if (!container.isRunning()) {try {// 启动监听器container.start();} catch (Exception e) {log.error("Started container failed. {}", container, e);throw new RuntimeException(e);}}log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);}...}

最后进入的start

public synchronized void start() throws MQClientException {switch (this.serviceState) {//状态的处理}// 当订阅改变的时候 需要更新topic订阅信息this.updateTopicSubscribeInfoWhenSubscriptionChanged();// 检查客户端是否已经在 Broker 中注册this.mQClientFactory.checkClientInBroker();// 发送心跳包给所有的brokerthis.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 触发负载均衡消费者组this.mQClientFactory.rebalanceImmediately();}

总结上述源码,扫描所有mq的监听器注解,把注解修饰的类注册到容器中,并启动监听。再往深入的start()方法中看,就可以看到消费者、topic、tag相关信息的获取级处理过程,最后是启动。

这篇关于Rocketmq源码分析(1)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1143833

相关文章

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

Olingo分析和实践之OData框架核心组件初始化(关键步骤)

《Olingo分析和实践之OData框架核心组件初始化(关键步骤)》ODataSpringBootService通过初始化OData实例和服务元数据,构建框架核心能力与数据模型结构,实现序列化、URI... 目录概述第一步:OData实例创建1.1 OData.newInstance() 详细分析1.1.1

Olingo分析和实践之ODataImpl详细分析(重要方法详解)

《Olingo分析和实践之ODataImpl详细分析(重要方法详解)》ODataImpl.java是ApacheOlingoOData框架的核心工厂类,负责创建序列化器、反序列化器和处理器等组件,... 目录概述主要职责类结构与继承关系核心功能分析1. 序列化器管理2. 反序列化器管理3. 处理器管理重要方

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

解决1093 - You can‘t specify target table报错问题及原因分析

《解决1093-Youcan‘tspecifytargettable报错问题及原因分析》MySQL1093错误因UPDATE/DELETE语句的FROM子句直接引用目标表或嵌套子查询导致,... 目录报js错原因分析具体原因解决办法方法一:使用临时表方法二:使用JOIN方法三:使用EXISTS示例总结报错原

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,