RocketMQ源码分析----Producer启动过程

2024-08-30 09:58

本文主要是介绍RocketMQ源码分析----Producer启动过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

总体流程

首先从demo为入口分析整个启动过程

	public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {String message = "send message";final Message msg = new Message("topic", "tag",// tagmessage.getBytes());// bodySendResult sendResult = producer.send(msg);producer.shutdown();} catch (Exception e) {e.printStackTrace();}}

DefaultMQProducer构造方法如下:

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {this.producerGroup = producerGroup;defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);}public DefaultMQProducer(final String producerGroup) {this(producerGroup, null);}

然后调用start方法后,即开始初始化流程,其间接调用了DefaultMQProducerImpl的start方法,即上面构造方法创建的DefaultMQProducerImpl,代码如下:

    public void start() throws MQClientException {this.start(true);}public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查producerGroupthis.checkConfig();// 如果producerGroup为该值,则改变instanceName为pidif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 获取或者创建客户端实例this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 实例注册,实际是放到一个map中,producerGroupboolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 是否要启动该实例mQClientFactory.start();}this.serviceState = ServiceState.RUNNING;break;case 其他状态不处理://....}// 心跳发送this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}

MQClientInstance创建和启动

MQClientInstance主要是客户端的一些操作的集合(不知道该怎么总结好…),通过MQClientManager的getAndCreateMQClientInstance方法创建,这里使用了单例模式去创建MQClientManager

   public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) {return getAndCreateMQClientInstance(clientConfig, null);}public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {//ip@instanceName@unitNameString clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {// 创建MQClientInstance并放入map中instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;} else {}}return instance;}

构造方法如下:

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {this.clientConfig = clientConfig;this.instanceIndex = instanceIndex;// 通信服务的一些配置this.nettyClientConfig = new NettyClientConfig();//设置线程数this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());//网络请求的处理器this.clientRemotingProcessor = new ClientRemotingProcessor(this);// 客户端一些基础APIthis.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);if (this.clientConfig.getNamesrvAddr() != null) {this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());}this.clientId = clientId;this.mQAdminImpl = new MQAdminImpl(this);// 拉取消息服务this.pullMessageService = new PullMessageService(this);// rebalance服务this.rebalanceService = new RebalanceService(this);//创建一个group为CLIENT_INNER_PRODUCER_GROUP的producer// 该producer主要用于将消息发回Brokerthis.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);// 将用户自建的producer属性复制过来lithis.defaultMQProducer.resetClientConfig(clientConfig);// Consumer一些消费情况(TPS,RT等)的统计管理this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);}

可以看到MQClientInstance里包含了客户端(Producer、Consumer)的一些通用操作以及一些属性的保存,其中一些具体是功能后续文章分析

上面有点需要注意:

  • 在判断producerGroup为MixAll.CLIENT_INNER_PRODUCER_GROUP时会改变InstanceName为pid,为什么需要这样做呢?

个人理解如下:
由于该Producer是系统内置的producer,用户系统行为的消息发送,如果一个JVM内用户启动了多个Producer,那么内置的producer是可以用一个的,所以这里将instanceName改成pid就好了,那么在getAndCreateMQClientInstance中获取的clientId(ip@instanceName@unitName)就会一样,就能取出之前创建的MQClientInstance对象

启动方法如下:

    public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());}//netty相关通信的初始化,然后开启一个定时任务扫描responseTablethis.mQClientAPIImpl.start();// 启动定时任务this.startScheduledTask();// 启动消息拉取服务this.pullMessageService.start();// 启动Rebalance拉取服务this.rebalanceService.start();// 启动内置producerthis.defaultMQProducer.getDefaultMQProducerImpl().start(false);this.serviceState = ServiceState.RUNNING;break;case//其他状态不处理;}}}

启动定时任务

MQClientInstance启动的时候会把一些客户端相关的服务都启动,另外启动了很多重要的定时任务:

  1. 拉取NameServer地址
  2. 更新Topic路由信息
  3. 清除下线的Broker
  4. 发送心跳给所有Broker
  5. 持久化Consumer的消费offset
  6. 调整线程池

整个启动的大体过程就是如此,细节很多,不在这里展开,会在另外文章具体分析

这篇关于RocketMQ源码分析----Producer启动过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120498

相关文章

解决hive启动时java.net.ConnectException:拒绝连接的问题

《解决hive启动时java.net.ConnectException:拒绝连接的问题》Hadoop集群连接被拒,需检查集群是否启动、关闭防火墙/SELinux、确认安全模式退出,若问题仍存,查看日志... 目录错误发生原因解决方式1.关闭防火墙2.关闭selinux3.启动集群4.检查集群是否正常启动5.

Django HTTPResponse响应体中返回openpyxl生成的文件过程

《DjangoHTTPResponse响应体中返回openpyxl生成的文件过程》Django返回文件流时需通过Content-Disposition头指定编码后的文件名,使用openpyxl的sa... 目录Django返回文件流时使用指定文件名Django HTTPResponse响应体中返回openp

python使用Akshare与Streamlit实现股票估值分析教程(图文代码)

《python使用Akshare与Streamlit实现股票估值分析教程(图文代码)》入职测试中的一道题,要求:从Akshare下载某一个股票近十年的财务报表包括,资产负债表,利润表,现金流量表,保存... 目录一、前言二、核心知识点梳理1、Akshare数据获取2、Pandas数据处理3、Matplotl

Linux线程同步/互斥过程详解

《Linux线程同步/互斥过程详解》文章讲解多线程并发访问导致竞态条件,需通过互斥锁、原子操作和条件变量实现线程安全与同步,分析死锁条件及避免方法,并介绍RAII封装技术提升资源管理效率... 目录01. 资源共享问题1.1 多线程并发访问1.2 临界区与临界资源1.3 锁的引入02. 多线程案例2.1 为

批量导入txt数据到的redis过程

《批量导入txt数据到的redis过程》用户通过将Redis命令逐行写入txt文件,利用管道模式运行客户端,成功执行批量删除以Product*匹配的Key操作,提高了数据清理效率... 目录批量导入txt数据到Redisjs把redis命令按一条 一行写到txt中管道命令运行redis客户端成功了批量删除k

分布式锁在Spring Boot应用中的实现过程

《分布式锁在SpringBoot应用中的实现过程》文章介绍在SpringBoot中通过自定义Lock注解、LockAspect切面和RedisLockUtils工具类实现分布式锁,确保多实例并发操作... 目录Lock注解LockASPect切面RedisLockUtils工具类总结在现代微服务架构中,分布

Win10安装Maven与环境变量配置过程

《Win10安装Maven与环境变量配置过程》本文介绍Maven的安装与配置方法,涵盖下载、环境变量设置、本地仓库及镜像配置,指导如何在IDEA中正确配置Maven,适用于Java及其他语言项目的构建... 目录Maven 是什么?一、下载二、安装三、配置环境四、验证测试五、配置本地仓库六、配置国内镜像地址

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比