基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件

2023-10-12 10:59

本文主要是介绍基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件

  • 一、下载Confluent Kafka
  • 二、配置文件connect-distributed.properties
  • 三、启动脚本connect-distributed
  • 四、启动Kafka Connect集群
  • 五、加载debezium插件
  • 六、总结和延伸

一、下载Confluent Kafka

Confluent Kafka的下载地址:

  • https://www.confluent.io/download/

下载社区免费版本:

在这里插入图片描述

二、配置文件connect-distributed.properties

核心参数如下所示:

  • /data/src/confluent-7.3.3/etc/schema-registry/connect-distributed.properties
bootstrap.servers=realtime-kafka-001:9092,realtime-kafka-003:9092,realtime-kafka-002:9092group.id=datasight-confluent-test-debezium-cluster-statuskey.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=true
value.converter.schemas.enable=trueconfig.storage.topic=offline_confluent_test_debezium_cluster_connect_configs
offset.storage.topic=offline_confluent_test_debezium_cluster_connect_offsets
status.storage.topic=offline_confluent_test_debezium_cluster_connect_statusesconfig.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3offset.storage.partitions=25
status.storage.partitions=5
config.storage.partitions=1internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true#rest.host.name=0.0.0.0
#rest.port=8083#rest.advertised.host.name=0.0.0.0
#rest.advertised.port=8083plugin.path=/data/service/debezium/connectors2

三、启动脚本connect-distributed

  • /data/src/confluent-7.3.3/bin/connect-distributed

  • connect-distributed的脚本内容如下所示,可以不需要修改

  • 如果需要导出kafka connector的jmx,则需要设置jmx导出端口和jmx导出器,详细的部署方式可以参考博主下面这篇技术博客:

    • Debezium系列之:安装jmx导出器监控debezium指标
if [ $# -lt 1 ];
thenecho "USAGE: $0 [-daemon] connect-distributed.properties"exit 1
fibase_dir=$(dirname $0)###
### Classpath additions for Confluent Platform releases (LSB-style layout)
###
#cd -P deals with symlink from /bin to /usr/bin
java_base_dir=$( cd -P "$base_dir/../share/java" && pwd )# confluent-common: required by kafka-serde-tools
# kafka-serde-tools (e.g. Avro serializer): bundled with confluent-schema-registry package
for library in "confluent-security/connect" "kafka" "confluent-common" "kafka-serde-tools" "monitoring-interceptors"; dodir="$java_base_dir/$library"if [ -d "$dir" ]; thenclasspath_prefix="$CLASSPATH:"if [ "x$CLASSPATH" = "x" ]; thenclasspath_prefix=""fiCLASSPATH="$classpath_prefix$dir/*"fi
doneif [ "x$KAFKA_LOG4J_OPTS" = "x" ]; thenLOG4J_CONFIG_DIR_NORMAL_INSTALL="/etc/kafka"LOG4J_CONFIG_NORMAL_INSTALL="${LOG4J_CONFIG_DIR_NORMAL_INSTALL}/connect-log4j.properties"LOG4J_CONFIG_DIR_ZIP_INSTALL="$base_dir/../etc/kafka"LOG4J_CONFIG_ZIP_INSTALL="${LOG4J_CONFIG_DIR_ZIP_INSTALL}/connect-log4j.properties"if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layoutKAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL} -Dlog4j.config.dir=${LOG4J_CONFIG_DIR_NORMAL_INSTALL}"elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layoutKAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL} -Dlog4j.config.dir=${LOG4J_CONFIG_DIR_ZIP_INSTALL}"else # Fallback to normal defaultKAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties -Dlog4j.config.dir=$base_dir/../config"fi
fi
export KAFKA_LOG4J_OPTSif [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fiEXTRA_ARGS=${EXTRA_ARGS-'-name connectDistributed'}COMMAND=$1
case $COMMAND in-daemon)EXTRA_ARGS="-daemon "$EXTRA_ARGSshift;;*);;
esacexport CLASSPATH
exec $(dirname $0)/kafka-run-class $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectDistributed "$@"

四、启动Kafka Connect集群

启动命令如下所示:

