Kafka connect 构建ETL方案

2024-05-26 08:08
文章标签 构建 connect 方案 kafka etl

本文主要是介绍Kafka connect 构建ETL方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一.背景介绍

Kafka connect是Confluent公司(当时开发出Apache Kafka的核心团队成员出来创立的新公司)开发的confluent platform的核心功能.

大家都知道现在数据的ETL过程经常会选择kafka作为消息中间件应用在离线和实时的使用场景中,而kafka的数据上游和下游一直没有一个

无缝衔接的pipeline来实现统一,比如会选择flume或者logstash采集数据到kafka,然后kafka又通过其他方式pull或者push数据到目标存储.

而kafka connect旨在围绕kafka构建一个可伸缩的,可靠的数据流通道,通过kafka connect可以快速实现大量数据进出kafka从而和其

他源数据源或者目标数据源进行交互构造一个低延迟的数据pipeline.给个图更直观点,大家感受下.

0

二.Kafka-connect快速配置

这里Confluent官方很贴心的提供了一个集成的镜像以便quickstart,如下链接

https://s3-us-west-2.amazonaws.com/confluent-files/kafka_connect_blog.ova

这是存储在Amazon S3上的,直接点击即可下载.这里我使用VMWare直接打开,刚开始会提示一个错误,不用管它直接点击重试即可

系统加载的过程中会默认初始化虚拟机的网络配置,这里我建议提前设置好桥接网络,让该虚拟机使用桥接网络初始化.

加载成功后,登录进入该Ubuntu系统,默认的用户名和密码都是:vagrant.

然后ls查看vagrant用户目录,查看几个关键的脚本内容后,我分别介绍它们的功能

1>setup.sh:自动下载mysql,mysql jdbc driver,配置好mysql以及做为hive的metastore

2>start.sh:启动confluent platform,kafka,hadoop,hive相关服务

3>clean_up.sh:和start.sh相反的,会关闭掉所有的服务,而且还会删除掉所有的数据(例如hdfs namenode和 datanode的数据,其实相当于fs format了)

那么很明显,第一步肯定是执行setup.sh,这里执行后会报错如下

setupFailed

这里无法下载相关的软件包,好吧,那么我们需要更新一下下载源的索引,执行如下命令

sudo apt-get update

更新完毕后再次执行setup.sh安装好mysql,hive等服务

紧接着执行start.sh来启动上述服务,启动后应该有如下进程,这是一个伪分布式节点

jpsService

对了,虚拟机各个服务(例如hive,zookeeper等),配置文件和日志文件在路径/mnt/下,组件的安装位置位于/opt下

三.Kafka connect快速使用

配置完以后就可以准备使用kafka-connect来快速构建一个数据pipeline了,如下图所示

wholePic

整个过程是将数据以mysql作为数据源,将数据通过kafka connect快速ETL到hive中去.注意这里图中没画kafka

但是实际上是包含在kafka connect里面的,话不多说,开始使用

1>Mysql数据准备

执行如下命令

复制代码

$ mysql -u root --password="mypassword"
mysql> CREATE DATABASE demo;
mysql> USE demo;
mysql> CREATE TABLE users (->   id serial NOT NULL PRIMARY KEY,->   name varchar(100),->   email varchar(200),->   department varchar(200),->   modified timestamp default CURRENT_TIMESTAMP NOT NULL,->   INDEX `modified_index` (`modified`)-> );
mysql> INSERT INTO users (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
mysql> INSERT INTO users (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
mysql> exit;

复制代码

注意上面第一行,--password="mypassword" ,对,你没看错,这里虚拟机mysql的root默认密码就是mypassword,

强迫症患者请自行更改.随后建库,建表,插入数据.

2>关键概念准备

这里我快速普及一下参考官方文档理解的一些关键概念.

kafka connector:kafka connector是kafka connect的关键组成部分,它是一个逻辑上的job,用于在kafka和其他系统之间拷贝数据,比如

从上游系统拷贝数据到kafka,或者从kafka拷贝数据到下游系统

Tasks:每个kafka connector可以初始化一组task进行数据的拷贝

Workers:逻辑上包含kafka connector和tasks用来调度执行具体任务的进程,具体执行时分为standalone模式和distributed模式

见下图,这个是kafka上游的数据stream过来后,定义好对应的kafka connector后,分解为一组tasks然后push数据到kafka的不同topic

kafkaConnectors

3>利用Kafka-connect摄取数据

主要是通过配置来实现从mysql摄取数据到kafka,然后按照topic来获取数据写入hdfs,命令如下

connect-standalone /mnt/etc/connect-avro-standalone.properties \/mnt/etc/mysql.properties /mnt/etc/hdfs.properties &

注意上面这些properties文件是虚拟机已经事先配置好的,可以直接执行实现数据的摄取

当前使用的kafka connect的standalone模式,当然还有distributed模式后续可以尝试

上面的那条命令的格式是这样:

connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

主要解释一下connect-standalone后面的参数

worker.properties:就是上面提到过的worker进程的配置文件,可以定义kafka cluster的相关信息以及数据序列化的格式.

随后的一些参数就是kafka connector的配置参数了,比如上面的mysql.properties定义了一个kafka jdbc connector,用来同步mysql数据到kafka

最后一个hdfs.properties是kafka hdfs connector的配置文件,用来消费kafka topic数据push到hdfs.

那么执行这条命令后就可以将mysql的数据通过kafka connect快速ETL到hdfs了.

最后可以通过hive创建外表映射hdfs上的数据文件,然后在hive中查看对应数据,如下

复制代码

$ hive
hive> SHOW TABLES;
OK
test_jdbc_users
hive> SELECT * FROM test_jdbc_users;
OK
1 alice alice@abc.com engineering 1450305345000
2 bob   bob@abc.com   sales       1450305346000

复制代码

四.Kafka connect使用总结

1>Kafka connect的使用其实就是配置不同的kafka connectors,这里大家可以把kafka作为中间组件,然后可以类比flume理解,kafka上游的

connector其实就是fllume的source从上游数据源sink到kafka,kafka的下游connector其实就是flume的source是kafka,sink到下游系统.

2>Kafka connect的数据pipeline要打通,它要求数据遵守confluent自己的一套通用的schema机制,细心的同学会发现上面jps后会有个进程名

SchemaRegistryMain,这里官方默认使用Avro格式进出Kafka,所以要留意worker.properties文件的配置信息.

3>我在使用中没有发现Flume 相关的connector,因此很好奇它应该是没有实现上游flume conector的属性配置。问题应该出在Flume的数据是基

于event的,而和上面2中所说的schema定义格式没有很好的兼容.

4>kafka connect的distributed模式应该更实用,随后会尝试,以及confluent所支持的实时处理流kafka streams.

参考资料:http://docs.confluent.io/2.0.0/platform.html

这篇关于Kafka connect 构建ETL方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1003903

相关文章

Knife4j+Axios+Redis前后端分离架构下的 API 管理与会话方案(最新推荐)

《Knife4j+Axios+Redis前后端分离架构下的API管理与会话方案(最新推荐)》本文主要介绍了Swagger与Knife4j的配置要点、前后端对接方法以及分布式Session实现原理,... 目录一、Swagger 与 Knife4j 的深度理解及配置要点Knife4j 配置关键要点1.Spri

SQLite3 在嵌入式C环境中存储音频/视频文件的最优方案

《SQLite3在嵌入式C环境中存储音频/视频文件的最优方案》本文探讨了SQLite3在嵌入式C环境中存储音视频文件的优化方案,推荐采用文件路径存储结合元数据管理,兼顾效率与资源限制,小文件可使用B... 目录SQLite3 在嵌入式C环境中存储音频/视频文件的专业方案一、存储策略选择1. 直接存储 vs

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过

Springboot3+将ID转为JSON字符串的详细配置方案

《Springboot3+将ID转为JSON字符串的详细配置方案》:本文主要介绍纯后端实现Long/BigIntegerID转为JSON字符串的详细配置方案,s基于SpringBoot3+和Spr... 目录1. 添加依赖2. 全局 Jackson 配置3. 精准控制(可选)4. OpenAPI (Spri

关于跨域无效的问题及解决(java后端方案)

《关于跨域无效的问题及解决(java后端方案)》:本文主要介绍关于跨域无效的问题及解决(java后端方案),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录通用后端跨域方法1、@CrossOrigin 注解2、springboot2.0 实现WebMvcConfig

基于Python构建一个高效词汇表

《基于Python构建一个高效词汇表》在自然语言处理(NLP)领域,构建高效的词汇表是文本预处理的关键步骤,本文将解析一个使用Python实现的n-gram词频统计工具,感兴趣的可以了解下... 目录一、项目背景与目标1.1 技术需求1.2 核心技术栈二、核心代码解析2.1 数据处理函数2.2 数据处理流程

在Java中将XLS转换为XLSX的实现方案

《在Java中将XLS转换为XLSX的实现方案》在本文中,我们将探讨传统ExcelXLS格式与现代XLSX格式的结构差异,并为Java开发者提供转换方案,通过了解底层原理、性能优势及实用工具,您将掌握... 目录为什么升级XLS到XLSX值得投入?实际转换过程解析推荐技术方案对比Apache POI实现编程

Python FastMCP构建MCP服务端与客户端的详细步骤

《PythonFastMCP构建MCP服务端与客户端的详细步骤》MCP(Multi-ClientProtocol)是一种用于构建可扩展服务的通信协议框架,本文将使用FastMCP搭建一个支持St... 目录简介环境准备服务端实现(server.py)客户端实现(client.py)运行效果扩展方向常见问题结

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

详解如何使用Python从零开始构建文本统计模型

《详解如何使用Python从零开始构建文本统计模型》在自然语言处理领域,词汇表构建是文本预处理的关键环节,本文通过Python代码实践,演示如何从原始文本中提取多尺度特征,并通过动态调整机制构建更精确... 目录一、项目背景与核心思想二、核心代码解析1. 数据加载与预处理2. 多尺度字符统计3. 统计结果可