基于Vertx实现可配置及可扩展的IOT服务

2023-10-20 17:21

本文主要是介绍基于Vertx实现可配置及可扩展的IOT服务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

搭建框架的目标

        相信写过IOT服务的伙伴应该知道,面对各种千奇百怪的通信协议,特别是16进制报文的协议,有些协议看的确实有点让人头疼。但这些协议中也有很多共性,不必针对每过协议都把一些业务无关的代码再撸一遍。

        搭建这个项目主要是针对常见的TCP连接为基础的设备通信协议做一些抽象及规范化处理,减低一些开发的成本,目标是实现一个可配置的,便于扩展各种协议的框架。

Vertx简介

        Vert.x是Eclipse基金会下面的一个开源项目,Vert.x的基本定位是一个事件驱动的编程框架,通过Vert.x使用者可以用相对低的成本就享受到NIO带来的高性能。netty是Vert.x底层使用的通讯组件,Vert.x为了最大限度的降低使用门槛,刻意屏蔽掉了许多底层netty相关的细节,比如ByteBuf、引用计数等等。

        本文主要见解的是搭建一个可配置和可扩展的IOT服务,并不会详细展开讲解Vertx,Vertx相关内容可上官网查看《vertx官网》​​​​​​

IOT通信中的常见概念

1.logicAddress

        逻辑通信地址。在常见的设备协议中,都会有逻辑通信地址这个概念,用于标识当前的连接是具体的某个设备。有了这个逻辑地址之后就可以很方便的找到这个连接。

        在业务系统中建立档案的时候用这个地址,后续也可以通过这个通信地址将指定的命令下发给指定的设备。

2.messageType

        消息类型。在TCP通信中,设备上报的不止一类消息,但在常见的设备通信协议中都会针对不同的消息做不同的标识,借此来区分每条上传消息的含义。所以在设备上报的报文中我们根据通信协议的定义找到报文的标识位,然后再做对应的处理。

3.session

        会话。session主要用于管理连接。设备和服务端建立连接之后会产生一个socket,但很多时候这个socket缺少一些语义和描述,所以我们会对这个socket做一些包装,比如抽象一些方法,绑定设备的逻辑地址以便后续查找和调用。

代码架构流程

        整体的核心流程如下:

核心代码分析

yaml文件配置

        配置多个协议的协议名称和协议通信端口号,这里用多个端口区分不同的协议,避免协议内容相近的时候出现解析错误的情况。

protocols:- name: ZHONGXING #中兴port: 8898- name: HUAWEI #华为port: 9666

 ProtocolServerBootstrap

        这里主要是加载yaml配置,然后启动相应的TcpServer服务监听端口,并根据配置定义找打对应协议的编解码器,将消息转发到对应的编解码器中。

        这里用到了两个自定义注解:

        @CodecScan:标识编解码器要扫描哪些包。

        @Protocol:注解来标识编解码器对应的通信协议。

