基于Vertx实现可配置及可扩展的IOT服务

2023-10-20 17:21

本文主要是介绍基于Vertx实现可配置及可扩展的IOT服务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

搭建框架的目标

        相信写过IOT服务的伙伴应该知道,面对各种千奇百怪的通信协议,特别是16进制报文的协议,有些协议看的确实有点让人头疼。但这些协议中也有很多共性,不必针对每过协议都把一些业务无关的代码再撸一遍。

        搭建这个项目主要是针对常见的TCP连接为基础的设备通信协议做一些抽象及规范化处理,减低一些开发的成本,目标是实现一个可配置的,便于扩展各种协议的框架。

Vertx简介

        Vert.x是Eclipse基金会下面的一个开源项目,Vert.x的基本定位是一个事件驱动的编程框架,通过Vert.x使用者可以用相对低的成本就享受到NIO带来的高性能。netty是Vert.x底层使用的通讯组件,Vert.x为了最大限度的降低使用门槛,刻意屏蔽掉了许多底层netty相关的细节,比如ByteBuf、引用计数等等。

        本文主要见解的是搭建一个可配置和可扩展的IOT服务,并不会详细展开讲解Vertx,Vertx相关内容可上官网查看《vertx官网》​​​​​​

IOT通信中的常见概念

1.logicAddress

        逻辑通信地址。在常见的设备协议中,都会有逻辑通信地址这个概念,用于标识当前的连接是具体的某个设备。有了这个逻辑地址之后就可以很方便的找到这个连接。

        在业务系统中建立档案的时候用这个地址,后续也可以通过这个通信地址将指定的命令下发给指定的设备。

2.messageType

        消息类型。在TCP通信中,设备上报的不止一类消息,但在常见的设备通信协议中都会针对不同的消息做不同的标识,借此来区分每条上传消息的含义。所以在设备上报的报文中我们根据通信协议的定义找到报文的标识位,然后再做对应的处理。

3.session

        会话。session主要用于管理连接。设备和服务端建立连接之后会产生一个socket,但很多时候这个socket缺少一些语义和描述,所以我们会对这个socket做一些包装,比如抽象一些方法,绑定设备的逻辑地址以便后续查找和调用。

代码架构流程

        整体的核心流程如下:

核心代码分析

yaml文件配置

        配置多个协议的协议名称和协议通信端口号,这里用多个端口区分不同的协议,避免协议内容相近的时候出现解析错误的情况。

protocols:- name: ZHONGXING #中兴port: 8898- name: HUAWEI #华为port: 9666

 ProtocolServerBootstrap

        这里主要是加载yaml配置,然后启动相应的TcpServer服务监听端口,并根据配置定义找打对应协议的编解码器,将消息转发到对应的编解码器中。

        这里用到了两个自定义注解:

        @CodecScan:标识编解码器要扫描哪些包。

        @Protocol:注解来标识编解码器对应的通信协议。

/*** @author yan* @date 2023/9/12*/
@Slf4j
public class ProtocolServerBootstrap extends AbstractVerticle {private Class<?> starter;private static Map<String, ProtocolConfig> protocols = new ConcurrentHashMap<>();private static Map<String, AbstractProtocolCodec> codecMap = new ConcurrentHashMap<>();public ProtocolServerBootstrap(Class<?> starter) {this.starter = starter;}@Overridepublic void init(Vertx vertx, Context context) {super.init(vertx, context);loadProfile();loadProtocolCodec();}public void loadProfile() {InputStream inputStream = null;try {inputStream = this.getClass().getClassLoader().getResourceAsStream("protocol.yml");Yaml yaml = new Yaml();Map<String, List<Object>> map = yaml.load(inputStream);List<Object> protocolConfigs = map.get("protocols");String host = NetUtil.getLocalhost().getHostAddress();protocolConfigs.stream().map(item -> JSONUtil.toBean(JSONUtil.toJsonStr(item), ProtocolConfig.class)).forEach(config -> {protocols.put(config.getName(), new ProtocolConfig().setName(config.getName()).setHost(host).setPort(config.getPort()));});} catch (Exception e) {e.printStackTrace();log.error("配置文件解析失败:" + e);} finally {try {inputStream.close();} catch (IOException e) {e.printStackTrace();}}}private void loadProtocolCodec() {try {CodecScan codecScan = starter.getAnnotation(CodecScan.class);if(codecScan == null){this.protocols.clear();return;}String[] packages = codecScan.value();for (String p : packages) {Reflections reflection = new Reflections(p);Set<Class<?>> classes = reflection.getTypesAnnotatedWith(Protocol.class);for (Class<?> aClass : classes) {Protocol annotation = aClass.getAnnotation(Protocol.class);codecMap.put(annotation.value(), (AbstractProtocolCodec) aClass.newInstance());log.info("加载编解码器:" + aClass.getName());}}} catch (Exception e) {log.error("加载编解码器失败:" + e);}}@Overridepublic void start() {protocols.forEach((name, protocol) -> {AbstractProtocolCodec codec = codecMap.get(name);vertx.deployVerticle(codec);SocketAddress address = new SocketAddressImpl(protocol.getPort(), protocol.getHost());NetServer server = vertx.createNetServer();server.connectHandler(codec).listen(address).onComplete(res -> {if (res.succeeded()) {log.info("{}服务启动成功,绑定/{}", protocol.getName(), address);} else {if (res.cause() != null) {log.error("服务启动失败,cause:" + res.cause());}}});});}public AbstractProtocolCodec getProtocolCodec(String protocolName) {return codecMap.get(protocolName);}
}