/data/src/confluent-7.3.3/bin/connect-distributed /data/src/confluent-7.3.3/etc/schema-registry/connect-distributed.properties

正常启动Kafka Connect集群完整输出如下所示:

[2023-06-21 16:43:01,249] INFO EnrichedConnectorConfig values: config.action.reload = restartconnector.class = io.debezium.connector.mysql.MySqlConnectorerrors.log.enable = falseerrors.log.include.messages = falseerrors.retry.delay.max.ms = 60000errors.retry.timeout = 0errors.tolerance = noneexactly.once.support = requestedheader.converter = nullkey.converter = nullname = mysql-dw-valuekey-testoffsets.storage.topic = nullpredicates = []tasks.max = 1topic.creation.default.exclude = []topic.creation.default.include = [.*]topic.creation.default.partitions = 12topic.creation.default.replication.factor = 3topic.creation.groups = []transaction.boundary = polltransaction.boundary.interval.ms = nulltransforms = [unwrap, moveFieldsToHeader, moveHeadersToValue, Reroute]transforms.Reroute.key.enforce.uniqueness = truetransforms.Reroute.key.field.regex = nulltransforms.Reroute.key.field.replacement = nulltransforms.Reroute.logical.table.cache.size = 16transforms.Reroute.negate = falsetransforms.Reroute.predicate = transforms.Reroute.topic.regex = debezium-dw-encryption-test.dw.(.*)transforms.Reroute.topic.replacement = debezium-test-dw-encryption-all3transforms.Reroute.type = class io.debezium.transforms.ByLogicalTableRoutertransforms.moveFieldsToHeader.fields = [cdc_code, product]transforms.moveFieldsToHeader.headers = [product_code, productname]transforms.moveFieldsToHeader.negate = falsetransforms.moveFieldsToHeader.operation = copytransforms.moveFieldsToHeader.predicate = transforms.moveFieldsToHeader.type = class org.apache.kafka.connect.transforms.HeaderFrom$Valuetransforms.moveHeadersToValue.fields = [product_code2, productname2]transforms.moveHeadersToValue.headers = [product_code, productname]transforms.moveHeadersToValue.negate = falsetransforms.moveHeadersToValue.operation = copytransforms.moveHeadersToValue.predicate = transforms.moveHeadersToValue.type = class io.debezium.transforms.HeaderToValuetransforms.unwrap.add.fields = []transforms.unwrap.add.headers = []transforms.unwrap.delete.handling.mode = droptransforms.unwrap.drop.tombstones = truetransforms.unwrap.negate = falsetransforms.unwrap.predicate = transforms.unwrap.route.by.field = transforms.unwrap.type = class io.debezium.transforms.ExtractNewRecordStatevalue.converter = null(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
[2023-06-21 16:43:01,253] INFO [mysql-dw-valuekey-test|task-0] Loading the custom topic naming strategy plugin: io.debezium.schema.DefaultTopicNamingStrategy (io.debezium.config.CommonConnectorConfig:849)
Jun 21, 2023 4:43:01 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.[2023-06-21 16:43:01,482] INFO Started o.e.j.s.ServletContextHandler@2b80497f{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:921)
[2023-06-21 16:43:01,482] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:324)
[2023-06-21 16:43:01,482] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)

五、加载debezium插件

  • 下载debezium插件到plugin.path=/data/service/debezium/connectors2设置的目录下
  • 然后重新启动Kafka Connect集群就能够成功加载debezium插件

重启Kafka Connect集群查看debezium插件是否加载成功,如下所示:成功加载到了debezium 插件

[{
"class":"io.debezium.connector.mysql.MySqlConnector",
"type":"source",
"version":"2.2.1.Final"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"7.3.3-ce"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"7.3.3-ce"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"7.3.3-ce"}]

六、总结和延伸

总结:

  • 至此成功部署了具有一个节点的Kafka Connect集群,如果需要更多节点,需要在多台服务器上启动Kafka Connect,从而组成一个多节点的Kafka Connect集群

