用简易代码拆解物联网IoT平台骨架

2024-09-05 14:52

本文主要是介绍用简易代码拆解物联网IoT平台骨架,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、功能实现

完成平台基础数据配置,MQ服务,数据流转(网关读取设备数据,自定义报文上传,触发器判断,自定义报文下发,网关写入设备数据)

JSON串转换过程

网关发送编码 {"ts":"2024-09-05T03:03:40.174Z","d":[{"tag":"40105","value":50}]}
IoT接收解码  {"temperature":50}
IoT触发规则(写入设备) {"ts":"2024-09-05T03:03:40.301Z","w":[{"tag":"00105","value":true}]}
网关接收解码 >> {"GateCode":"---","Equips":[{"EquipCode":"---","Tags":[{"TagCode":"00105","TagValue":true}]}]}

*** 麻雀虽小,五章俱全 ***

二、使用技术

HslCommunication:用于跟Modbus交互

LiteDB:非关系型数据库,用于存储设备模型(属性,触发器),存储告警日志,进行分页查询

Microsoft.ClearScript.V8:在js文件里自定义编解码规则,打通数据包格式

DynamicExpresso.Core:规则引擎,评判触发条件

MQTTnet:MQServer/MQClient,用于订阅/发布消息

三、实现效果 

【设备模拟器】- Modbus TCP

场景测试(变化上报):

1. 轮询Modbus设备点位 -> 检测到数据变化 -> 发布mq报文(自定义编码)

2. IoT订阅接收报文 -> 自定义解码 -> 规则引擎校验 -> 发布mq报文(自定义编码)

3. IoT订阅接收报文 -> 自定义解码 -> 写入点位值

场景测试(定时上报):

四、核心代码

【设备层】边缘网关

using HslCommunication;
using HslCommunication.Core;
using HslCommunication.Core.Device;
using HslCommunication.ModBus;
using IotDataFlow.Section.gate.model;
using IotDataFlow.Section.iot.model;
using IotDataFlow.Util;
using Microsoft.ClearScript.V8;
using MQTTnet.Client;
using System.Text;
using System.Text.Json;namespace IotDataFlow.Section.gate
{/// <summary>/// 【设备层】边缘网关/// </summary>public class Gate{static GateModel GateModel_Time = new GateModel();static GateModel GateModel_Change = new GateModel();static Dictionary<string, EquipModel> DicTag = new Dictionary<string, EquipModel>();static MQClient MqttGate = new MQClient();static string ScriptPathGate = AppDomain.CurrentDomain.BaseDirectory + $"Script{Path.DirectorySeparatorChar}gate{Path.DirectorySeparatorChar}script1.js";static V8ScriptEngine Engine = new V8ScriptEngine();static string PubTopic;/// <summary>/// 配置基础信息:网关信息/// </summary>public static void ConfigBaseInfo(){try{ GateModel_Time.GateCode = "gw1";GateModel_Time.Equips = new List<EquipModel>(){new EquipModel(){EquipCode = "JY355",HostAddress = "127.0.0.1",PortNumber = 502,Tags = new List<TagModel>(){new TagModel() { TagCode = "40105", TagDT = (typeof(ushort)).Name.ToLower(), TagValue = 0 },new TagModel() { TagCode = "40106", TagDT = (typeof(ushort)).Name.ToLower(), TagValue = 0 },new TagModel() { TagCode = "00105", TagDT = (typeof(bool)).Name.ToLower(), TagValue = false },new TagModel() { TagCode = "00106", TagDT = (typeof(bool)).Name.ToLower(), TagValue = false }}}};// 添加地址与数据类型对应关系,用于写入时查询foreach (var equip in GateModel_Time.Equips){foreach (var tag in equip.Tags){if (!DicTag.ContainsKey(tag.TagCode)){DicTag.Add(tag.TagCode, equip);}}}// 变化上传ModelGateModel_Change.GateCode = GateModel_Time.GateCode;GateModel_Change.Equips = new List<EquipModel>() { new EquipModel() { Tags = new List<TagModel>() { new TagModel() } } };}catch (Exception ex){Logger.Error($"Gate,ConfigBaseInfo,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");}}/// <summary>/// 运行服务(定时轮询&定时上报)/// </summary>public static async Task Run(MqttModel mqModel, string subTopic, string pubTopic){try{PubTopic = pubTopic;var scriptContent = File.ReadAllText(ScriptPathGate);Engine.Execute(scriptContent);await MqttGate.InitConnect("127.0.0.1", mqModel.Port, mqModel.UserName, mqModel.Password, "mqgate", subTopic, SubCallBack);Task.Run(() => TskPoll());Task.Run(() => TskUpload());}catch (Exception ex){Logger.Error($"Gate,Run,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");}}/// <summary>/// 定时轮询/// </summary>static async void TskPoll(){ while (true){try{foreach (var equip in GateModel_Time.Equips){var hsl = GetHslBase(equip);foreach (var tag in equip.Tags){bool status = false; object result = null; string message = null;var startaddress = tag.TagCode; ConvertStandardModbusAddress2HSLAddress(ref startaddress);switch (tag.TagDT){case "uint16":{OperateResult<ushort> response = await hsl.ReadUInt16Async(startaddress);if (null != response) { status = response.IsSuccess; result = response.Content; message = response.ToMessageShowString(); } }break;case "boolean":{OperateResult<bool> response = await hsl.ReadBoolAsync(startaddress);if (null != response) { status = response.IsSuccess; result = response.Content; message = response.ToMessageShowString(); }}break;}if (!status){Logger.Error($"网关读取Modbus数据异常,address:{tag.TagCode},errmsg:{message}");}else{// 变化上报if (!tag.TagValue.ToString().Equals(result.ToString())){tag.TagValue = result; //GateModel_Change.Equips[0].EquipCode = equip.EquipCode;GateModel_Change.Equips[0].Tags[0] = tag; UploadData("变化上报", GateModel_Change);// {"ts":"2024-09-04T03:13:41.874Z","d":[{"tag":"40105","value":0}]}} }await Task.Delay(100);} }}catch (Exception ex){Logger.Error($"Gate,TskPoll,errmsg={ex.Message}\r\nstacktrace:{ex.StackTrace}");}await Task.Delay(500);}}/// <summary>/// 定时上报/// </summary>static async void TskUpload(){ while (true){await Task.Delay(60000);try{await UploadData("定时上报", GateModel_Time);// {"ts":"2024-09-03T07:57:58.471Z","d":[{"tag":"40105","value":"5"},{"tag":"40106","value":"6"}]} }catch (Exception ex){Logger.Error($"Gate,TskUpload,errmsg={ex.Message}\r\nstacktrace:{ex.StackTrace}");} }}/// <summary>/// 上报数据/// </summary>static async Task UploadData(string note, GateModel gateModel){string dataJson = JsonSerializer.Serialize(gateModel);var encodeJson = Engine.Invoke("encodeMqttPayload", dataJson);Logger.Info($"网关自定义编码 >> 备注=【{note}】 Topic主题=【{PubTopic}】 消息=【{encodeJson}】");await MqttGate.Publish(PubTopic, encodeJson.ToString());}/// <summary>/// 收到消息事件/// </summary>/// <param name="arg"></param>/// <returns></returns> static async Task SubCallBack(MqttApplicationMessageReceivedEventArgs arg){try{ string topic = arg.ApplicationMessage.Topic;string mqttPayload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment); Logger.Info($"网关接收报文 >> 客户端ID=【{arg.ClientId}】 Topic主题=【{topic}】 消息=【{mqttPayload}】");// 写入 {"ts":"2024-09-04T03:41:34.747Z","w":[{"tag":"10105"}]} var decodeJson = Engine.Invoke("decodeMqttPayload", mqttPayload);Logger.Info($"网关自定义解码 >> {decodeJson}");var model = JsonSerializer.Deserialize<GateModel>(decodeJson.ToString());foreach (var equip in model.Equips){foreach (var tag in equip.Tags){var tagCode = tag.TagCode;var address = tagCode; ConvertStandardModbusAddress2HSLAddress(ref address);var content = tag.TagValue.ToString(); OperateResult? operateResult = null; if (DicTag.ContainsKey(tagCode)){var tempDevice = DicTag[tagCode];var hsl = GetHslBase(tempDevice);var tempTag = tempDevice.Tags.Where(x => x.TagCode == tagCode).First();if (null != tempTag){switch (tempTag.TagDT){case "uint16":{operateResult = await hsl.WriteAsync(address, Convert.ToUInt16(content));}break;case "boolean":{operateResult = await hsl.WriteAsync(address, Convert.ToBoolean(content));}break;}if (null != operateResult && operateResult.IsSuccess){if (operateResult.IsSuccess){Logger.Info($"网关写入数据 >> 状态=【成功】 地址=【{tagCode}】 值=【{content}】");}else{Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】错误信息=【{operateResult.ToMessageShowString()}】");}}else{Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】 错误信息=【operateResult==null】");}}else{Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】 错误信息=【tempTag==null】");} }else{Logger.Error($"网关写入数据 >> 状态=【失败】 地址=【{tagCode}】 值=【{content}】错误信息=【DicTag字典查无数据】");}}}}catch (Exception ex){Logger.Error($"Gate,SubCallBack,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");}}/// <summary>/// 获取Hsl进行设备交互的对象/// </summary>static DeviceCommunication GetHslBase(EquipModel equip){var hsl = (DeviceCommunication)UidMgr.GetClient(equip.EquipCode);if (null == hsl){var temp = new ModbusTcpNet(equip.HostAddress, equip.PortNumber);temp.Station = 1;temp.AddressStartWithZero = false;temp.DataFormat = DataFormat.CDAB;temp.ReceiveTimeOut = 5000;hsl = temp;UidMgr.AddClient(equip.EquipCode, hsl);Logger.Info($"网关初始化设备交互 >> 设备编码=【{equip.EquipCode}】"); }return hsl;}/// <summary>/// 地址转换/// </summary>/// <param name="val"></param>static void ConvertStandardModbusAddress2HSLAddress(ref string val){if (!val.Contains("x=")){int code = 1;ushort address = Convert.ToUInt16(val);if (address >= 00001 && address <= 09999)// 00001 ~ 09999{code = 1;// 读线圈状态}else if (address >= 10001 && address <= 19999)// 10001 ~ 19999{code = 2;// 读离散输入状态(只读)}else if (address >= 30001 && address <= 39999)// 30001 ~ 39999 04指令{code = 4;// 读输入寄存器(只读)}else if (address >= 40001 && address <= 49999)// 40001 ~ 49999 03指令{code = 3;// 读保存寄存器}var temp = Convert.ToUInt16(val.Substring(1));val = $"x={code};{temp}";}}}
}