/*** @author yan* @date 2023/9/12*/
@Slf4j
public class ProtocolServerBootstrap extends AbstractVerticle {private Class<?> starter;private static Map<String, ProtocolConfig> protocols = new ConcurrentHashMap<>();private static Map<String, AbstractProtocolCodec> codecMap = new ConcurrentHashMap<>();public ProtocolServerBootstrap(Class<?> starter) {this.starter = starter;}@Overridepublic void init(Vertx vertx, Context context) {super.init(vertx, context);loadProfile();loadProtocolCodec();}public void loadProfile() {InputStream inputStream = null;try {inputStream = this.getClass().getClassLoader().getResourceAsStream("protocol.yml");Yaml yaml = new Yaml();Map<String, List<Object>> map = yaml.load(inputStream);List<Object> protocolConfigs = map.get("protocols");String host = NetUtil.getLocalhost().getHostAddress();protocolConfigs.stream().map(item -> JSONUtil.toBean(JSONUtil.toJsonStr(item), ProtocolConfig.class)).forEach(config -> {protocols.put(config.getName(), new ProtocolConfig().setName(config.getName()).setHost(host).setPort(config.getPort()));});} catch (Exception e) {e.printStackTrace();log.error("配置文件解析失败:" + e);} finally {try {inputStream.close();} catch (IOException e) {e.printStackTrace();}}}private void loadProtocolCodec() {try {CodecScan codecScan = starter.getAnnotation(CodecScan.class);if(codecScan == null){this.protocols.clear();return;}String[] packages = codecScan.value();for (String p : packages) {Reflections reflection = new Reflections(p);Set<Class<?>> classes = reflection.getTypesAnnotatedWith(Protocol.class);for (Class<?> aClass : classes) {Protocol annotation = aClass.getAnnotation(Protocol.class);codecMap.put(annotation.value(), (AbstractProtocolCodec) aClass.newInstance());log.info("加载编解码器:" + aClass.getName());}}} catch (Exception e) {log.error("加载编解码器失败:" + e);}}@Overridepublic void start() {protocols.forEach((name, protocol) -> {AbstractProtocolCodec codec = codecMap.get(name);vertx.deployVerticle(codec);SocketAddress address = new SocketAddressImpl(protocol.getPort(), protocol.getHost());NetServer server = vertx.createNetServer();server.connectHandler(codec).listen(address).onComplete(res -> {if (res.succeeded()) {log.info("{}服务启动成功,绑定/{}", protocol.getName(), address);} else {if (res.cause() != null) {log.error("服务启动失败,cause:" + res.cause());}}});});}public AbstractProtocolCodec getProtocolCodec(String protocolName) {return codecMap.get(protocolName);}
}

AbstractProtocolCodec

         抽象编解码器类,主要包含监听服务端收到的消息,会话管理,管理处理器等。

/*** @author yan* @date 2023/9/12*/
@Slf4j
public abstract class AbstractProtocolCodec extends AbstractVerticle implements Handler<NetSocket> {private Map<String, BaseSession> logicAddressSessionMap = new ConcurrentHashMap<>();private Map<NetSocket, BaseSession> socketSessionMap = new ConcurrentHashMap<>();private Map<String, AbstractProtocolHandler> handlerMap = new ConcurrentHashMap<>();@Overridepublic void init(Vertx vertx, Context context) {super.init(vertx, context);vertx.eventBus().registerDefaultCodec(BaseMessage.class, new GenericMessageCodec<BaseMessage>() {});vertx.eventBus().registerDefaultCodec(BaseSession.class, new GenericMessageCodec<BaseSession>() {});registerHandlers();}@Overridepublic void handle(NetSocket socket) {log.info("收到新的连接:" + socket);activeSocket(socket);socket.closeHandler(handler -> {log.info("连接已断开:" + socket);afterCloseSocket(socket);removeSession(socket);});socket.handler(data -> {try {BaseMessage message = new BaseMessage().setSocket(socket).setBuffer(data);if(!socketSessionMap.containsKey(socket)){String logicAddress = getLogicAddress(message);registerSession(logicAddress, socket);}decode(message);} catch (Exception e) {e.printStackTrace();log.error("解码处理失败,throw:" + e);}});}private BaseSession registerSession(String logicAddress, NetSocket socket) {BaseSession session = new BaseSession().setLogicAddress(logicAddress).setSocket(socket);logicAddressSessionMap.put(logicAddress, session);socketSessionMap.put(socket, session);return session;}private void removeSession(NetSocket socket) {BaseSession session = socketSessionMap.remove(socket);if(session != null){logicAddressSessionMap.remove(session.getLogicAddress());}}public BaseSession getSessionByLogicAddress(String logicAddress) {return logicAddressSessionMap.get(logicAddress);}protected abstract List<AbstractProtocolHandler> getHandlers();private void registerHandlers() {List<AbstractProtocolHandler> handlers = getHandlers();handlers.forEach(handler -> {handlerMap.put(handler.getMessageType(), handler);vertx.deployVerticle(handler);});}public AbstractProtocolHandler getHandlerByMessageType(String messageType) {return handlerMap.get(messageType);}protected abstract void decode(BaseMessage message);protected abstract String getLogicAddress(BaseMessage message);protected void activeSocket(NetSocket socket) {}protected void afterCloseSocket(NetSocket socket) {}
}

