canal实现mysql数据同步的详细过程

2025-06-13 04:50

本文主要是介绍canal实现mysql数据同步的详细过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的...

1、canal下载

canal实现mysql数据同步可以直接安装canal server就可以了,但是为了方便管理(instance配置,canal server状态管理,集群等),需要安装canal admin,应用下载地址:Releases · alibaba/canal · github

进入页面可以选择需要安装的版本

canal实现mysql数据同步的详细过程

下载canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz

2、mysql同步用户创建和授权

登录mysql
mysql -h 127.0.0.1 -P 3306 -u root -p
创建同步用户 repl 密码设为123456
CREATE USER 'repl'@'%' IDENTIFIED BY '123456';
给予同步权限
GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456';
给予repl只读test库的权限,test库是用来同步数据的
GRANT SELECT ON test.* to 'repl'@'%' identified by '123456';
canal_manager是canal admin需要的,给予repl对该库的读写权限
GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456';
mysql my.cnf配置文件增加主从配置master数据库的配置信息
#主数据主从配置 唯一id
server_id=1
#开启logbin
log-bin=mysql-bin
#写入模式 row
binlog-format=ROW
#需要同步的库
binlog-do-db=test
#忽略的数据库
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema

canal实现mysql数据同步的详细过程

在canal-admin解压文件的conf中有一个canal_manager.sql,导入到master数据库

canal实现mysql数据同步的详细过程

3、canal admin安装和启动

把canal.admin-1.1.8.tar.gz上传到linux

解压 tar -zvxf canal.admin-1.1.8.tar.gz 

进入conf目录下,编辑application.yml配置文件。

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: repl
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowpublicKeyRetrieval=true
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
canal:
  adminUser: admin
  adminPasswd: 123456

重点介绍以下几个参数:

address:我们需要订阅(也就是mysql master服务器)mysql所在的服务器IP和数据库端口。

database:canal.admin web系统必须的几张表,需要在mysql master服务器上初始化conf/canal_manager.sql文件。

sername和password就是mysql master服务器创建的用于复制的用户和密码,也就是我们在canal server中配置的repl 和 12China编程3456。

China编程

driver-class-name:mysql的驱动,默认是MYSQL5的驱动,如果你China编程的MYSQL是8的(我的就是),要将驱动改为com.mysql.cj.jdbc.Driver。

另外,还需要在mysql连接后面加上allowPublicKeyRetrieval=true,不然启动时,有可能报错。

启动canal.admin

进入bin目录,执行如下命令,启动canal.admin:

./startup.sh

查看 admin 日志

2022-12-10 03:13:58.995 [main] INFO  o.s.jmx.export.annotation.AnnotationMBeanExporter - 
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO  org.apache.tomcat.util.net.NIOSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)

如果出现上述日志,说明启动成功!

登录admin

通过http://127.0.0.1:8089/访问,默认密码:admin/123456。

注意,IP和密码需要改成你自己配置的。如果是在服务器上配置的,别忘记放开8089端口。

canal实现mysql数据同步的详细过程

输入用户名和密码之后,出现上述页面说明配置成功!

如果需要修改密码,直接通过执行 select upper(sha1(unhex(sha1('1234567')))) 这个sql得到结果,然后复制到canal_manager库的canal_user表的password字段中就可以了,其中1234567是明文密码,执行上述sql会得到一个密码。

4、canal server安装和启动

把canal.deployer-1.1.8.tar.gz上传到linux

解压 tar -zvxf ccanal.deployer-1.1.8.tar.gz

进入conf目录下,编辑canal.properties配置文件。

注意,如果直接编辑canal.properties,可能无法启动,报如下错误:

canal实现mysql数据同步的详细过程

可以通过如下方式修改

mv canal.properties canal.properties_bak
cp canal_local.properties canal.properties
vim canal.properties

canal.properties文件全部内容如下:

# register ip
canal.register.ip =
# canal admin config  canalAdmin 的链接、端口、用户名和MD5密码
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
# admin auto register canal server启动后自动注入到canal admin管理模块
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =

一般只需要修改下面这3个

canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB

启动canal.server

进入bin目录,执行如下命令,启动canal.server:

./startup.sh

查看canal日志

canal实现mysql数据同步的详细过程

启动后,canalAdmin的server管理模块,对应创建的canal server会动态识别到,状态变为启动

canal实现mysql数据同步的详细过程

5、canal数据同步

5.1、java 端集成监听canal 同步的mysql数据

1、引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId&China编程gt;
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2、编写测试代码

package com.hy.das.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements InitializingBean{
    private final static int BATCH_SIZE = 1000;
    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接 此处的11111为tcp端口 在canal admin Server管理模块可以查看
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
                "test", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                System.out.println(message.getEntries().size());
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("----------------");
                    //如果有数据,处理数据
                    //遍历entries,单条解析
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        //获取表名
                        String tableName = entry.getHeader().getTableName();
                        //获取类型
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();
                        //判断entry类型是否为ROWDATA类型
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            //获取当前事件操作类型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            //获取数据集
                            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                            //遍历
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                //改变前数据
                                JSONObject jsonObjectBefore = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    jsonObjectBefore.put(column.getName(),column.getValue());
                                }
                                //改变后数据
                                JSONObject jsonObjectAfter = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    jsonObjectAfter.put(column.getName(),column.getValue());
                                }
                                System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                            }
                        }else {
                            System.out.println("当前操作类型为:"+entryType);
                        }
                    }
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}

canal实现mysql数据同步的详细过程

newSingleConnector方法里面的test是一个instance实列,定义了需要同步的master库的信息(ip、端口、用户名、密码、binlog文件名称、同步位置、需要同步的库、不需要同步的库等)

在canal admin web管理界面的Instance 管理模块,点击新建Instance进行创建,新建页面的Instance名称就是test,这个可以随便填写,代码对应修改就行,所属集群/主机,因为我这里是单机部署,直接选择自动注入的canal server就行,点击载入模板,获取配置初始信息,下图中标出的信息按照实际的修改填入就行,点击保存后,启动这个Instance。

canal实现mysql数据同步的详细过程

canal实现mysql数据同步的详细过程

3、启动服务,对test库的sys_user表进行数据更新,可以看到后台已经收到变更数据

canal实现mysql数据同步的详细过程

5.2、kafka同步数据

1:canal.properties编程配置文件增加如下配置

#数据变更发送到kafka
# 设置输出目标为 kafka
canal.serverMode = kafka
# Kafka 地址
canal.mq.servers  = xx.xx.xx.xx:9092
# 投递失败的重试次数,默认0,改为2
canal.mq.retries = 2
# Kafka batch.size,即producer一个微批次的大小,默认16K,这里加倍
canal.mq.batchSize = 32768
# Kafka max.request.size,即一个请求的最大大小,默认1M,这里也加倍
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms,即sender线程在检查微批次是否就绪时的超时,默认0ms,这里改为200ms
# 满足batch.size和linger.ms其中之一,就会发送消息
canal.mq.lingerMs = 200
# Kafka buffer.memory,缓存大小,默认32M
canal.mq.bufferMemory = 33554432
# 获取binlog数据的批次大小,默认50
canal.mq.canalBatchSize = 50
# 获取binlog数据的超时时间,默认200ms
canal.mq.canalGetTimeout = 200
# 是否将binlog转为JSON格式。如果为false,就是原生Protobuf格式
canal.mq.flatMessage = true
# 压缩类型,官方文档没有描述
canal.mq.compressionType = none
# Kafka acks,默认all,表示分区leader会等所有follower同步完才给producer发送ack
# 0表示不等待ack,1表示leader写入完毕之后直接ack
canal.mq.acks = all
# Kafka消息投递是否使用事务
# 主要针对flatMessage的异步发送和动态多topic消息投递进行事务控制来保持和Canal binlog位置的一致性
# flatMessage模式下建议开启
canal.mq.transaction = true

canal实现mysql数据同步的详细过程

2:在canal admin web界面修改instance mq配置,增加数据同步到kakfa的topic

canal实现mysql数据同步的详细过程

3:如上两步配置完成重启后,在kafka监听配置的topic就可以接收到数据了

6、java tcp同步只是其中一种方式,还可以通过kafka、rabbitmq等方式进行数据同步

注意上面需要提供对外访问的端口需要开通安全组,比如8089、11111等端口。

参考文章:

【CanalAdmin部署文档】

https://zhuanlan.zhihu.com/p/590705531

到此这篇关于canal实现mysql数据同步的文章就介绍到这了,更多相关canal mysql数据同步内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于canal实现mysql数据同步的详细过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155039

相关文章

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

Linux下MySQL数据库定时备份脚本与Crontab配置教学

《Linux下MySQL数据库定时备份脚本与Crontab配置教学》在生产环境中,数据库是核心资产之一,定期备份数据库可以有效防止意外数据丢失,本文将分享一份MySQL定时备份脚本,并讲解如何通过cr... 目录备份脚本详解脚本功能说明授权与可执行权限使用 Crontab 定时执行编辑 Crontab添加定

oracle 11g导入\导出(expdp impdp)之导入过程

《oracle11g导入导出(expdpimpdp)之导入过程》导出需使用SEC.DMP格式,无分号;建立expdir目录(E:/exp)并确保存在;导入在cmd下执行,需sys用户权限;若需修... 目录准备文件导入(impdp)1、建立directory2、导入语句 3、更改密码总结上一个环节,我们讲了

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决