RocketMQ源码分析----Producer启动过程

2024-08-30 09:58

本文主要是介绍RocketMQ源码分析----Producer启动过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

总体流程

首先从demo为入口分析整个启动过程

	public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {String message = "send message";final Message msg = new Message("topic", "tag",// tagmessage.getBytes());// bodySendResult sendResult = producer.send(msg);producer.shutdown();} catch (Exception e) {e.printStackTrace();}}

DefaultMQProducer构造方法如下:

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {this.producerGroup = producerGroup;defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);}public DefaultMQProducer(final String producerGroup) {this(producerGroup, null);}

然后调用start方法后,即开始初始化流程,其间接调用了DefaultMQProducerImpl的start方法,即上面构造方法创建的DefaultMQProducerImpl,代码如下:

    public void start() throws MQClientException {this.start(true);}public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查producerGroupthis.checkConfig();// 如果producerGroup为该值,则改变instanceName为pidif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 获取或者创建客户端实例this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 实例注册,实际是放到一个map中,producerGroupboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 是否要启动该实例mQClientFactory.start();}this.serviceState = ServiceState.RUNNING;break;case 其他状态不处理://....}// 心跳发送this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}

MQClientInstance创建和启动

MQClientInstance主要是客户端的一些操作的集合(不知道该怎么总结好…),通过MQClientManager的getAndCreateMQClientInstance方法创建,这里使用了单例模式去创建MQClientManager

   public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {return getAndCreateMQClientInstance(clientConfig, null);}public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {//ip@instanceName@unitNameString clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {// 创建MQClientInstance并放入map中instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;} else {}}return instance;}

构造方法如下:

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.clientConfig = clientConfig;this.instanceIndex = instanceIndex;// 通信服务的一些配置this.nettyClientConfig = new NettyClientConfig();//设置线程数this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());//网络请求的处理器this.clientRemotingProcessor = new ClientRemotingProcessor(this);// 客户端一些基础APIthis.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);if (this.clientConfig.getNamesrvAddr() != null) {this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());}this.clientId = clientId;this.mQAdminImpl = new MQAdminImpl(this);// 拉取消息服务this.pullMessageService = new PullMessageService(this);// rebalance服务this.rebalanceService = new RebalanceService(this);//创建一个group为CLIENT_INNER_PRODUCER_GROUP的producer// 该producer主要用于将消息发回Brokerthis.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);// 将用户自建的producer属性复制过来lithis.defaultMQProducer.resetClientConfig(clientConfig);// Consumer一些消费情况(TPS,RT等)的统计管理this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);}

可以看到MQClientInstance里包含了客户端(Producer、Consumer)的一些通用操作以及一些属性的保存,其中一些具体是功能后续文章分析

上面有点需要注意:

  • 在判断producerGroup为MixAll.CLIENT_INNER_PRODUCER_GROUP时会改变InstanceName为pid,为什么需要这样做呢?

个人理解如下:
由于该Producer是系统内置的producer,用户系统行为的消息发送,如果一个JVM内用户启动了多个Producer,那么内置的producer是可以用一个的,所以这里将instanceName改成pid就好了,那么在getAndCreateMQClientInstance中获取的clientId(ip@instanceName@unitName)就会一样,就能取出之前创建的MQClientInstance对象

启动方法如下:

    public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}//netty相关通信的初始化,然后开启一个定时任务扫描responseTablethis.mQClientAPIImpl.start();// 启动定时任务this.startScheduledTask();// 启动消息拉取服务this.pullMessageService.start();// 启动Rebalance拉取服务this.rebalanceService.start();// 启动内置producerthis.defaultMQProducer.getDefaultMQProducerImpl().start(false);this.serviceState = ServiceState.RUNNING;break;case//其他状态不处理;}}}

启动定时任务

MQClientInstance启动的时候会把一些客户端相关的服务都启动,另外启动了很多重要的定时任务:

  1. 拉取NameServer地址
  2. 更新Topic路由信息
  3. 清除下线的Broker
  4. 发送心跳给所有Broker
  5. 持久化Consumer的消费offset
  6. 调整线程池

整个启动的大体过程就是如此,细节很多,不在这里展开,会在另外文章具体分析

这篇关于RocketMQ源码分析----Producer启动过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120498

相关文章

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Redis中Hash从使用过程到原理说明

《Redis中Hash从使用过程到原理说明》RedisHash结构用于存储字段-值对,适合对象数据,支持HSET、HGET等命令,采用ziplist或hashtable编码,通过渐进式rehash优化... 目录一、开篇:Hash就像超市的货架二、Hash的基本使用1. 常用命令示例2. Java操作示例三

Redis中Set结构使用过程与原理说明

《Redis中Set结构使用过程与原理说明》本文解析了RedisSet数据结构,涵盖其基本操作(如添加、查找)、集合运算(交并差)、底层实现(intset与hashtable自动切换机制)、典型应用场... 目录开篇:从购物车到Redis Set一、Redis Set的基本操作1.1 编程常用命令1.2 集

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

k8s中实现mysql主备过程详解

《k8s中实现mysql主备过程详解》文章讲解了在K8s中使用StatefulSet部署MySQL主备架构,包含NFS安装、storageClass配置、MySQL部署及同步检查步骤,确保主备数据一致... 目录一、k8s中实现mysql主备1.1 环境信息1.2 部署nfs-provisioner1.2.

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe