使用canal增量同步ES索引库数据

2024-08-24 02:44

本文主要是介绍使用canal增量同步ES索引库数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Canal增量数据同步利器

Canal介绍

canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。

github地址:https://github.com/alibaba/canal

版本下载地址:https://github.com/alibaba/canal/releases

文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart

Canal应用场景

1.电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎;
2.价格、库存发生变更实时同步到redis;
3.数据库异地备份、数据同步;
4.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
在这里插入图片描述

MySQL主从复制原理

1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
2.MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

Canal工作原理

1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3.canal 解析 binary log 对象(原始为 byte 流)
在这里插入图片描述

Canal安装

参考文档:https://github.com/alibaba/canal/wiki/QuickStart

MySQL Bin-log开启

1)MySQL开启bin-log

a.进入mysql容器

docker exec -it -u root mysql /bin/bash

b.开启mysql的binlog

cd /etc/mysql/mysql.conf.d在mysqld.cnf最下面添加如下配置
# 开启 binlog
log-bin=/var/lib/mysql/mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server-id=12345

c.创建账号并授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

d.重启mysql

docker restart mysql

开启bin-log后,我们可以用sql语句查看下:

show variables like '%log_bin%'

效果如下:
在这里插入图片描述

Canal安装

1)拉取镜像

docker pull canal/canal-server:v1.1.1

2)安装容器

a.安装canal-server容器

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

b.配置canal-server

修改/home/admin/canal-server/conf/canal.properties,将它的id属性修改成和mysql数据库中server-id不同的值,如下图:
在这里插入图片描述
c.修改/home/admin/canal-server/conf/example/instance.properties,配置要监听的数据库服务地址和监听数据变化的数据库以及表,修改如下:
在这里插入图片描述
在这里插入图片描述
指定监听数据库表的配置如下canal.instance.filter.regex:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

重启canal:

docker restart canal
Canal微服务

​ 我们搭建一个微服务,用于读取canal监听到的变更日志,微服务名字叫seckill-canal。该项目我们需要引入canal-spring-boot-autoconfigure包,并且需要实现EntryHandler接口,该接口中有3个方法,分别为insert、update、delete,这三个方法用于监听数据增删改变化。

参考地址:https://github.com/NormanGyllenhaal/canal-client

1)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>seckill-service</artifactId><groupId>com.seckill</groupId><version>0.0.1-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>seckill-canal</artifactId><dependencies><!--web--><dependency><groupId>com.seckill</groupId><artifactId>seckill-web</artifactId><version>0.0.1-SNAPSHOT</version></dependency><!--esAPI--><dependency><groupId>com.seckill</groupId><artifactId>seckill-search-api</artifactId><version>0.0.1-SNAPSHOT</version></dependency><!--goodsAPI--><dependency><groupId>com.seckill</groupId><artifactId>seckill-goods-api</artifactId><version>0.0.1-SNAPSHOT</version></dependency><!--canal--><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-autoconfigure</artifactId><version>1.2.1-RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><!-- 指定该Main Class为全局的唯一入口 --><mainClass>com.seckill.CanalApplication</mainClass><layout>ZIP</layout></configuration><executions><execution><goals><goal>repackage</goal><!--可以把依赖的包都打包到生成的Jar包中--></goals></execution></executions></plugin></plugins></build>
</project>

bootstrap.yml配置

server:port: 18088
spring:application:name: seckill-canalcloud:nacos:config:file-extension: yamlserver-addr: nacos-server:8848discovery:#Nacos的注册地址server-addr: nacos-server:8848
#超时配置
ribbon:ReadTimeout: 3000000
#Canal配置
canal:server: canal-server:11111destination: example
#日志
logging:level:root: error

2)创建com.seckill.handler.SkuHandler实现EntryHandler接口,代码如下:

@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {/**** 增加数据* @param sku*/@Overridepublic void insert(Sku sku) {System.out.println("===========insert:"+sku);}/**** 修改数据* @param before* @param after*/@Overridepublic void update(Sku before, Sku after) {System.out.println("===========update-before:"+before);System.out.println("===========update-after:"+after);}/**** 删除数据* @param sku*/@Overridepublic void delete(Sku sku) {System.out.println("===========delete:"+sku);}
}