AbstractProtocolCodec

         抽象编解码器类,主要包含监听服务端收到的消息,会话管理,管理处理器等。

/*** @author yan* @date 2023/9/12*/
@Slf4j
public abstract class AbstractProtocolCodec extends AbstractVerticle implements Handler<NetSocket> {private Map<String, BaseSession> logicAddressSessionMap = new ConcurrentHashMap<>();private Map<NetSocket, BaseSession> socketSessionMap = new ConcurrentHashMap<>();private Map<String, AbstractProtocolHandler> handlerMap = new ConcurrentHashMap<>();@Overridepublic void init(Vertx vertx, Context context) {super.init(vertx, context);vertx.eventBus().registerDefaultCodec(BaseMessage.class, new GenericMessageCodec<BaseMessage>() {});vertx.eventBus().registerDefaultCodec(BaseSession.class, new GenericMessageCodec<BaseSession>() {});registerHandlers();}@Overridepublic void handle(NetSocket socket) {log.info("收到新的连接:" + socket);activeSocket(socket);socket.closeHandler(handler -> {log.info("连接已断开:" + socket);afterCloseSocket(socket);removeSession(socket);});socket.handler(data -> {try {BaseMessage message = new BaseMessage().setSocket(socket).setBuffer(data);if(!socketSessionMap.containsKey(socket)){String logicAddress = getLogicAddress(message);registerSession(logicAddress, socket);}decode(message);} catch (Exception e) {e.printStackTrace();log.error("解码处理失败,throw:" + e);}});}private BaseSession registerSession(String logicAddress, NetSocket socket) {BaseSession session = new BaseSession().setLogicAddress(logicAddress).setSocket(socket);logicAddressSessionMap.put(logicAddress, session);socketSessionMap.put(socket, session);return session;}private void removeSession(NetSocket socket) {BaseSession session = socketSessionMap.remove(socket);if(session != null){logicAddressSessionMap.remove(session.getLogicAddress());}}public BaseSession getSessionByLogicAddress(String logicAddress) {return logicAddressSessionMap.get(logicAddress);}protected abstract List<AbstractProtocolHandler> getHandlers();private void registerHandlers() {List<AbstractProtocolHandler> handlers = getHandlers();handlers.forEach(handler -> {handlerMap.put(handler.getMessageType(), handler);vertx.deployVerticle(handler);});}public AbstractProtocolHandler getHandlerByMessageType(String messageType) {return handlerMap.get(messageType);}protected abstract void decode(BaseMessage message);protected abstract String getLogicAddress(BaseMessage message);protected void activeSocket(NetSocket socket) {}protected void afterCloseSocket(NetSocket socket) {}
}

AbstractProtocolHandler

        处理器抽象类

/*** @author yan* @date 2023/9/14*/
public abstract class AbstractProtocolHandler<T, R> extends AbstractVerticle implements Handler<Message<T>>, InvokeHandler<R> {@Overridepublic void start() throws Exception {vertx.eventBus().consumer(getTopic(), this::handle);}protected abstract String getTopic();protected abstract String getMessageType();@Overridepublic void write(BaseSession session, Buffer buffer) {session.getSocket().write(buffer);}
}

InvokeHandler

/*** @author yan* @date 2023/9/14*/
public interface InvokeHandler<T> {/*** 根据传入参数获取buffer* @param req* @return*/Buffer getBuffer(T req);/*** 下发消息* @param session* @param buffer*/void write(BaseSession session, Buffer buffer);
}