【应用层】管理后台

using IotDataFlow.Section.iot.model;
using IotDataFlow.Util;
using System.Text.Encodings.Web;
using System.Text.Json;namespace IotDataFlow.Section.iot
{/// <summary>/// 【应用层】管理后台/// </summary>public class Web{/// <summary>/// 配置基础数据:设备信息(物模型)/// </summary>public static void ConfigBaseInfo(string deviceId, string subTopic, string pubTopic){// 设备注册 var insert = false;var deviceModel = LitedbHelper.GetDeviceModel(deviceId);if (null == deviceModel){deviceModel = new DeviceModel(); insert = true;}deviceModel.SubTopic = subTopic;deviceModel.PubTopic = pubTopic;deviceModel.DeviceId = deviceId;deviceModel.DeviceName = "温湿度计";deviceModel.Propertys = new Dictionary<string, PropertyModel>(){["temperature"] = new PropertyModel() { DataType = (typeof(double)).Name.ToLower(), DataValue = 0 },["humidity"]= new PropertyModel() { DataType = (typeof(double)).Name.ToLower(), DataValue = 0 },["status"] = new PropertyModel() { DataType = (typeof(string)).Name.ToLower(), DataValue = "offline" },["switchstate1"] = new PropertyModel() { DataType = (typeof(bool)).Name.ToLower(), DataValue = false },["switchstate2"] = new PropertyModel() { DataType = (typeof(bool)).Name.ToLower(), DataValue = false }};deviceModel.Triggers = new List<TriggerModel>(){new TriggerModel() { TriggerId = 1, Description = "温度超过阈值警报", Condition = "temperature > 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate1"] = true } } },new TriggerModel() { TriggerId = 2, Description = "温度正常", Condition = "temperature <= 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate1"] = false } } },new TriggerModel() { TriggerId = 3, Description = "湿度低于阈值警报", Condition = "humidity > 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate2"] = true } } },new TriggerModel() { TriggerId = 4, Description = "湿度正常", Condition = "humidity <= 30", Action = new ActionModel(){ Type = ActionType.WriteDevice, Arg = new Dictionary<string, object>() { ["switchstate2"] = false } } },new TriggerModel() { TriggerId = 5, Description = "温度和湿度都正常", Condition = "temperature <= 30 and humidity >= 30", Action = new ActionModel(){ Type = ActionType.WxNotice, Arg = new Dictionary<string, object>() { ["openid"] = "112233" } } },new TriggerModel() { TriggerId = 6, Description = "继电器1开", Condition = "switchstate1 == true", Action = null },new TriggerModel() { TriggerId = 7, Description = "继电器2开", Condition = "switchstate2 == true", Action = null }};if (insert){LitedbHelper.InsertDeviceModel(deviceModel);}else{LitedbHelper.UpdateDeviceModel(deviceModel);}Logger.Info($"Web基础信息:{ JsonSerializer.Serialize(deviceModel, new JsonSerializerOptions { Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping })}"); }}
}

