全网最细RocketMQ源码一:NameSrv

2024-01-11 21:36

本文主要是介绍全网最细RocketMQ源码一:NameSrv,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、入口

在这里插入图片描述
NameServer的启动源码在NameStartup,现在开始debug之旅

二、createNamesrcController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());// 启动时的参数信息 有commandLine 管理了。commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return null;}// namesrv 配置。final NamesrvConfig namesrvConfig = new NamesrvConfig();// netty 服务器配置。final NettyServerConfig nettyServerConfig = new NettyServerConfig();// namesrv服务器 监听端口 修改为9876nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {// 读取 -c 选项的值String file = commandLine.getOptionValue('c');if (file != null) {// 读取 config 文件数据 到 properties 内InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);// 如果 config 配置文件 内的配置 涉及到 namesrvConfig 或者 nettyServerConfig 的字段,那么进行复写。MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);// 将读取的 配置文件 路径 保存 到 字段。namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}// 将启动时 命令行 设置的kv 复写到 namesrvConfig内。MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 创建日志对象。LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);// 创建 控制器// 参数1:namesrvConfig// 参数2:网络层配置 nettyServerConfigfinal NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;}

主要做了几件事:

  1. 创建了NamesrvConfig
  2. 创建了nettyServerConfig
  3. 创建了NamesrvController

NamesrvController详解:

public class NamesrvController {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);private final NamesrvConfig namesrvConfig;private final NettyServerConfig nettyServerConfig;// 调度线程池,执行定时任务,两件事:1. 检查存活的broker状态  2. 打印配置private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));// 管理kv配置。private final KVConfigManager kvConfigManager;// 管理路由信息的对象,重要。private final RouteInfoManager routeInfoManager;// 网络层封装对象,重要。private RemotingServer remotingServer;// ChannelEventListener ,用于监听channel 状态,当channel状态 发生改变时 close idle... 会向 事件队列发起事件,事件最终由 该service处理。private BrokerHousekeepingService brokerHousekeepingService;// 业务线程池,netty 线程 主要任务是 解析报文 将 报文 解析成 RemotingCommand 对象,然后 就将 该对象 交给 业务 线程池 再继续处理。private ExecutorService remotingExecutor;private Configuration configuration;private FileWatchService fileWatchService;// 参数1:namesrvConfig// 参数2:网络层配置 nettyServerConfigpublic NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {this.namesrvConfig = namesrvConfig;this.nettyServerConfig = nettyServerConfig;this.kvConfigManager = new KVConfigManager(this);this.routeInfoManager = new RouteInfoManager();this.brokerHousekeepingService = new BrokerHousekeepingService(this);this.configuration = new Configuration(log,this.namesrvConfig, this.nettyServerConfig);this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");}}

start(NamesrvController)

  public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化方法..boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// JVM HOOK ,平滑关闭的逻辑。 当JVM 被关闭时,主动调用 controller.shutdown() 方法,让服务器平滑关机。Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));// 启动服务器。controller.start();return controller;}

主要做了几件事:

  1. controller初始化
   public boolean initialize() {// 加载本地kv配置this.kvConfigManager.load();// 创建网络服务器对象this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);// 创建业务线程池,默认线程数 8this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));// 注册协议处理器(缺省协议处理器)this.registerProcessor();// 定时任务1:每10秒钟检查 broker 存活状态,将idle状态的 broker 移除。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);// 定时任务2:每10分钟 打印一遍 kv 配置。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);}
  1. controller启动
    在这里插入图片描述
    会调用到NettyRemotingServer.start方法
 public void start() {// 当向channel pipeline 添加 handler 时 指定了 group 时,网络事件传播到 当前handler时,事件处理 由 分配给 handler 的线程执行。this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 创建共享的 处理器 handlerprepareSharableHandlers();ServerBootstrap childHandler =// 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端 ServerSocketChannel类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池,使用的内存池是  PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {// 服务器 绑定端口。ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();// 将服务器成功绑定的端口号 赋值给 字段 port。this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// housekeepingService 不为空,则创建 网络异常事件 处理器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 提交定时任务,每一秒 执行一次。// 扫描 responseTable 表,将过期的 responseFuture 移除。this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}


这篇关于全网最细RocketMQ源码一:NameSrv的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/595760

相关文章

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

Spring 中 BeanFactoryPostProcessor 的作用和示例源码分析

《Spring中BeanFactoryPostProcessor的作用和示例源码分析》Spring的BeanFactoryPostProcessor是容器初始化的扩展接口,允许在Bean实例化前... 目录一、概览1. 核心定位2. 核心功能详解3. 关键特性二、Spring 内置的 BeanFactory

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

Java对象和JSON字符串之间的转换方法(全网最清晰)

《Java对象和JSON字符串之间的转换方法(全网最清晰)》:本文主要介绍如何在Java中使用Jackson库将对象转换为JSON字符串,并提供了一个简单的工具类示例,该工具类支持基本的转换功能,... 目录前言1. 引入 Jackson 依赖2. 创建 jsON 工具类3. 使用示例转换 Java 对象为

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听