InvokeAdapter

/*** 服务调用适配器** @author yan* @date 2023/9/14*/
public class InvokeAdapter {private ProtocolServerBootstrap bootstrap;public InvokeAdapter(ProtocolServerBootstrap bootstrap){this.bootstrap = bootstrap;}public void send(String protocolName, String logicAddress, String messageType, Object param) {AbstractProtocolCodec codec = bootstrap.getProtocolCodec(protocolName);BaseSession session = codec.getSessionByLogicAddress(logicAddress);if (session == null || session.getSocket() == null) {throw new RuntimeException("session is not exist or closed");}AbstractProtocolHandler handler = codec.getHandlerByMessageType(messageType);Buffer buffer = handler.getBuffer(param);handler.write(session, buffer);}}

华为协议实现

HuaweiCodec

        华为协议编解码器

/*** @author yan* @date 2023/9/12*/
@Slf4j
@Protocol("HUAWEI")
public class HuaweiCodec extends AbstractProtocolCodec {@Overrideprotected List<AbstractProtocolHandler> getHandlers() {return Arrays.asList(new HuaweiParamReadHandler(), new HuaweiParamWriteHandler(), new HuaweiParamWriteBatchHandler());}@Overrideprotected void decode(BaseMessage message) {String dataStr = ByteUtils.hexToHexString(message.getBuffer().getBytes());String messageType = dataStr.substring(14, 16);vertx.eventBus().publish(HuaweiMessageTypeConstants.getMessageTopic(messageType), message);}@Overrideprotected String getLogicAddress(BaseMessage message) {// 这里根据消息解析出对应的通信地址return "001";}
}

Handler处理器

/*** @author yan* @date 2023/9/13*/
@Slf4j
public class HuaweiParamReadHandler extends AbstractProtocolHandler<BaseMessage, Object> {@Overrideprotected String getTopic() {return HuaweiMessageTypeConstants.READ;}@Overridepublic void handle(Message<BaseMessage> message) {BaseMessage baseMessage = message.body();log.info("收到读参数命令返回:" + ByteUtils.hexToHexString(baseMessage.getBuffer().getBytes()));baseMessage.getSocket().write(baseMessage.getBuffer());}@Overridepublic String getMessageType() {return HuaweiMessageTypeConstants.READ;}@Overridepublic Buffer getBuffer(Object req) {log.info("发送read消息:" + req);return Buffer.buffer(new byte[]{0x11, 0x11, 0x11});}
}

测试调用

编写主类

/*** @author yan* @date 2023/9/11*/
@CodecScan("com.cdw.pv.iot.modules")
public class PvApplication {public static void main(String[] args) {ProtocolServerBootstrap bootstrap = new ProtocolServerBootstrap(PvApplication.class);Vertx vertx = Vertx.vertx();vertx.deployVerticle(bootstrap);// 开启一个http服务,模拟外部调用startHttpServer(vertx, bootstrap);}private static void startHttpServer(Vertx vertx, ProtocolServerBootstrap bootstrap) {InvokeAdapter adapter = new InvokeAdapter(bootstrap);HttpServer httpServer = vertx.createHttpServer();httpServer.requestHandler(handler -> {System.out.println("request请求:" + handler);adapter.send("HUAWEI", "001", HuaweiMessageTypeConstants.READ, "123");handler.response().end(Buffer.buffer("success"));}).listen(8899).onComplete(handler -> {if (handler.succeeded()) {System.out.println("http服务器启动成功");}});}
}

发生指令

      使用TCP连接根据发送指令  

服务调用

        模拟发送请求

        收到消息

总结

        以上就是通用IOT服务的整体架构了。

        这个只是一个基本版本的,后续可以根据实际情况做调整。

这篇关于基于Vertx实现可配置及可扩展的IOT服务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/248563

相关文章

mybatis映射器配置小结

《mybatis映射器配置小结》本文详解MyBatis映射器配置,重点讲解字段映射的三种解决方案(别名、自动驼峰映射、resultMap),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定... 目录select中字段的映射问题使用SQL语句中的别名功能使用mapUnderscoreToCame

Linux下MySQL数据库定时备份脚本与Crontab配置教学

《Linux下MySQL数据库定时备份脚本与Crontab配置教学》在生产环境中,数据库是核心资产之一,定期备份数据库可以有效防止意外数据丢失,本文将分享一份MySQL定时备份脚本,并讲解如何通过cr... 目录备份脚本详解脚本功能说明授权与可执行权限使用 Crontab 定时执行编辑 Crontab添加定

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决