SpringBoot整合Canal+RabbitMQ监听数据变更详解

本文主要是介绍SpringBoot整合Canal+RabbitMQ监听数据变更详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit...

需求

在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求。例如,在电商系统中,当订单表发生更新时,可能需要同步这些变更到搜索服务、缓存服务或者通知其他微服务。传统的解决方案包括定时轮询数据库或通过触发器将变更写入消息队列等方法,但这些方案要么效率低下,要么实现复杂。而使用 Canal + RabbitMQ 可以提供一种高效且可靠的方式来捕获 mysql 数据库的变更,并将其发送到 RabbitMQ 中供其他服务消费。

Canal 是阿里巴巴开源的一个用于增量订阅和消费 MySQL 数据库 Binlog 的工具,它模拟 MySQL 主从复制机制,无需侵入业务逻辑即可捕获数据库变更。RabbitMQ 是一个流行的开源消息代理,支持多种协议并提供了丰富的特性来确保消息传递的可靠性。结合这两者,可以构建一个强大的实时数据变更监听和处理系统。

步骤

环境搭建

整合SpringBoot与Canal实现客户端

Canal整合RabbitMQ

SpringBoot整合RabbitMQ

环境搭建

1. 安装MySQL

确保你有一个正在运行的 MySQL 实例,并启用了 binlog 日志记录功能。这是 Canal 捕获数据库变更的基础。

# 修改 MySQL 配置文件 my.cnf 或 my.ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

重启 MySQL 服务使配置生效。

2. 安装Canal Server

下载最新版本的 Canal Server 并解压到合适的位置。根据官方文档进行python必要的配置,特别是 instance.properties 文件中的数据库连接信息。

3. 安装RabbitMQ

可以通过 docker 快速安装 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

访问 http://localhost:15672 登录管理界面,默认用户名/密码为 guest/guest。

整合SpringBoot与Canal实现客户端

创建SpringBoot项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,添加 Web, JPA, 和 AMQP(用于后续整合 RabbitMQ)依赖。

引入Canal依赖

在 pom.XML 中添加 Canal Client 的依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

编写Canal客户端代码

创建一个 Canal 客户端类,用来监听 MySQL 数据库的变化,并将变更事件转发给 RabbitMQ。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;

public class CanalClient {

    private final CanalConnector connector;
    private final Channel channel;

    public CanalClient(CanalConnector connector, Channel channel) {
        this.connector = connector;
        this.channel = channel;
    }

    public void start() {
        // Canal 连接配置
        connector.connect();
        connector.subscribe(".*\\..*"); // 订阅所有数据库和表
        connector.rollback();

        while (true) {
            int BATchSize = 1000;
            EntryBatch batch = connector.getWithoutAck(batchSize); // 获取一批次数据
            long batchId = batch.getId();
            int size = batch.getEntries().size();

            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                printEntry(batch.getEntries());
                connector.ack(batchId); // 提交确认
            }

            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }

        connector.disconnect();
    }

    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChanjavascriptge rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    sendToRabbitMQ(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    sendToRabbitMQ(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    sendToRabbitMQ(rowData.getBeforeColumnsList());

                    System.out.println("-------> after");
                    sendToRabbitMQ(rowData.getAfterColumnsList());
           China编程     }
            }
        }
    }

    private void sendToRabbitMQ(List<Column> columns) {
        StringBuilder message = new StringBuilder();
        for (Column column : columns) {
            message.append(column.getName()).append("=").append(column.getValue()).append(",");
        }
        try {
            channel.basicPublish("", "canal_exchange", null, message.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrawww.chinasem.cnce();
        }
    }
}

Canal整合RabbitMQ

配置Canal Server

确保 Canal Server 已正确配置并启动,能够监听 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 实例,并设置适当的过滤规则。

配置RabbitMQ Exchange

在 RabbitMQ 中创建一个名为 canal_exchange 的 exchange,类型可以根据需要选择,如 fanout, direct, topic 或 headers。

rabbitmqadmin declare exchange name=canal_exchange type=fanout

SpringBoot整合RabbitMQ

添加依赖

确保在 pom.xml 中已经包含了 RabbitMQ 的 Spring AMQP 依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置RabbitMQ连接信息

在 application.yml 或 application.properties 中配置 RabbitMQ 的连接参数。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

创建消费者

