ThingsBoard MQTT 连接认证过程 源码分析+图例

2024-06-03 03:44

本文主要是介绍ThingsBoard MQTT 连接认证过程 源码分析+图例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

整个连接过程如图所示:

 高清图片链接

1、环境准备

  • thingsboard3.5.1 源码启动。(不懂怎么启动的,大家可以看我的博文ThingsBoard3.5.1源码启动)
  • MQTTX 客户端(用来连接 thingsboard MQTT)
  • 默认配置。queue.type=in-memory,cache.type=caffeine

因为我们的目的,是快速了解 thingsboard 的启动过程,所以所有的配置全部采用默认的方式。默认消息队列采用内存队列ConcurrentHashMap,缓存也采用内存缓存caffeine。

使用 customerA 用户账号密码登录,使用设备A1 AccessToken 连接。

2、源码分析

2.1 连接消息生产

2.1.1 入口

大家知道MQTT是基于TCP协议之上的轻量级通信协议,而TCP协议是面向连接、请求响应的通信协议。所以在 thingsboard 这一侧必然有一个服务器实现,用来等待客户端的连接。这个实现就是MqttTransportService

thingsboard 采用 netty 来实现一个MQTT server。

org.thingsboard.server.transport.mqtt.MqttTransportService@PostConstructpublic void init() throws Exception {log.info("Setting resource leak detector level to {}", leakDetectorLevel);ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));log.info("Starting MQTT transport...");bossGroup = new NioEventLoopGroup(bossGroupThreadCount);workerGroup = new NioEventLoopGroup(workerGroupThreadCount);ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, false)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);serverChannel = b.bind(host, port).sync().channel();if (sslEnabled) {b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MqttTransportServerInitializer(context, true)).childOption(ChannelOption.SO_KEEPALIVE, keepAlive);sslServerChannel = b.bind(sslHost, sslPort).sync().channel();}log.info("Mqtt transport started!");}

其中,关系到 netty server 性能的 bossGroupThreadCount,workerGroupThreadCount

thingsboard 提取出两个参数变量 

NETTY_BOSS_GROUP_THREADS

NETTY_WORKER_GROUP_THREADS

方便用户根据自己的设备台数、部署架构,来优化自己的 netty 性能。

 netty server 的请求处理过程如下图所示,圆圈为具体实现类,方框为方法。

在 MqttTransportHandler#processMqttMsg 方法中,因为我们的消息类型是连接,所以我们会进入 processConnect 方法。

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {if (msg.fixedHeader() == null) {log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());ctx.close();return;}deviceSessionCtx.setChannel(ctx);if (CONNECT.equals(msg.fixedHeader().messageType())) {processConnect(ctx, (MqttConnectMessage) msg);} else if (deviceSessionCtx.isProvisionOnly()) {processProvisionSessionMsg(ctx, msg);} else {enqueueRegularSessionMsg(ctx, msg);}}

 在 MqttTransportHandler#processConnect 方法中,由于采用 AccessToken 的授权方式,所以会进入 processAuthTokenConnect

MqttTransportHandler#processAuthTokenConnect 方法中,获取我们在MQTTX填的用户名、密码,然后委托给 DefaultTransportService#process 处理

org.thingsboard.server.transport.mqtt.MqttTransportHandlervoid processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());String userName = msg.payload().userName();String clientId = msg.payload().clientIdentifier();deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {deviceSessionCtx.setProvisionOnly(true);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));} else {X509Certificate cert;if (sslHandler != null && (cert = getX509Certificate()) != null) {processX509CertConnect(ctx, cert, msg);} else {processAuthTokenConnect(ctx, msg);}}}private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {String userName = connectMessage.payload().userName();log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder().setClientId(connectMessage.payload().clientIdentifier());if (userName != null) {request.setUserName(userName);}byte[] passwordBytes = connectMessage.payload().passwordInBytes();if (passwordBytes != null) {String password = new String(passwordBytes, CharsetUtil.UTF_8);request.setPassword(password);}transportService.process(DeviceTransportType.MQTT, request.build(),new TransportServiceCallback<>() {@Overridepublic void onSuccess(ValidateDeviceCredentialsResponse msg) {onValidateDeviceResponse(msg, ctx, connectMessage);}@Overridepublic void onError(Throwable e) {log.trace("[{}] Failed to process credentials: {}", address, userName, e);ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));ctx.close();}});}