【服务层】通信服务

using DynamicExpresso;
using IotDataFlow.Section.iot.model;
using IotDataFlow.Util;
using Microsoft.ClearScript.V8;
using MQTTnet.Client;
using System.Text;
using System.Text.Json;namespace IotDataFlow.Section.iot
{/// <summary>/// 【服务层】通信服务/// </summary>public class Service{static MQClient mqIot = new MQClient();static string scriptPathIot = AppDomain.CurrentDomain.BaseDirectory + $"Script{Path.DirectorySeparatorChar}iot{Path.DirectorySeparatorChar}script1.js";static V8ScriptEngine engine = new V8ScriptEngine();/// <summary>/// 运行服务:MQ服务/// </summary>public static async Task Run(MqttModel mqModel, string subTopic){string scriptContent = File.ReadAllText(scriptPathIot);engine = new V8ScriptEngine();engine.Execute(scriptContent);// 启动mqttserverawait MQServer.Start(mqModel.Port, mqModel.UserName, mqModel.Password);// 监听订阅信息await mqIot.InitConnect("127.0.0.1", mqModel.Port, mqModel.UserName, mqModel.Password, "mqiot", subTopic, SubCallBack); }/// <summary>/// IOT收到消息事件/// </summary>/// <param name="arg"></param>/// <returns></returns> static async Task SubCallBack(MqttApplicationMessageReceivedEventArgs arg){try{// mq接收string topic = arg.ApplicationMessage.Topic;string mqttPayload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);Logger.Info($"IoT接收报文 >> 客户端ID=【{arg.ClientId}】 Topic主题=【{topic}】 消息=【{mqttPayload}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");var encodeJson = engine.Invoke("encodeMqttPayload", mqttPayload);// {"temperature":5,"humidity":6} Logger.Info($"IoT自定义解码 >> {encodeJson}");// 规则引擎var deviceId = topic.Split("/").Last();var deviceModel = LitedbHelper.GetDeviceModel(deviceId);var propertys = deviceModel.Propertys;var rules = deviceModel.Triggers;if (null == deviceModel || null == propertys || null == rules) return;var datas = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(encodeJson.ToString());var lstParam = new List<Parameter>();var lstJosnProperty = new List<string>();foreach (var data in datas){var propertyKey = data.Key.ToLower();if (propertys.ContainsKey(propertyKey)){// 收集json报文中含有的属性lstJosnProperty.Add(propertyKey);// 填充表达式的参数var propertyModel = propertys[propertyKey];switch (propertyModel.DataType){case "double": lstParam.Add(new Parameter(data.Key, data.Value.GetDouble())); break;case "string": lstParam.Add(new Parameter(data.Key, data.Value.GetString())); break;case "boolean": lstParam.Add(new Parameter(data.Key, data.Value.GetBoolean())); break;}}}// 评估规则  var interpreter = new Interpreter();  var filteredRules = rules.Where(rule => lstJosnProperty.Any(property => rule.Condition.Contains(property))).ToList();foreach (var rule in filteredRules){try{var condition = rule.Condition.Replace(" and ", " && ").Replace(" or ", " || ");bool result = (bool)interpreter.Eval(condition, lstParam.ToArray());if (result){// 添加记录 LitedbHelper.InsertAlarm(new AlarmModel() { Description= rule.Description });if (null != rule.Action && null != rule.Action.Arg){string argJson = JsonSerializer.Serialize(rule.Action.Arg);switch (rule.Action.Type){case ActionType.WxNotice:{Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【微信通知】 执行参数=【{argJson}】");}break;case ActionType.WriteDevice:{var decodeJson = engine.Invoke("decodeMqttPayload", argJson);// {"temperature":5,"humidity":6}Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【写入设备】 执行参数=【{argJson}】 自定义解码=【{decodeJson}】");mqIot.Publish(deviceModel.PubTopic, decodeJson.ToString());}break;}}else{Logger.Info($"IoT触发规则 >> {rule.Description} 联动场景=【无】");}}}catch (Exception ex){//Logger.Error($"Service,Condition,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");}}}catch (Exception ex){Logger.Error($"Service,SubCallBack,errmsg:{ex.Message}\r\nstacktrace:{ex.StackTrace}");}}}
}

编解码文件

