使用 LF Edge eKuiper 将物联网流处理数据写入 Databend

2023-10-18 20:20

本文主要是介绍使用 LF Edge eKuiper 将物联网流处理数据写入 Databend,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

LF Edge eKuiper

LF Edge eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 的主要目标是在边缘端提供一个流媒体软件框架(类似于 Apache Flink (opens new window))。eKuiper 的规则引擎允许用户提供基于 SQL 或基于图形(类似于 Node-RED)的规则,在几分钟内创建物联网边缘分析应用。具体介绍可以参考 [LF Edge eKuiper - 超轻量物联网边缘流处理软件(https://ekuiper.org/docs/zh/latest/)。 

Databend Sql Sink

eKuiper 支持通过 Golang 或者 Python 在源 (Source)SQL 函数目标 (Sink) 三个方面的扩展,通过支持不同的 Sink,允许用户将分析结果发送到不同的扩展系统中。Databend 作为 Sink 也被集成到了 eKuiper plugin 当中,下面通过一个案例来展示如何使用 eKuiper 将物联网流处理数据写入 Databend。

编译 eKuiper 和 Databend Sql Plugin

eKuiper

git clone https://github.com/lf-edge/ekuiper & cd ekuiper
make

Databend Sql Plugin

go build -trimpath --buildmode=plugin -tags databend -o plugins/sinks/Sql.so extensions/sinks/sql/sql.go

编译后的 sink plugin 拷贝到 build 目录:

cp plugins/sinks/Sql.so _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64/plugins/sinks

Databend 建表

在 Databend 中先创建目标表 ekuiper_test:

create table ekuiper_test (name string,size bigint,id bigint);

启动 eKuiperd

cd _build/kuiper-1.11.1-18-g42d9147f-darwin-arm64 
./bin/kuiperd

服务正常启动:

创建流(stream) 和 规则 (rule)

eKuiper 提供了两种管理各种流、规则,目标端的方式,一种是通过 ekuiper-manager 的 [docker image](https://hub.docker.com/r/lfedge/ekuiper) 启动可视化管理界面,一种是通过 CLI 工具来管理。这里我们使用 CLI。

创建 stream

流是 eKuiper 中数据源连接器的运行形式。它必须指定一个源类型来定义如何连接到外部资源。这里我们创建一个流,从 json 文件数据源中获取数据,并发送到 eKuiper 中。

首先配置文件数据源,连接器的配置文件位于 /etc/sources/file.yaml

default:# 文件的类型,支持 json, csv 和 linesfileType: json# 文件以 eKuiper 为根目录的目录或文件的绝对路径。# 请勿在此处包含文件名。文件名应在流数据源中定义path: data# 读取文件的时间间隔,单位为ms。如果只读取一次,则将其设置为 0interval: 0# 读取后,两条数据发送的间隔时间sendInterval: 0# 是否并行读取目录中的文件parallel: false# 文件读取后的操作# 0: 文件保持不变# 1: 删除文件# 2: 移动文件到 moveTo 定义的位置actionAfterRead: 0# 移动文件的位置, 仅用于 actionAfterRead 为 2 的情况moveTo: /tmp/kuiper/moved# 是否包含文件头,多用于 csv。若为 true,则第一行解析为文件头。hasHeader: false# 定义文件的列。如果定义了文件头,该选项将被覆盖。# columns: [id, name]# 忽略开头多少行的内容。ignoreStartLines: 0# 忽略结尾多少行的内容。最后的空行不计算在内。ignoreEndLines: 0# 使用指定的压缩方法解压缩文件。现在支持`gzip`、`zstd` 方法。decompression: ""

使用 CLI 创建 steam 名为 stream1:

./bin/kuiper create stream stream1 '(id BIGINT, name STRING,size BIGINT) WITH (DATASOURCE="test.json", FORMAT="json", TYPE="file");'

Json 文件的内容为:

[{"id": 1,"size":100, "name": "John Doe"},{"id": 2,"size":200, "name": "Jane Smith"},{"id": 3,"size":300, "name": "Kobe Brant"},{"id": 4,"size":400, "name": "Alen Iverson"}
]

创建 Databend Sink Rule

一个规则代表了一个流处理流程,定义了从将数据输入流的数据源到各种处理逻辑,再到将数据输入到外部系统的动作。eKuiper 有两种方法来定义规则的业务逻辑。要么使用 SQL / 动作组合,要么使用新增加的图 API。

这里我们通过指定 sql 和 actions 属性,以声明的方式定义规则的业务逻辑。其中,sql 定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 action 路由到多个位置。

规则由 JSON 定义,下面是准备创建的规则 myRule.json:

{"id": "myRule","sql": "SELECT id, name from stream1","actions": [{"log": {},"sql": {"url": "databend://databend:databend@localhost:8000/default?sslmode=disable","table": "ekuiper_test","fields": ["id","name"]}}]
}

执行 CLI 创建规则:

./bin/kuiper create rule myRule -f myRule.json

可以查看所创建规则的运行状态:

./bin/kuiper getstatus rule myRule

规则创建后,会立即将符合规则条件的数据发送到目标端,此时我们查看 Databend 的 ekuiper_test 表,可以看到文件数据源中的数据已经被写入到 Databend:

可以看到由于我们的规则 SQL 中只指定了 idname 字段,所以这里只有这两个字段被写入。

结论

eKuiper 是 EMQ 旗下的一款流处理软件,其体积小、功能强大,在工业物联网、车辆网、公共数据分析等很多场景中得到广泛使用。本文介绍如何使用 eKuiper 将物联网流处理数据写入 Databend。

这篇关于使用 LF Edge eKuiper 将物联网流处理数据写入 Databend的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/234957

相关文章

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

C++中detach的作用、使用场景及注意事项

《C++中detach的作用、使用场景及注意事项》关于C++中的detach,它主要涉及多线程编程中的线程管理,理解detach的作用、使用场景以及注意事项,对于写出高效、安全的多线程程序至关重要,下... 目录一、什么是join()?它的作用是什么?类比一下:二、join()的作用总结三、join()怎么

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束

mybatis中resultMap的association及collectio的使用详解

《mybatis中resultMap的association及collectio的使用详解》MyBatis的resultMap定义数据库结果到Java对象的映射规则,包含id、type等属性,子元素需... 目录1.reusltmap的说明2.association的使用3.collection的使用4.总

Spring Boot配置和使用两个数据源的实现步骤

《SpringBoot配置和使用两个数据源的实现步骤》本文详解SpringBoot配置双数据源方法,包含配置文件设置、Bean创建、事务管理器配置及@Qualifier注解使用,强调主数据源标记、代... 目录Spring Boot配置和使用两个数据源技术背景实现步骤1. 配置数据源信息2. 创建数据源Be

Java中使用 @Builder 注解的简单示例

《Java中使用@Builder注解的简单示例》@Builder简化构建但存在复杂性,需配合其他注解,导致可变性、抽象类型处理难题,链式编程非最佳实践,适合长期对象,避免与@Data混用,改用@G... 目录一、案例二、不足之处大多数同学使用 @Builder 无非就是为了链式编程,然而 @Builder

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

mybatis-plus QueryWrapper中or,and的使用及说明

《mybatis-plusQueryWrapper中or,and的使用及说明》使用MyBatisPlusQueryWrapper时,因同时添加角色权限固定条件和多字段模糊查询导致数据异常展示,排查发... 目录QueryWrapper中or,and使用列表中还要同时模糊查询多个字段经过排查这就导致只要whe