第七章-Broker-创建topic

2024-03-28 20:44
文章标签 创建 第七章 broker topic

本文主要是介绍第七章-Broker-创建topic,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概念就不讲了,直接上操作和源码,这里就用rocketmq自带的dashboard来创建topic,如下图:

在这里插入图片描述

clusterName:Broker的集群,可以选择多个

BROKER_NAME:Broker 名字,可以选择多个

topicName:topic 名字

writeQueueNums:写队列数

readQueueNums:读队列数

perm:设置topic的读写模式

最终创建topic的方法,由DefaultMQAdminExt.createAndUpdateTopicConfig实现,那么我们顺着这个调用链走下去

DefaultMQAdminExt.createAndUpdateTopicConfig

->DefaultMQAdminExtImpl.createAndUpdateTopicConfig

->MQClientAPIImpl.createTopic

MQClientAPIImpl.createTopic

/**
* 先讲下各个参数代表什么
* addr:选择的单个broker的地址,注意这里是单个,当选择多个broker时,会遍历一个一个的调用创建
* defaultTopic:默认topic => TBW102
* topicConfig:读写队列数、权限等信息
* timeoutMillis:超过毫秒,默认20000
*/
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,final long timeoutMillis)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {// 组装请求头CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();requestHeader.setTopic(topicConfig.getTopicName());requestHeader.setDefaultTopic(defaultTopic);requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());requestHeader.setPerm(topicConfig.getPerm());requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());requestHeader.setOrder(topicConfig.isOrder());// 主要就看这一行,将请求组装成远程命令对象,请求码为 RequestCode.UPDATE_AND_CREATE_TOPIC,在RocketMQ中,所有的网络请求最终都会转化成对应的请求码,所以我们找到处理该请求码的代码就行,翻看源码得知,这里就得转到broker模块,并由AdminBrokerProcessor.processRequest中处理,且该方法中对应 RequestCode.UPDATE_AND_CREATE_TOPIC 请求码的方法是 updateAndCreateTopic(ctx, request),好了,这下我们直接进入该方法就行。RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQClientException(response.getCode(), response.getRemark());
}

AdminBrokerProcessor.updateAndCreateTopic

private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 先创建响应命令对象final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 这里就是转换成当前能识别的,有兴趣的可以进去看看final CreateTopicRequestHeader requestHeader =(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));// 检查 topic 名是否与 broker 集群名字冲突,有冲突就要返回错误if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";log.warn(errorMsg);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorMsg);return response;}try {// 上面检查通过了,设置返回码为成功response.setCode(ResponseCode.SUCCESS);// 唯一标识当前请求的idresponse.setOpaque(request.getOpaque());response.markResponseType();response.setRemark(null);ctx.writeAndFlush(response);// 将响应信息通过netty写回客户端} catch (Exception e) {log.error("Failed to produce a proper response", e);}// 设置topicConfigTopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());topicConfig.setPerm(requestHeader.getPerm());topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());// 正式存放 topic的操作在这里,看 TopicConfigManager.updateTopicConfigthis.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
// 将 broker 的信息再注册到 namesrvthis.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());return null;
}

TopicConfigManager.updateTopicConfig

public void updateTopicConfig(final TopicConfig topicConfig) {// 先在缓存 map中设置 topic的配置TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);if (old != null) {log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);} else {log.info("create new topic [{}]", topicConfig);}// 记录操作版本,其实就是id自增this.dataVersion.nextVersion();// 将topic配置信息持久化到文件this.persist();
}

将topic配置信息持续化到文件

持久化操作在ConfigManager类中执行

public synchronized void persist() {// encode是一个模板方法,由子类实现,在这里指的是 TopicConfigManagerString jsonString = this.encode(true);if (jsonString != null) {// 拿到topic文件存放目录,默认情况下是:{user.home}/store/config/topics.jsonString fileName = this.configFilePath();try {// 将 json 字符串写入 topic 配置文件中,格式如`图7-1`MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error("persist file " + fileName + " exception", e);}}
}public abstract String encode(final boolean prettyFormat);