// =======================================================================================
// ** 脚本名称:script2.js
// ** 输入Json:{"GateCode":"gw1","Equips":[{"EquipCode":"JY355","Tags":[{"TagCode":"40105","TagValue":"5"},{"TagCode":"40106","TagValue":"6"}]},{"DeviceCode":"JY356","Tags":[{"TagCode":"40107","TagValue":"7"},{"TagCode":"40108","TagValue":"8"}]}]}
// ** 输出Json:{"clientid":"gw1","time":"2024-08-30 11:34:35","JY355":[{"tag":"40105","value":"5"},{"tag":"40106","value":"6"}],"JY356":[{"tag":"40107","value":"7"},{"tag":"40108","value":"8"}]}
// =======================================================================================
function createMqttPayload(dataJson) { let gate = JSON.parse(dataJson);let clientid = gate.GateCode;let device = gate.Devices;let result = {clientid: gate.GateCode, time: new Date(),};  device.forEach(function (d) {let equipCode = d.EquipCode;let tag = d.Tags;if (!result[equipCode]) {result[equipCode] = [];}  tag.forEach(function (t) { result[equipCode].push({tag: t.TagCode,value: t.TagValue});   }); });return JSON.stringify(result);
} 

这篇关于用简易代码拆解物联网IoT平台骨架的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1139268

相关文章

Java集合之Iterator迭代器实现代码解析

《Java集合之Iterator迭代器实现代码解析》迭代器Iterator是Java集合框架中的一个核心接口,位于java.util包下,它定义了一种标准的元素访问机制,为各种集合类型提供了一种统一的... 目录一、什么是Iterator二、Iterator的核心方法三、基本使用示例四、Iterator的工

Java 线程池+分布式实现代码

《Java线程池+分布式实现代码》在Java开发中,池通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,:本文主要介绍Java线程池+分布式实现代码,需要... 目录1. 线程池1.1 自定义线程池实现1.1.1 线程池核心1.1.2 代码示例1.2 总结流程2. J

JS纯前端实现浏览器语音播报、朗读功能的完整代码

《JS纯前端实现浏览器语音播报、朗读功能的完整代码》在现代互联网的发展中,语音技术正逐渐成为改变用户体验的重要一环,下面:本文主要介绍JS纯前端实现浏览器语音播报、朗读功能的相关资料,文中通过代码... 目录一、朗读单条文本:① 语音自选参数,按钮控制语音:② 效果图:二、朗读多条文本:① 语音有默认值:②

Vue实现路由守卫的示例代码

《Vue实现路由守卫的示例代码》Vue路由守卫是控制页面导航的钩子函数,主要用于鉴权、数据预加载等场景,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、概念二、类型三、实战一、概念路由守卫(Navigation Guards)本质上就是 在路

uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)

《uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)》在uni-app开发中,文件上传和图片处理是很常见的需求,但也经常会遇到各种问题,下面:本文主要介绍uni-app小程序项目中实... 目录方式一:使用<canvas>实现图片压缩(推荐,兼容性好)示例代码(小程序平台):方式二:使用uni

JAVA实现Token自动续期机制的示例代码

《JAVA实现Token自动续期机制的示例代码》本文主要介绍了JAVA实现Token自动续期机制的示例代码,通过动态调整会话生命周期平衡安全性与用户体验,解决固定有效期Token带来的风险与不便,感兴... 目录1. 固定有效期Token的内在局限性2. 自动续期机制:兼顾安全与体验的解决方案3. 总结PS

C#中通过Response.Headers设置自定义参数的代码示例

《C#中通过Response.Headers设置自定义参数的代码示例》:本文主要介绍C#中通过Response.Headers设置自定义响应头的方法,涵盖基础添加、安全校验、生产实践及调试技巧,强... 目录一、基础设置方法1. 直接添加自定义头2. 批量设置模式二、高级配置技巧1. 安全校验机制2. 类型

Python屏幕抓取和录制的详细代码示例

《Python屏幕抓取和录制的详细代码示例》随着现代计算机性能的提高和网络速度的加快,越来越多的用户需要对他们的屏幕进行录制,:本文主要介绍Python屏幕抓取和录制的相关资料,需要的朋友可以参考... 目录一、常用 python 屏幕抓取库二、pyautogui 截屏示例三、mss 高性能截图四、Pill

使用MapStruct实现Java对象映射的示例代码

《使用MapStruct实现Java对象映射的示例代码》本文主要介绍了使用MapStruct实现Java对象映射的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、什么是 MapStruct?二、实战演练:三步集成 MapStruct第一步:添加 Mave

Java抽象类Abstract Class示例代码详解

《Java抽象类AbstractClass示例代码详解》Java中的抽象类(AbstractClass)是面向对象编程中的重要概念,它通过abstract关键字声明,用于定义一组相关类的公共行为和属... 目录一、抽象类的定义1. 语法格式2. 核心特征二、抽象类的核心用途1. 定义公共接口2. 提供默认实