2.1.2 DefaultTransportService

 一路跟下去

DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法会调用 TbQueueProducer接口 send 方法,往主题 tb_transport.api.requests 发送消息。TbQueueProducer实现类是InMemoryTbQueueProducer

void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);if (messagesStats != null) {messagesStats.incrementTotal();}// 将消息发送给消息队列topic是tb_transport.api.requestsrequestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {@Overridepublic void onSuccess(TbQueueMsgMetadata metadata) {if (messagesStats != null) {messagesStats.incrementSuccessful();}log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);}@Overridepublic void onFailure(Throwable t) {if (messagesStats != null) {messagesStats.incrementFailed();}pendingRequests.remove(requestId);future.setException(t);}});}
1、TbQueueProducer 接口的实现类有很多个,具体发送消息的实现类是哪一个呢?
因为我们使用内存队列方式启动,所以实现类是 InMemoryTbQueueProducer

2、怎么确定发送的主题是 tb_transport.api.requests

主题是通过 requestTemplate获取的

而 requestTemplate又是 DefaultTbQueueRequestTemplate的一个属性,通过 Builder 构建器注入进来的。

对于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同种消息队列的实现方式。我们现在所用的是内存队列,所以进入InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory中,对于DefaultTbQueueRequestTemplate.requestTemplate

的初始化,使用的是TbQueueTransportApiSettings的配置。

requestsTopic 读取的,就是 tb_transport.api.requests 这一主题。

3、更进一步

认真分析初始化过程,得出下面请求主题的初始化图。

2.1.3 InMemoryTbQueueProducer

InMemoryTbQueueProducer#send 调用 DefaultInMemoryStorage#put 方法 

DefaultInMemoryStorage 往自己持有的 ConcurrentHashMap 中存放消息,

key 是主题 tb_transport.api.requests,value 是存放有消息的 LinkedBlockingQueue 内存队列

2.1.4 一个更抽象的发送模型

TbQueueProducer 往队列 queue 发送消息,主题 tb_transport.api.requests,而不管这个消息的实现是内存队列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 从queue中消费消息。至此,生产连接请求消息的过程结束。

2.2 消费消息

2.2.1 InMemoryTbQueueConsumer

我们知道现在消息生产者接口 TbQueueProducer 的实现类是 InMemoryTbQueueProducer,则它必然有一个消息消费者实现接口 TbQueueConsumer,消费者实现类是 InMemoryTbQueueConsumer

 InMemoryTbQueueConsumer 中对于消息的消费只有把消息从 ConcurrentHashMap 拉取出来的逻辑,而没有具体处理的逻辑,则处理的逻辑,是存在于调用这个 poll 方法的地方。

org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer@Overridepublic List<T> poll(long durationInMillis) {if (subscribed) {@SuppressWarnings("unchecked")List<T> messages = partitions.stream().map(tpi -> {try {return storage.get(tpi.getFullTopicName());} catch (InterruptedException e) {if (!stopped) {log.error("Queue was interrupted.", e);}return Collections.emptyList();}}).flatMap(List::stream).map(msg -> (T) msg).collect(Collectors.toList());if (messages.size() > 0) {return messages;}try {Thread.sleep(durationInMillis);} catch (InterruptedException e) {if (!stopped) {log.error("Failed to sleep.", e);}}}return Collections.emptyList();}

poll 方法的调用端,全局是搜不到的。

 我们可以探究一下它的构造方法,看看谁初始化了它,则谁就有可能调用它的 poll 方法。排除掉它自己,有两个类初始化了 InMemoryTbQueueConsumer,分别是 InMemoryMonolithQueueFactory 和 InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory 订阅的主题,是 tb_transport.api.responses 不是我们要找的 tb_transport.api.requests,忽略。

2.2.2 InMemoryMonolithQueueFactory

我们先来看一下 InMemoryMonolithQueueFactoryInMemoryMonolithQueueFactory 里面有一个方法,传入的 TbQueueTransportApiSettings,刚好就是我们请求消息的主题配置类。

org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Overridepublic TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());}org.thingsboard.server.queue.settings.TbQueueTransportApiSettings 
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {// tb_transport.api.requests@Value("${queue.transport_api.requests_topic}")private String requestsTopic;
}

查看对方法 createTransportApiRequestConsumer 的调用,找到一个非具体队列实现的调用类TbCoreTransportApiService