TopicConfigManager.encode

public String encode(final boolean prettyFormat) {// 这个方法挺简单,就是把刚才在 updateTopicConfig 方法中存放在本地缓存 map中的值设置到 topicConfigSerializeWrapper对象,同时,将 dataVersion 也设置进去,然后再将对象序列化成 json 字符串TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);topicConfigSerializeWrapper.setDataVersion(this.dataVersion);// 生成 json,格式如`图7-1`return topicConfigSerializeWrapper.toJson(prettyFormat);
}

图7-1

在这里插入图片描述

这篇关于第七章-Broker-创建topic的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/856654

相关文章

Linux线程之线程的创建、属性、回收、退出、取消方式

《Linux线程之线程的创建、属性、回收、退出、取消方式》文章总结了线程管理核心知识:线程号唯一、创建方式、属性设置(如分离状态与栈大小)、回收机制(join/detach)、退出方法(返回/pthr... 目录1. 线程号2. 线程的创建3. 线程属性4. 线程的回收5. 线程的退出6. 线程的取消7.

创建Java keystore文件的完整指南及详细步骤

《创建Javakeystore文件的完整指南及详细步骤》本文详解Java中keystore的创建与配置,涵盖私钥管理、自签名与CA证书生成、SSL/TLS应用,强调安全存储及验证机制,确保通信加密和... 目录1. 秘密键(私钥)的理解与管理私钥的定义与重要性私钥的管理策略私钥的生成与存储2. 证书的创建与

python如何创建等差数列

《python如何创建等差数列》:本文主要介绍python如何创建等差数列的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录python创建等差数列例题运行代码回车输出结果总结python创建等差数列import numpy as np x=int(in

怎么用idea创建一个SpringBoot项目

《怎么用idea创建一个SpringBoot项目》本文介绍了在IDEA中创建SpringBoot项目的步骤,包括环境准备(JDK1.8+、Maven3.2.5+)、使用SpringInitializr... 目录如何在idea中创建一个SpringBoot项目环境准备1.1打开IDEA,点击New新建一个项

如何使用Maven创建web目录结构

《如何使用Maven创建web目录结构》:本文主要介绍如何使用Maven创建web目录结构的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录创建web工程第一步第二步第三步第四步第五步第六步第七步总结创建web工程第一步js通过Maven骨架创pytho

MySQL 用户创建与授权最佳实践

《MySQL用户创建与授权最佳实践》在MySQL中,用户管理和权限控制是数据库安全的重要组成部分,下面详细介绍如何在MySQL中创建用户并授予适当的权限,感兴趣的朋友跟随小编一起看看吧... 目录mysql 用户创建与授权详解一、MySQL用户管理基础1. 用户账户组成2. 查看现有用户二、创建用户1. 基

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

Java中实现线程的创建和启动的方法

《Java中实现线程的创建和启动的方法》在Java中,实现线程的创建和启动是两个不同但紧密相关的概念,理解为什么要启动线程(调用start()方法)而非直接调用run()方法,是掌握多线程编程的关键,... 目录1. 线程的生命周期2. start() vs run() 的本质区别3. 为什么必须通过 st

Macos创建python虚拟环境的详细步骤教学

《Macos创建python虚拟环境的详细步骤教学》在macOS上创建Python虚拟环境主要通过Python内置的venv模块实现,也可使用第三方工具如virtualenv,下面小编来和大家简单聊聊... 目录一、使用 python 内置 venv 模块(推荐)二、使用 virtualenv(兼容旧版 P

Linux lvm实例之如何创建一个专用于MySQL数据存储的LVM卷组

《Linuxlvm实例之如何创建一个专用于MySQL数据存储的LVM卷组》:本文主要介绍使用Linux创建一个专用于MySQL数据存储的LVM卷组的实例,具有很好的参考价值,希望对大家有所帮助,... 目录在Centos 7上创建卷China编程组并配置mysql数据目录1. 检查现有磁盘2. 创建物理卷3. 创