本文主要是介绍MySQL 迁移至 Doris 最佳实践方案(最新整理),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《MySQL迁移至Doris最佳实践方案(最新整理)》本文将深入剖析三种经过实践验证的MySQL迁移至Doris的最佳方案,涵盖全量迁移、增量同步、混合迁移以及基于CDC(ChangeData...
在数据架构不断演进的背景下,从 MySQL 迁移至 Doris 成为许多企业提升数据处理效率的关键选择。本文将深入剖析三种经过实践验证的 MySQL 迁移至 Doris 的最佳方案,涵盖全量迁移、增量同步、混合迁移以及基于 CDC(Change Data Capture)的实时迁移。每种方案都将从技术原理、实施步骤、适用场景、资源消耗等维度展开分析,并通过对比选型帮助读者结合自身业务需求与技术栈,找到最契合的迁移路径,实现数据平稳过渡与性能跃升。
一、JDBC Catalog 联邦查询方案(适合跨库实时查询)
1. 方案概述
通过 Doris 1.2 + 引入的JDBC Catalog功能,直接通过标准 JDBC 协议连接 MySQL 数据库,实现跨库联邦查询和数据写入。相比 ODBC,JDBC 接口更统一、兼容性更强,且无需复杂的驱动安装和版本匹配。
2. 环境要求
组件 | 版本要求 | 说明 |
---|---|---|
Doris | 1.2.0+ | 支持 JDBC Catalog 功能 |
MySQL | MySQL 5.7, 8.0 或更高版本 | 需开启远程访问权限 |
MySQL JDBC | 8.0.28+ | 驱动下载:MySQL JDBC 驱动 |
3. 实施步骤
3.1 安装 JDBC 驱动(所有 FE/BE 节点)
步骤 1:创建驱动目录
mkdir -p /your_path/doris/jdbc_drivers
步骤 2:上传驱动包下载 MySQL JDBC 驱动(如mysql-connector-Java-8.0.31.jar
),并上传至所有 FE/BE 节点的/your_path/doris/jdbc_drivers
目录。
步骤 3:配置驱动路径编辑fe.conf
和be.conf
,添加以下配置(如果你使用的是默认的路径就不需要修改这个了):
jdbc_drivers_dir = /opt/doris/jdbc_drivers
步骤 4:重启服务
# 重启FE cd /opt/doris/fe bin/stop_fe.sh && bin/start_fe.sh --daemon # 重启BE cd /opt/doris/be bin/stop_be.sh && bin/start_be.sh --daemon
3.2 创建 JDBC Catalog
使用 MySQL 客户端连接 Doris,执行以下 SQL 创建 Catalog:
CREATE CATALOG mysql PROPERTIES ( "type"="jdbc", "user"="root", "password"="secret", "jdbc_url" = "jdbc:mysql://example.net:3306", "driver_url" = "mysql-connector-j-8.3.0.jar", "driver_class" = "com.mysql.cj.jdbc.Driver" )
jdbc_url
:MySQL 连接地址,可指定数据库(如jdbc:mysql://host:port/dbname
)。
driver_url
:驱动文件名,需与/opt/doris/jdbc_drivers
目录下的文件一致,默认就是/your_path/doris/jdbc_drivers
。
3.3 创建查询或者导入
执行联邦查询
-- 直接查询mysql表 SELECT * FROM mysql_catalog.mysql_db.user_table LIMIT 10; -- 与Doris表关联查询 SELECT a.id, a.name, b.sales FROM mysql_catalog.mysql_db.user_table a JOIN doris_table b ON a.id = b.user_id;
执行数据导入
-- 建表 create table as select mysql_catalog.mysql_db.user_table -- 导入 insert into doris_table select * from mysql_catalog.mysql_db.user_table
4. 注意事项
数据类型映射:
MySQL 的BIT
类型映射为 Doris 的STRING
,需在查询时转换(如CAST(bit_col AS BOOLEAN)
)。
大字段类型(如BLOB
)建议通过VARCHAR
映射,避免查询性能问题。
权限控制:
为 Doris 创建专用 MySQL 用户,仅授予SELECT
权限,避免数据泄露。
在 Doris 中通过GRANT
语句限制外部表访问权限。
连接安全
如果您使用数据源上安装的全局信任证书配置了 TLS,则可以通过将参数附加到在 jdbc_url 属性中设置的 JDBC 连接字符串来启用集群和数据源之间的 TLS。
二、Binlog 实时同步方案(适合增量数据同步)
1. 方案概述
通过 Canal 解析 MySQL Binlog,实时同步增量数据至 Doris,支持 INSERT/UPDATE/DELETE 操作,适用于实时数据仓库场景。
2. 环境要求
组件 | 版本要求 | 说明 |
---|---|---|
MySQL | 5.7+ | 需开启 Binlog(ROW 模式) |
Canal | 1.1.5+ | 下载地址:Canal 官网 |
Doris | 1.2.0+ | 支持 BATCH_DELETE 特性 |
3. 实施步骤
3.1 配置 MySQL 开启 Binlog
步骤 1:编辑 my.cnf
[mysqld] log-bin=mysql-bin # 开启Binlog binlog-format=ROW # 使用ROW模式 binlog-row-image=FULL # 记录完整行数据 server-id=1 # 唯一服务器ID
步骤 2:重启 MySQL 服务
sudo systemctl restart mysqld
3.2 部署 Canal 服务
步骤 1:下载并解压
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz tar -zxvf canal.deployer-1.1.5.tar.gz
步骤 2:创建 Instance 配置
mkdir conf/demo cp conf/example/instance.properties conf/demo/ vim conf/demo/instance.properties
关键配置:
canal.instance.master.address=192.168.1.100:3306 # MySQL地址 canal.instance.dbUsername=canal # 具有Binlog读取权限的用户 canal.instance.dbPassword=canal # 用户密码 canal.destination=demo # Instance名称
步骤 3:启动 Canal
sh bin/startup.sh # 验证日志:cat logs/demo/demo.log,显示"start successful"表示启动成功
3.3 在 Doris 中创建同步任务
步骤 1:创建 Doris 目标表(需开启 BATCH_DELETE)
--create Mysql table CREATE TABLE `test.source_test` ( `id` int(11) NOT NULL COMMENT "", `name` int(11) NOT NULL COMMENT "" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- create Doris table CREATE TABLE `target_test` ( `id` int(11) NOT NULL COMMENT "", `name` int(11) NOT NULL COMMENT "" ) ENGINE=OLAP UNIQUE KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`) BUCKETS 8 PROPERTIES ( "replication_allocation" = "tag.location.default: 3" );
步骤 2:创建同步作业
CREATE SYNC demo_job ( FROM mysql.test_table INTO target_test (id, name) ) FROM BINLOG ( "type" = "canal", "canal.server.ip" = "192.168.1.101", # Canal服务器IP "canal.server.port" = "11111", # Canal默认端口 "canal.destination" = "demo", # Canal Instance名称 "canal.username" = "canal", "canal.password" = "canal", "sync.mode" = "incremental" # 增量同步模式 );
4. 注意事项
权限要求:需为 Canal 创建专用用户(如 canal),并授予REPLICATION SLAVE
权限:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
数据一致性:建议在 Doris 目标表中使用UNIQUE KEY
模型,并确保 MySQL 源表与 Doris 表字段一一对应。
三、Flink CDC 流式同步方案(适合高实时性场景)
1. 方案概述
基于 Flink CDC(Change Data Capture)实时捕获 MySQL 变更数据,通过 Flink-Doris 连接器写入 Doris,支持秒级延迟,适用于实时分析、实时报表场景。
2. 环境要求
组件 | 版本要求 | 说明 |
---|---|---|
Flink | 1.13+ | 支持 Flink CDC 2.0+ |
Doris | 1.2.0+ | 支持 Delete 操作 |
依赖库 | flink-cdc-2.1.0, flink-doris-connector-1.1.0 | 需下载对应版本 JAR 包 |
3. 实施步骤
3.1 构建 Flink 作业
步骤 1:添加 Maven 依赖
<dependency>
<groupId>js;org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>25.1.0</version>
</dependency>
步骤 2:编写 Flink SQL 脚本
-- enable checkpoint SET 'execution.checkpointing.interval' = '30s'; -- 创建MySQL CDC源表 CREATE TABLE mysql_cdc_source ( id INT PRIMARY KEY NOT ENFORCED, name STRING, age INT, ts TIMESTAMP(3) METADATA FROM 'source_ts' VIRTUAL ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.1.100', 'port' = '3306', 'username' = 'root', 'password' = 'your_mysql_password', 'database-name' = 'test', 'table-name' = 'user_table' ); -- 创建Doris目标表 CREATE TABLE doris_sink ( id INT, name STRING, age INT, update_time TIMESTAMP(3) ) WITH ( 'connector' = 'doris', 'fenodes' = 'doris-fe:8030', 'table.identifier' = 'test.user_table', 'username' = 'root', 'password' = '', 'sink.properties.format' = 'json', 'sink.enable-delete' = 'true', 'sink.label-prefix' = 'flink_cdc_' ); -- 启动数据同步 INSERT INTO doris_sink SELECT id, name, age, ts FROM mysql_cdc_source;
3.2 提交 Flink 作业
<FLINK_HOME>bin/flink run \ -Dexecution.checkpointing.interval=10s \ -Dparallelism.default=1 \ -c org.apache.doris.flink.tools.cdc.CdcTools \ lib/flink-doris-connector-1.16-24.0.1.jar \ mysql-sync-database \ --database test_db \ --mysql-conf hostname=127.0.0.1 \ --mysql-conf port=3306 \ --mysql-conf username=root \ --mysql-conf password=123456 \ --mysql-conf database-name=mysql_db \ --including-tables "tbl1|test.*" \ --sink-conf fenodes=127.0.0.1:8030 \ --sink-conf username=root \ --sink-conf password=123456 \ --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \ --sink-conf sink.label-prefix=label \ --table-conf replication_num=1
4. 注意事项
并行度优化:根据 MySQL 表数量和数据量调整 Flink 作业并行度,避免单任务压力过大。
写入机制:开启 Flink Checkpoint,Doris Flink Connpythonector 提供了两种攒批模式,默认使用基于 Flink Checkpoint 的流式写入方式。
写入方式 | 流式写入 | 批量写入 |
---|---|---|
触发条件 | 依赖 Flink 的 Checkpoint,跟随 Flink 的 Checkpoint 周期写入到 Doris 中 | 基于 Connector 内的时间阈值、数据量阈值进行周期性提交,写入到 Doris 中 |
一致性 | Exactly-Once | At-Least-Once,基于主键模型可以保证 Exactly-Once |
延迟 | 受 Checkpoint 时间间隔限制,通常较高 | 独立的批处理机制,灵活调整 |
容错与恢复 | 与 Flink 状态恢复完全一致 | 依赖外部去重逻辑(如 Doris 主键去重) |
四、Datax 同步方案(适合全量 / 批量数据迁移)
1. 方案概述
Datax 是一款异构数据源之间数据同步工具,通过编写 JSON 格式的配置文件,实现 MySQL 与 Doris 之间的数据抽取、转换和加载(ETL)。该方案适用于全量数据迁移、定期批量数据同步场景,对实时性要求不高,但配置灵活,可自定义数据处理逻辑。
2. 环境要求
组件 | 版本要求 | 说明 |
---|---|---|
Datax | 3.0+ | 下载地址:Datax 官网 |
MySQL | 5.6+ | 支持标准 JDBC 协议 |
Doris | 1.0+ | 支持通过 JDBC 或 Broker 导入数据 |
3. 实施步骤
3.1 安装 Datax
# 下载Datax压缩包 wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz tar -zxvf datax.tar.gz
3.2 编写同步配置文件
创建mysql_to_doris.json
文件,示例配置如下:
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [xxx], "connection": [ { "jjsdbcUrl": ["jdbc:mysql://localhost:3306/demo"], "table": ["table_name"] } ], "username": "root", "password": "xxxxx", "where": "" } }, "writer": { "name": "doriswriter", "parameter": { "loadUrl": ["127.0.0.1:8030"], "column": [xxxx], "username": "root", "password": "xxxxxx", "postSql": ["select count(1) from table_name"], "preSql": [], "flushInterval":30000, "connection": [ { "jdbcUrl": "jdbc:mysql://127.0.0.1:9030/demo", "selectedDatabase": "demo", "table": ["table_name"] } ], "loadProps": { "format": "json", "strip_outer_array":"true", "line_delimiter": "\\x02" } } } } ], "setting": { "speed": { "channel": "1" } } } }
3.3 执行同步任务
python bin/datax.py mysql_to_doris.json
4. 注意事项
数据类型映射:确保 MySQL 与 Doris 字段类型兼容,如 MySQL 的DATETIME
对应 Doris 的DATETIME
,避免类型转换错误。
性能调优:通过调整channel
参数控制并发度,但过高的并发可能导致资源耗尽,建议根据数据库负载测试后设置。
五、方案对比与选型建议
方案 | 适用场景 | 复杂度 | 资源依赖 |
---|---|---|---|
JDBC Catalog | 跨库实时查询、联邦分析 | ★★☆☆☆ | 依赖 Doris JDBC 功能 |
Binlog 同步 | 增量数据实时同步 | ★★★☆☆ | 需部署 Canal 及相关组件 |
Flink CDC 同步 | 高实时性流式处理、复杂清洗 | ★★★★☆ | 依赖 Flink 集群 |
Datax 同步 | 全量 / 批量数据迁移、离线同步 | ★★★☆☆ | 轻量级工具,独立部署 |
优先选择 JDBC Catalog:当需要快速验证跨库查询功能,或需要与 Doris 本地表进行关联分析时。
推荐 Binlog 同步:常规增量同步场景,兼顾实时性和部署复杂度。
选择 Flink CDC:对实时性要求高(如实时看板),或需要复杂数据清洗(通过 Flink Shttp://www.chinasem.cnQL 实现)。
使用 Datax 同步:适合一次性全量迁移,或周期性批量同步历史数据。
五、数据迁移最佳实践要点
全量初始化与增量同步结合:
首次迁移时,先通过DataX
导入 MySQL 全量数据,再启动 Binlog 或 Flink CDC 同步增量数据,避免历史数据积压。
数据类型兼容性测试:
重点验证 MySQL 的ENUM
、SET
、BIT
等类型在 Doris 中的映射是否符合预期。
监控与告警体系:
结合 Prometheus + Grafana 监控或者Doris Manager工具观察资源使用情况。
这份文档已涵盖多种迁移方式。若还想对某个迁移方式补充更多细节,或增加其他方面的内容,欢迎讨论。
到此这篇关于MySQL 迁移至 Doris 最佳实践方案的文章就介绍到这了,更多相关MySQL 迁移至 Doris内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于MySQL 迁移至 Doris 最佳实践方案(最新整理)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!