AbstractProtocolHandler

        处理器抽象类

/*** @author yan* @date 2023/9/14*/
public abstract class AbstractProtocolHandler<T, R> extends AbstractVerticle implements Handler<Message<T>>, InvokeHandler<R> {@Overridepublic void start() throws Exception {vertx.eventBus().consumer(getTopic(), this::handle);}protected abstract String getTopic();protected abstract String getMessageType();@Overridepublic void write(BaseSession session, Buffer buffer) {session.getSocket().write(buffer);}
}

InvokeHandler

/*** @author yan* @date 2023/9/14*/
public interface InvokeHandler<T> {/*** 根据传入参数获取buffer* @param req* @return*/Buffer getBuffer(T req);/*** 下发消息* @param session* @param buffer*/void write(BaseSession session, Buffer buffer);
}

InvokeAdapter

/*** 服务调用适配器** @author yan* @date 2023/9/14*/
public class InvokeAdapter {private ProtocolServerBootstrap bootstrap;public InvokeAdapter(ProtocolServerBootstrap bootstrap){this.bootstrap = bootstrap;}public void send(String protocolName, String logicAddress, String messageType, Object param) {AbstractProtocolCodec codec = bootstrap.getProtocolCodec(protocolName);BaseSession session = codec.getSessionByLogicAddress(logicAddress);if (session == null || session.getSocket() == null) {throw new RuntimeException("session is not exist or closed");}AbstractProtocolHandler handler = codec.getHandlerByMessageType(messageType);Buffer buffer = handler.getBuffer(param);handler.write(session, buffer);}}

华为协议实现

HuaweiCodec

        华为协议编解码器

/*** @author yan* @date 2023/9/12*/
@Slf4j
@Protocol("HUAWEI")
public class HuaweiCodec extends AbstractProtocolCodec {@Overrideprotected List<AbstractProtocolHandler> getHandlers() {return Arrays.asList(new HuaweiParamReadHandler(), new HuaweiParamWriteHandler(), new HuaweiParamWriteBatchHandler());}@Overrideprotected void decode(BaseMessage message) {String dataStr = ByteUtils.hexToHexString(message.getBuffer().getBytes());String messageType = dataStr.substring(14, 16);vertx.eventBus().publish(HuaweiMessageTypeConstants.getMessageTopic(messageType), message);}@Overrideprotected String getLogicAddress(BaseMessage message) {// 这里根据消息解析出对应的通信地址return "001";}
}

Handler处理器

/*** @author yan* @date 2023/9/13*/
@Slf4j
public class HuaweiParamReadHandler extends AbstractProtocolHandler<BaseMessage, Object> {@Overrideprotected String getTopic() {return HuaweiMessageTypeConstants.READ;}@Overridepublic void handle(Message<BaseMessage> message) {BaseMessage baseMessage = message.body();log.info("收到读参数命令返回:" + ByteUtils.hexToHexString(baseMessage.getBuffer().getBytes()));baseMessage.getSocket().write(baseMessage.getBuffer());}@Overridepublic String getMessageType() {return HuaweiMessageTypeConstants.READ;}@Overridepublic Buffer getBuffer(Object req) {log.info("发送read消息:" + req);return Buffer.buffer(new byte[]{0x11, 0x11, 0x11});}
}

测试调用

编写主类

/*** @author yan* @date 2023/9/11*/
@CodecScan("com.cdw.pv.iot.modules")
public class PvApplication {public static void main(String[] args) {ProtocolServerBootstrap bootstrap = new ProtocolServerBootstrap(PvApplication.class);Vertx vertx = Vertx.vertx();vertx.deployVerticle(bootstrap);// 开启一个http服务,模拟外部调用startHttpServer(vertx, bootstrap);}private static void startHttpServer(Vertx vertx, ProtocolServerBootstrap bootstrap) {InvokeAdapter adapter = new InvokeAdapter(bootstrap);HttpServer httpServer = vertx.createHttpServer();httpServer.requestHandler(handler -> {System.out.println("request请求:" + handler);adapter.send("HUAWEI", "001", HuaweiMessageTypeConstants.READ, "123");handler.response().end(Buffer.buffer("success"));}).listen(8899).onComplete(handler -> {if (handler.succeeded()) {System.out.println("http服务器启动成功");}});}
}

发生指令