基于Kafka Connect加载debezium插件的更多的内容可以参考博主以下几篇技术博客或者Debezium 专栏:

  • Debezium系列之:安装部署debezium详细步骤,并把debezium服务托管到systemctl
  • Debezium系列之:打通Debezium2.0以上版本的使用技术
  • Debezium系列之:安装部署debezium2.0以上版本的详细步骤
  • Debezium系列之:实现接入上千Mysql、Sqlserver、MongoDB、Postgresql数据库的Debezium集群从Debezium1.X版本升级到Debezium2.X版本
  • Debezium系列之:安装jmx导出器监控debezium指标
  • Debezium系列之:Debezium UI部署详细步骤
  • Debezium 专栏地址

延伸:

  • 组成一个Kafka Connect集群后,需要启动多个connector进行Kafka Connect集群稳定性、可靠性测试。
  • 可以进一步部署Kafka Connect集群UI

这篇关于基于Confluent Kafka部署Kafka Connect集群,Kafka Connect集群加载debezium插件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:https://blog.csdn.net/zhengzaifeidelushang/article/details/131329473
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/195393

相关文章

使用Python获取JS加载的数据的多种实现方法

《使用Python获取JS加载的数据的多种实现方法》在当今的互联网时代,网页数据的动态加载已经成为一种常见的技术手段,许多现代网站通过JavaScript(JS)动态加载内容,这使得传统的静态网页爬取... 目录引言一、动态 网页与js加载数据的原理二、python爬取JS加载数据的方法(一)分析网络请求1

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

IDEA下"File is read-only"可能原因分析及"找不到或无法加载主类"的问题

《IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题》:本文主要介绍IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题,具有很好的参... 目录1.File is read-only”可能原因2.“找不到或无法加载主类”问题的解决总结1.File

Web技术与Nginx网站环境部署教程

《Web技术与Nginx网站环境部署教程》:本文主要介绍Web技术与Nginx网站环境部署教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、Web基础1.域名系统DNS2.Hosts文件3.DNS4.域名注册二.网页与html1.网页概述2.HTML概述3.

MyBatis分页插件PageHelper深度解析与实践指南

《MyBatis分页插件PageHelper深度解析与实践指南》在数据库操作中,分页查询是最常见的需求之一,传统的分页方式通常有两种内存分页和SQL分页,MyBatis作为优秀的ORM框架,本身并未提... 目录1. 为什么需要分页插件?2. PageHelper简介3. PageHelper集成与配置3.

Nginx使用Keepalived部署web集群(高可用高性能负载均衡)实战案例

《Nginx使用Keepalived部署web集群(高可用高性能负载均衡)实战案例》本文介绍Nginx+Keepalived实现Web集群高可用负载均衡的部署与测试,涵盖架构设计、环境配置、健康检查、... 目录前言一、架构设计二、环境准备三、案例部署配置 前端 Keepalived配置 前端 Nginx

Maven 插件配置分层架构深度解析

《Maven插件配置分层架构深度解析》:本文主要介绍Maven插件配置分层架构深度解析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录Maven 插件配置分层架构深度解析引言:当构建逻辑遇上复杂配置第一章 Maven插件配置的三重境界1.1 插件配置的拓扑

重新对Java的类加载器的学习方式

《重新对Java的类加载器的学习方式》:本文主要介绍重新对Java的类加载器的学习方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍1.1、简介1.2、符号引用和直接引用1、符号引用2、直接引用3、符号转直接的过程2、加载流程3、类加载的分类3.1、显示

ubuntu如何部署Dify以及安装Docker? Dify安装部署指南

《ubuntu如何部署Dify以及安装Docker?Dify安装部署指南》Dify是一个开源的大模型应用开发平台,允许用户快速构建和部署基于大语言模型的应用,ubuntu如何部署Dify呢?详细请... Dify是个不错的开源LLM应用开发平台,提供从 Agent 构建到 AI workflow 编排、RA

ubuntu16.04如何部署dify? 在Linux上安装部署Dify的技巧

《ubuntu16.04如何部署dify?在Linux上安装部署Dify的技巧》随着云计算和容器技术的快速发展,Docker已经成为现代软件开发和部署的重要工具之一,Dify作为一款优秀的云原生应用... Dify 是一个基于 docker 的工作流管理工具,旨在简化机器学习和数据科学领域的多步骤工作流。它