Maxwell监听mysql的binlog日志变化写入kafka消费者

本文主要是介绍Maxwell监听mysql的binlog日志变化写入kafka消费者,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一. 环境: maxwell:v1.29.2   (从1.30开始maxwell停止了对java8的使用,改为为11)

 maxwell1.29.2这个版本对mysql8.0以后的缺少utf8mb3字符的解码问题,需要对原码中加上一个部分内容 :具体也给大家做了总结 : 

关于v1.29.2 版本的Maxwell存在于mysql8.0后版本部分源码字符集处理确实问题-CSDN博客

二. 程序这里还是那一个kafka模拟器来实时向mysql写数据做测试:

               

 maxwell可以成功将监听到的binlog日志信息写入到kafka的主题中去消费

三. 关于maxwell的配置以及启动方式:

1. 配置mysql的my.cnf配置文件开启binlog日志

 sudo  vim  /etc/my.cnf在[mysqld]模块下添加一下内容[mysqld]
server_id=1
log-bin=mysql-bin
binlog_format=row#binlog-do-db=test_maxwell然后并重启 Mysql 服务
sudo  systemctl  restart  mysqld登录 mysql 并查看是否修改完成 使用:
mysql>   show  variables  like  '%binlog%';
查看下列属性binlog_format	|   ROW

 2.查看mysql生成的binlog日志文件:正常会生成二个文件 一个初始化文件,一个索引记录文件

cd  /var/lib/mysqlsudo  ls  -l |grep binlog
总用量  188500
-rw-r-----.   1   mysql   mysql	154   11 月  17   16:30   mysql-
bin.000001
-rw-r-----.   1   mysql   mysql	19   11 月  17   16:30   mysql-
bin.index

 3.初始化Maxwell元数据库

(1)建立一个 maxwell 库用于存储 Maxwell 的元数据
mysql>   CREATE   DATABASE   maxwell;
(2)设置 mysql 用户密码安全级别感觉8.1后也不管用啊
mysql> set   global   validate_password_length=4;
mysql> set   global   validate_password_policy=0;
(3)创建一个maxwell账号可以操作该数据库 :user=maxwell ,PW:123456
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
(4)分配这个账号可以监控其他数据库的权限,查看其对应的binlog日志
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON
*.*  TO  maxwell@'%';
(5)最后记得刷新权限即可
mysql>flush   privileges;

4.启动maxwell的方式有二种:

4.1 .纯指令式

4.1.1 : 打印在console

bin/maxwell 
--user='maxwell' 
--password='123456' 
--host='hadoop102' 
--producer=stdout

4.1.2  : binlog消费到kafka的topic中

bin/maxwell 
--user='maxwell'
--password='123456!'
--host='localhost'
--producer=kafka 
--kafka.bootstrap.servers=hadoop103:9092,hadoop104:9092
--kafka_topic=maxwelltest

4.2 指定config文件式

vim  config.properties#   tl;dr   config
log_level=info
producer=kafka#   mysql   login   info
host=hadoop102
user=maxwell
password=123456#	***   kafka   ***
#   list   of   kafka   brokerskafka.bootstrap.servers=hosta:9092,hostb:9092
kafka_topic=maxwell

这种config式的配置之后在使用指令式的时候不能再加host参数和指定value了会报错

 5.开启kafka消费者来消费binlog二进制数据

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_name# 如果要kafka消费的数据直接写入hdfs:kafka-console-consumer.sh --bootstrap-server hadoop103:9092,hadoop104:9092 --topic topic-name | hadoop fs -put - /dataset/lixianData/text.txt  # 这里写入到这个txt文件中,提前建好

这篇关于Maxwell监听mysql的binlog日志变化写入kafka消费者的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/831701

相关文章

MySQL的JDBC编程详解

《MySQL的JDBC编程详解》:本文主要介绍MySQL的JDBC编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、前置知识1. 引入依赖2. 认识 url二、JDBC 操作流程1. JDBC 的写操作2. JDBC 的读操作总结前言本文介绍了mysq

java.sql.SQLTransientConnectionException连接超时异常原因及解决方案

《java.sql.SQLTransientConnectionException连接超时异常原因及解决方案》:本文主要介绍java.sql.SQLTransientConnectionExcep... 目录一、引言二、异常信息分析三、可能的原因3.1 连接池配置不合理3.2 数据库负载过高3.3 连接泄漏

Linux下MySQL数据库定时备份脚本与Crontab配置教学

《Linux下MySQL数据库定时备份脚本与Crontab配置教学》在生产环境中,数据库是核心资产之一,定期备份数据库可以有效防止意外数据丢失,本文将分享一份MySQL定时备份脚本,并讲解如何通过cr... 目录备份脚本详解脚本功能说明授权与可执行权限使用 Crontab 定时执行编辑 Crontab添加定

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

MySQL中On duplicate key update的实现示例

《MySQL中Onduplicatekeyupdate的实现示例》ONDUPLICATEKEYUPDATE是一种MySQL的语法,它在插入新数据时,如果遇到唯一键冲突,则会执行更新操作,而不是抛... 目录1/ ON DUPLICATE KEY UPDATE的简介2/ ON DUPLICATE KEY UP

MySQL分库分表的实践示例

《MySQL分库分表的实践示例》MySQL分库分表适用于数据量大或并发压力高的场景,核心技术包括水平/垂直分片和分库,需应对分布式事务、跨库查询等挑战,通过中间件和解决方案实现,最佳实践为合理策略、备... 目录一、分库分表的触发条件1.1 数据量阈值1.2 并发压力二、分库分表的核心技术模块2.1 水平分

Python与MySQL实现数据库实时同步的详细步骤

《Python与MySQL实现数据库实时同步的详细步骤》在日常开发中,数据同步是一项常见的需求,本篇文章将使用Python和MySQL来实现数据库实时同步,我们将围绕数据变更捕获、数据处理和数据写入这... 目录前言摘要概述:数据同步方案1. 基本思路2. mysql Binlog 简介实现步骤与代码示例1

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

使用shardingsphere实现mysql数据库分片方式

《使用shardingsphere实现mysql数据库分片方式》本文介绍如何使用ShardingSphere-JDBC在SpringBoot中实现MySQL水平分库,涵盖分片策略、路由算法及零侵入配置... 目录一、ShardingSphere 简介1.1 对比1.2 核心概念1.3 Sharding-Sp

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”