编写一个消费者类来接收来自 RabbitMQ 的消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class CanalMessageConsumer {

    @RabbitListener(queues = "canal_queue")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}

配置队列和绑定

确保在应用程序启动时自动创建所需的队列,并将它们绑定到之前创建的 exchange 上。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue canalQueue() {
        return new Queue("canal_queue", false);
    }

    @Bean
    public TopicExchange canalExchange() {
        return new TopicExchange("canal_exchange");
    }

    @Bean
    public Binding binding(Queue canalQueue, TopicExchange canalExchange) {
        return BindingBuilder.bind(canalQueue).to(canalExchange).with("#");
    }
}

总结

通过以上步骤,我们成功地将 Canal 与 RabbitMQ 整合到了 Spring Boot 应用程序中。这使得我们可以实时监听 MySQL 数据库的变更,并将这些变更作为消息发布到 RabbitMQ 中供其他微服务消费。这种方法不仅提高了系统的响应速度,也简化了数据同步的过程,降低了开发和维护成本。

到此这篇关于SpringBoot整合Canal+RabbitMQ监听数据变更详解的文章就介绍到这了,更多相关SpringBoot监听数据变更内容请搜索编程China编程(www.chinasem.cn)以前的文章或继www.chinasem.cn续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于SpringBoot整合Canal+RabbitMQ监听数据变更详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1152860

相关文章

使用Navicat工具比对两个数据库所有表结构的差异案例详解

《使用Navicat工具比对两个数据库所有表结构的差异案例详解》:本文主要介绍如何使用Navicat工具对比两个数据库test_old和test_new,并生成相应的DDLSQL语句,以便将te... 目录概要案例一、如图两个数据库test_old和test_new进行比较:二、开始比较总结概要公司存在多

JavaWeb-WebSocket浏览器服务器双向通信方式

《JavaWeb-WebSocket浏览器服务器双向通信方式》文章介绍了WebSocket协议的工作原理和应用场景,包括与HTTP的对比,接着,详细介绍了如何在Java中使用WebSocket,包括配... 目录一、概述二、入门2.1 POM依赖2.2 编写配置类2.3 编写WebSocket服务2.4 浏

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

配置springboot项目动静分离打包分离lib方式

《配置springboot项目动静分离打包分离lib方式》本文介绍了如何将SpringBoot工程中的静态资源和配置文件分离出来,以减少jar包大小,方便修改配置文件,通过在jar包同级目录创建co... 目录前言1、分离配置文件原理2、pom文件配置3、使用package命令打包4、总结前言默认情况下,

Java文件与Base64之间的转化方式

《Java文件与Base64之间的转化方式》这篇文章介绍了如何使用Java将文件(如图片、视频)转换为Base64编码,以及如何将Base64编码转换回文件,通过提供具体的工具类实现,作者希望帮助读者... 目录Java文件与Base64之间的转化1、文件转Base64工具类2、Base64转文件工具类3、

java获取图片的大小、宽度、高度方式

《java获取图片的大小、宽度、高度方式》文章介绍了如何将File对象转换为MultipartFile对象的过程,并分享了个人经验,希望能为读者提供参考... 目China编程录Java获取图片的大小、宽度、高度File对象(该对象里面是图片)MultipartFile对象(该对象里面是图片)总结java获取图片

Java通过反射获取方法参数名的方式小结

《Java通过反射获取方法参数名的方式小结》这篇文章主要为大家详细介绍了Java如何通过反射获取方法参数名的方式,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1、前言2、解决方式方式2.1: 添加编译参数配置 -parameters方式2.2: 使用Spring的内部工具类 -

Java如何获取视频文件的视频时长

《Java如何获取视频文件的视频时长》文章介绍了如何使用Java获取视频文件的视频时长,包括导入maven依赖和代码案例,同时,也讨论了在运行过程中遇到的SLF4J加载问题,并给出了解决方案... 目录Java获取视频文件的视频时长1、导入maven依赖2、代码案例3、SLF4J: Failed to lo

如何使用Spring boot的@Transactional进行事务管理

《如何使用Springboot的@Transactional进行事务管理》这篇文章介绍了SpringBoot中使用@Transactional注解进行声明式事务管理的详细信息,包括基本用法、核心配置... 目录一、前置条件二、基本用法1. 在方法上添加注解2. 在类上添加注解三、核心配置参数1. 传播行为(