3)创建启动类

@SpringBootApplication
public class CanalApplication {public static void main(String[] args) {SpringApplication.run(CanalApplication.class,args);}
}

程序启动后,修改tb_sku数据,可以看到控制会打印修改前后的数据:
在这里插入图片描述

索引库同步

当tb_sku秒杀商品发生变化时,我们应该同时变更索引库中的索引数据,比如秒杀商品增加,则需要同步增加秒杀商品的索引,如果有秒杀商品删除,则需要同步移除秒杀商品。

修改seckill-canal中的com.seckill.handler.SkuHandler的增删改方法,代码如下:

@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {@Autowiredprivate SkuInfoFeign skuInfoFeign;/**** 增加数据* @param sku*/@Overridepublic void insert(Sku sku) {//将Sku转换成SkuInfoSkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(sku) ,SkuInfo.class);//同步索引skuInfoFeign.modify(1,skuInfo);}/**** 修改数据* @param before* @param after*/@Overridepublic void update(Sku before, Sku after) {int type=2;//将Sku转换成SkuInfoSkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(after) ,SkuInfo.class);if(skuInfo.getStatus()==1 || after.getSeckillNum()<=0){//商品变成了普通商品,或者商品库存为0,则需要删除索引数据type=3;}//同步索引skuInfoFeign.modify(type,skuInfo);}/**** 删除数据* @param sku*/@Overridepublic void delete(Sku sku) {//将Sku转换成SkuInfoSkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(sku) ,SkuInfo.class);//同步索引skuInfoFeign.modify(3,skuInfo);}
}

开启Feign功能:@EnableFeignClients(basePackages = {“com.seckill.search.feign”})
在这里插入图片描述
此时对数据库中tb_sku表进行增删改的时候,会同步到索引库中。

这篇关于使用canal增量同步ES索引库数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101225

相关文章

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

Python常用命令提示符使用方法详解

《Python常用命令提示符使用方法详解》在学习python的过程中,我们需要用到命令提示符(CMD)进行环境的配置,:本文主要介绍Python常用命令提示符使用方法的相关资料,文中通过代码介绍的... 目录一、python环境基础命令【Windows】1、检查Python是否安装2、 查看Python的安

Python并行处理实战之如何使用ProcessPoolExecutor加速计算

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu... 目录简介完整代码示例代码解释1. 导入必要的模块2. 定义处理函数3. 主函数4. 生成数字列表5.

Python中help()和dir()函数的使用

《Python中help()和dir()函数的使用》我们经常需要查看某个对象(如模块、类、函数等)的属性和方法,Python提供了两个内置函数help()和dir(),它们可以帮助我们快速了解代... 目录1. 引言2. help() 函数2.1 作用2.2 使用方法2.3 示例(1) 查看内置函数的帮助(

Linux脚本(shell)的使用方式

《Linux脚本(shell)的使用方式》:本文主要介绍Linux脚本(shell)的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述语法详解数学运算表达式Shell变量变量分类环境变量Shell内部变量自定义变量:定义、赋值自定义变量:引用、修改、删

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

LiteFlow轻量级工作流引擎使用示例详解

《LiteFlow轻量级工作流引擎使用示例详解》:本文主要介绍LiteFlow是一个灵活、简洁且轻量的工作流引擎,适合用于中小型项目和微服务架构中的流程编排,本文给大家介绍LiteFlow轻量级工... 目录1. LiteFlow 主要特点2. 工作流定义方式3. LiteFlow 流程示例4. LiteF

使用Python开发一个现代化屏幕取色器

《使用Python开发一个现代化屏幕取色器》在UI设计、网页开发等场景中,颜色拾取是高频需求,:本文主要介绍如何使用Python开发一个现代化屏幕取色器,有需要的小伙伴可以参考一下... 目录一、项目概述二、核心功能解析2.1 实时颜色追踪2.2 智能颜色显示三、效果展示四、实现步骤详解4.1 环境配置4.

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal