RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程

本文主要是介绍RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • NameServer 路由注册机制
      • 生产者的发送消息流程

NameServer 路由注册机制

在 Broker 启动时,通过 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); 向 NameServer 中注册自己

那么 NameServer 中,注册 Broker 信息的入口在: DefaultRequestProcessor # processRequest

  • 判断请求码,如果是 Broker 注册,则进行注册 Broker 信息

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {// ... 省略// 如果是 Broker 注册case RequestCode.REGISTER_BROKER:return this.registerBroker(ctx, request);// ... 省略}
    }
    

  • this.registerBroker 真正开始注册 Broker 信息

    在注册信息之前,会先使用 crc32 来检验消息的正确性(安全检查)

    在这里插入图片描述

    之后会调用 this.namesrvController.getRouteInfoManager().registerBroker() 来注册 Broker 的信息,这个 Broker 的信息是 BrokerController 启动时通过 Netty 发送过来的

    通过 getRouteInfoManager 获取 RouteInfoManager,在该类中注册 Broker 信息,那么 RouteInfoManager 肯定是管理了 Broker 的信息

    可以点进去 RouteInfoManager,可以发现其中管理了很多路由的信息

    在这里插入图片描述

    其中 brokerLiveTable 存储的是存活的 Broker 列表,那么可以查看该变量的引用链,来判断 Nameserver 在哪里进行心跳扫描

    在这里插入图片描述

    可以看到在 scanNotActiveBroker 方法中,会将 brokerLiveTable 中不活跃的 Broker 给剔除掉

    在这里插入图片描述

生产者的发送消息流程

下面会将整体的一个发送消息的流程图片先展示出来,再通过代码进行一步一步梳理:

在这里插入图片描述

既然要看生产者的发送消息流程,就先通过方法的调用作为入口,一步一步探究流程:

在这里插入图片描述

那么通过这个 send 方法点进去,入口为:DefaultMQProducer # send(Message msg) 方法,从该方法点击进入,调用链如下:

在这里插入图片描述

如果你在看源码的话,可以从上边的调用链一步一步点击,最后发送消息的逻辑就在 this.sendDefaultImpl 方法中展开

  1. 首先,会先根据 Topic 获取对应的路由信息,表示该 Topic 需要向哪个 MessageQueue 中进行发送,这个路由信息会先从本地缓存中取,如果没有取到,会向 NameServer 发送请求来获取 Topic 的路由信息
  2. 设置消息发送失败的 重试次数 ,同步情况下重试次数为预设次数 +1,异步情况下默认重试次数为 1
  3. 接下来就根据 重试次数 循环发送消息,为 Topic 选择要发送的队列 MessageQueue 进行消息发送

选择队列之后,就进入到发送消息的核心逻辑:this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

  1. 在该方法中,先通过队列 MessageQueue 找到对应的 brokerAddr
  2. 之后,会尝试对消息进行压缩
  3. 判断是否存在一些需要对消息进行 禁止发送前置拦截 的钩子函数,进行一些消息的拦截处理
  4. 判断通信模式:ASYNC、ONEWAY、SYNC,将消息以对应的方式发送出去,这里以同步 SYNC 为例

如果是同步的话,会通过 this.mQClientFactory.getMQClientAPIImpl().sendMessage() 方法将消息发送出去,接下来又是层层的调用,最后真正通过 Netty 将消息发送出去的地方在 NettyRemotingClient # invokeSync() 的方法中

在这个方法中,还会对消息进行前置拦截和后置拦截,为开发者的使用提供了很多的扩展点,在这里就 真正通过 Netty 将消息发送出去了

在这里插入图片描述

这篇关于RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/511876

相关文章

Django中的函数视图和类视图以及路由的定义方式

《Django中的函数视图和类视图以及路由的定义方式》Django视图分函数视图和类视图,前者用函数处理请求,后者继承View类定义方法,路由使用path()、re_path()或url(),通过in... 目录函数视图类视图路由总路由函数视图的路由类视图定义路由总结Django允许接收的请求方法http

python使用Akshare与Streamlit实现股票估值分析教程(图文代码)

《python使用Akshare与Streamlit实现股票估值分析教程(图文代码)》入职测试中的一道题,要求:从Akshare下载某一个股票近十年的财务报表包括,资产负债表,利润表,现金流量表,保存... 目录一、前言二、核心知识点梳理1、Akshare数据获取2、Pandas数据处理3、Matplotl

Django开发时如何避免频繁发送短信验证码(python图文代码)

《Django开发时如何避免频繁发送短信验证码(python图文代码)》Django开发时,为防止频繁发送验证码,后端需用Redis限制请求频率,结合管道技术提升效率,通过生产者消费者模式解耦业务逻辑... 目录避免频繁发送 验证码1. www.chinasem.cn避免频繁发送 验证码逻辑分析2. 避免频繁

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

Redis客户端连接机制的实现方案

《Redis客户端连接机制的实现方案》本文主要介绍了Redis客户端连接机制的实现方案,包括事件驱动模型、非阻塞I/O处理、连接池应用及配置优化,具有一定的参考价值,感兴趣的可以了解一下... 目录1. Redis连接模型概述2. 连接建立过程详解2.1 连php接初始化流程2.2 关键配置参数3. 最大连

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

python运用requests模拟浏览器发送请求过程

《python运用requests模拟浏览器发送请求过程》模拟浏览器请求可选用requests处理静态内容,selenium应对动态页面,playwright支持高级自动化,设置代理和超时参数,根据需... 目录使用requests库模拟浏览器请求使用selenium自动化浏览器操作使用playwright

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499