【Flink-Sql-Kafka-To-ClickHouse】使用 FlinkSql 将 Kafka 数据写入 ClickHouse

2023-12-16 13:28

本文主要是介绍【Flink-Sql-Kafka-To-ClickHouse】使用 FlinkSql 将 Kafka 数据写入 ClickHouse,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【Flink-Sql-Kafka-To-ClickHouse】使用 FlinkSql 将 Kafka 数据写入 ClickHouse

  • 1)需求分析
  • 2)功能实现
  • 3)准备工作
    • 3.1.Kafka
    • 3.2.ClickHouse
  • 4)Flink-Sql
  • 5)验证

1)需求分析

1、数据源为 Kafka,定义 Kafka-Topic 为动态临时视图表。

2、写入到 ClickHouse,自定义 Sink 表。

3、source 和 sink 都使用 Flink 集成的 Connector

2)功能实现

导入ClickHouse连接器

<dependency><groupId>com.aliyun</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.14.0</version>
</dependency>

如果在服务器上执行,需要将 jar 放到 Flink 的 lib 目录下。

3)准备工作

3.1.Kafka

1、创建好Topic

2、准备测试数据

{"id": 1,"eventId": "TEST123","eventStDt": "2022-11-3023:37:49","bak6": "测试","bak7": "https://test?user","businessId": "17279811111111111111111111111111","phone": "12345678910","bak1": "1234","bak2": "2022-12-0100:00:00","bak13": "17279811111111111111111111111111","bak14": "APP","bak11": "TEST"
}

3.2.ClickHouse

1、创建表(此处我们使用生产环境中较为常用的 cluster 集群模式建表)

注意集群模式要创建两次表,一次为 local 本地表,一次为 cluster 集群表。

  • local
CREATE TABLE test.kafka2ck_test_local on cluster test_cluster 
(`id` UInt32,`eventId` LowCardinality(Nullable(String)),`eventStDt` LowCardinality(Nullable(String)),`bak6` LowCardinality(Nullable(String)),`bak7` LowCardinality(Nullable(String)),`businessId` LowCardinality(Nullable(String)),`phone` LowCardinality(Nullable(String)),`bak1` LowCardinality(Nullable(String)),`bak2` LowCardinality(Nullable(String)),`bak13` LowCardinality(Nullable(String)),`bak14` LowCardinality(Nullable(String)),`bak11` LowCardinality(Nullable(String))
)
ENGINE = ReplicatedMergeTree
PARTITION BY id
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192;
  • cluster
CREATE TABLE test.kafka2ck_test on cluster test_cluster 
(`id` UInt32,`eventId` LowCardinality(Nullable(String)),`eventStDt` LowCardinality(Nullable(String)),`bak6` LowCardinality(Nullable(String)),`bak7` LowCardinality(Nullable(String)),`businessId` LowCardinality(Nullable(String)),`phone` LowCardinality(Nullable(String)),`bak1` LowCardinality(Nullable(String)),`bak2` LowCardinality(Nullable(String)),`bak13` LowCardinality(Nullable(String)),`bak14` LowCardinality(Nullable(String)),`bak11` LowCardinality(Nullable(String))
)
ENGINE = Distributed('test_cluster', 'test', 'kafka2ck_test_local', rand());

4)Flink-Sql

  • source
CREATE TABLE source_kafka_test (id INT,eventId STRING,eventStDt STRING,bak6 STRING,bak7 STRING,businessId STRING,phone STRING,bak1 STRING,bak2 STRING,bak13 STRING,bak14 STRING,bak11 STRING) WITH ('connector' = 'kafka','topic' = 'test','format'='json','properties.bootstrap.servers' = '${kafka-bootstrap-server}','properties.group.id' = 'test01','scan.startup.mode' = 'earliest-offset','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.kerberos.service.name' = 'kafka'
);
  • sink
CREATE TABLE sink_ck_test (id INT,eventId STRING,eventStDt STRING,bak6 STRING,bak7 STRING,businessId STRING,phone STRING,bak1 STRING,bak2 STRING,bak13 STRING,bak14 STRING,bak11 STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'clickhouse','url' = 'jdbc:clickhouse://123.1.1.1:9090','database-name'='test','table-name' = 'kafka2ck_test_local','username' = 'test','password' = '123456','sink.batch-size' = '100','sink.flush-interval' = '1000','sink.max-retries' = '3'
);
  • insert
insert into sink_ck_test select * from source_kafka_test;

5)验证

在 Kafka 中写入对应 ClickHouse 格式的 Json 测试数据,观察 ClickHouse 中是否有数据写入。

{"id": 1,"eventId": "TEST123","eventStDt": "2022-11-3023:37:49","bak6": "测试","bak7": "https://test?user","businessId": "17279811111111111111111111111111","phone": "12345678910","bak1": "1234","bak2": "2022-12-0100:00:00","bak13": "17279811111111111111111111111111","bak14": "APP","bak11": "TEST"
}

这篇关于【Flink-Sql-Kafka-To-ClickHouse】使用 FlinkSql 将 Kafka 数据写入 ClickHouse的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/500635

相关文章

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.