2.2.3 TbCoreTransportApiService

TbCoreTransportApiService 初始化 init 方法,会创建 TbQueueConsumer——也就是具体的实现类 InMemoryTbQueueConsumer 注入到 DefaultTbQueueResponseTemplate.requestTemplate,然后执行 DefaultTbQueueResponseTemplate#init() 方法。

 2.2.4 DefaultTbQueueResponseTemplate

至此,我们找到了 InMemoryTbQueueConsumer#poll 调用的地方。

继续往下,看看对于消息 requests,是怎么消费的。 

 2.2.5 DefaultTransportApiService

 通过 AccessToken 查找到设备的授权 DeviceCredentials (即device_credentials表记录)然后构造 DeviceInfo 返回给设备端。

org.thingsboard.server.service.transport.DefaultTransportApiService// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);if (credentials != null && credentials.getCredentialsType() == credentialsType) {return getDeviceInfo(credentials);} else {return getEmptyTransportApiResponseFuture();}}

 

2.2.6 消费消息流程图

3、总结

这篇关于ThingsBoard MQTT 连接认证过程 源码分析+图例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1025932

相关文章

java.sql.SQLTransientConnectionException连接超时异常原因及解决方案

《java.sql.SQLTransientConnectionException连接超时异常原因及解决方案》:本文主要介绍java.sql.SQLTransientConnectionExcep... 目录一、引言二、异常信息分析三、可能的原因3.1 连接池配置不合理3.2 数据库负载过高3.3 连接泄漏

oracle 11g导入\导出(expdp impdp)之导入过程

《oracle11g导入导出(expdpimpdp)之导入过程》导出需使用SEC.DMP格式,无分号;建立expdir目录(E:/exp)并确保存在;导入在cmd下执行,需sys用户权限;若需修... 目录准备文件导入(impdp)1、建立directory2、导入语句 3、更改密码总结上一个环节,我们讲了

ShardingProxy读写分离之原理、配置与实践过程

《ShardingProxy读写分离之原理、配置与实践过程》ShardingProxy是ApacheShardingSphere的数据库中间件,通过三层架构实现读写分离,解决高并发场景下数据库性能瓶... 目录一、ShardingProxy技术定位与读写分离核心价值1.1 技术定位1.2 读写分离核心价值二

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

Mac电脑如何通过 IntelliJ IDEA 远程连接 MySQL

《Mac电脑如何通过IntelliJIDEA远程连接MySQL》本文详解Mac通过IntelliJIDEA远程连接MySQL的步骤,本文通过图文并茂的形式给大家介绍的非常详细,感兴趣的朋友跟... 目录MAC电脑通过 IntelliJ IDEA 远程连接 mysql 的详细教程一、前缀条件确认二、打开 ID

AOP编程的基本概念与idea编辑器的配合体验过程

《AOP编程的基本概念与idea编辑器的配合体验过程》文章简要介绍了AOP基础概念,包括Before/Around通知、PointCut切入点、Advice通知体、JoinPoint连接点等,说明它们... 目录BeforeAroundAdvise — 通知PointCut — 切入点Acpect — 切面

C++ STL-string类底层实现过程

《C++STL-string类底层实现过程》本文实现了一个简易的string类,涵盖动态数组存储、深拷贝机制、迭代器支持、容量调整、字符串修改、运算符重载等功能,模拟标准string核心特性,重点强... 目录实现框架一、默认成员函数1.默认构造函数2.构造函数3.拷贝构造函数(重点)4.赋值运算符重载函数

Go语言连接MySQL数据库执行基本的增删改查

《Go语言连接MySQL数据库执行基本的增删改查》在后端开发中,MySQL是最常用的关系型数据库之一,本文主要为大家详细介绍了如何使用Go连接MySQL数据库并执行基本的增删改查吧... 目录Go语言连接mysql数据库准备工作安装 MySQL 驱动代码实现运行结果注意事项Go语言执行基本的增删改查准备工作

MySQ中出现幻读问题的解决过程

《MySQ中出现幻读问题的解决过程》文章解析MySQLInnoDB通过MVCC与间隙锁机制在可重复读隔离级别下解决幻读,确保事务一致性,同时指出性能影响及乐观锁等替代方案,帮助开发者优化数据库应用... 目录一、幻读的准确定义与核心特征幻读 vs 不可重复读二、mysql隔离级别深度解析各隔离级别的实现差异