      使用TCP连接根据发送指令  

服务调用

        模拟发送请求

        收到消息

总结

        以上就是通用IOT服务的整体架构了。

        这个只是一个基本版本的,后续可以根据实际情况做调整。

这篇关于基于Vertx实现可配置及可扩展的IOT服务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/248563

相关文章

python生成随机唯一id的几种实现方法

《python生成随机唯一id的几种实现方法》在Python中生成随机唯一ID有多种方法,根据不同的需求场景可以选择最适合的方案,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习... 目录方法 1:使用 UUID 模块(推荐)方法 2:使用 Secrets 模块(安全敏感场景)方法

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

Spring Boot 结合 WxJava 实现文章上传微信公众号草稿箱与群发

《SpringBoot结合WxJava实现文章上传微信公众号草稿箱与群发》本文将详细介绍如何使用SpringBoot框架结合WxJava开发工具包,实现文章上传到微信公众号草稿箱以及群发功能,... 目录一、项目环境准备1.1 开发环境1.2 微信公众号准备二、Spring Boot 项目搭建2.1 创建

IntelliJ IDEA2025创建SpringBoot项目的实现步骤

《IntelliJIDEA2025创建SpringBoot项目的实现步骤》本文主要介绍了IntelliJIDEA2025创建SpringBoot项目的实现步骤,文中通过示例代码介绍的非常详细,对大家... 目录一、创建 Spring Boot 项目1. 新建项目2. 基础配置3. 选择依赖4. 生成项目5.

nginx 负载均衡配置及如何解决重复登录问题

《nginx负载均衡配置及如何解决重复登录问题》文章详解Nginx源码安装与Docker部署,介绍四层/七层代理区别及负载均衡策略,通过ip_hash解决重复登录问题,对nginx负载均衡配置及如何... 目录一:源码安装:1.配置编译参数2.编译3.编译安装 二,四层代理和七层代理区别1.二者混合使用举例

Java JDK1.8 安装和环境配置教程详解

《JavaJDK1.8安装和环境配置教程详解》文章简要介绍了JDK1.8的安装流程,包括官网下载对应系统版本、安装时选择非系统盘路径、配置JAVA_HOME、CLASSPATH和Path环境变量,... 目录1.下载JDK2.安装JDK3.配置环境变量4.检验JDK官网下载地址:Java Downloads

Linux下进程的CPU配置与线程绑定过程

《Linux下进程的CPU配置与线程绑定过程》本文介绍Linux系统中基于进程和线程的CPU配置方法,通过taskset命令和pthread库调整亲和力,将进程/线程绑定到特定CPU核心以优化资源分配... 目录1 基于进程的CPU配置1.1 对CPU亲和力的配置1.2 绑定进程到指定CPU核上运行2 基于

Linux下删除乱码文件和目录的实现方式

《Linux下删除乱码文件和目录的实现方式》:本文主要介绍Linux下删除乱码文件和目录的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux下删除乱码文件和目录方法1方法2总结Linux下删除乱码文件和目录方法1使用ls -i命令找到文件或目录

Spring Boot spring-boot-maven-plugin 参数配置详解(最新推荐)

《SpringBootspring-boot-maven-plugin参数配置详解(最新推荐)》文章介绍了SpringBootMaven插件的5个核心目标(repackage、run、start... 目录一 spring-boot-maven-plugin 插件的5个Goals二 应用场景1 重新打包应用

SpringBoot+EasyExcel实现自定义复杂样式导入导出

《SpringBoot+EasyExcel实现自定义复杂样式导入导出》这篇文章主要为大家详细介绍了SpringBoot如何结果EasyExcel实现自定义复杂样式导入导出功能,文中的示例代码讲解详细,... 目录安装处理自定义导出复杂场景1、列不固定,动态列2、动态下拉3、自定义锁定行/列,添加密码4、合并