SpringBoot整合Canal+RabbitMQ监听数据变更详解

本文主要是介绍SpringBoot整合Canal+RabbitMQ监听数据变更详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit...

需求

在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求。例如,在电商系统中,当订单表发生更新时,可能需要同步这些变更到搜索服务、缓存服务或者通知其他微服务。传统的解决方案包括定时轮询数据库或通过触发器将变更写入消息队列等方法,但这些方案要么效率低下,要么实现复杂。而使用 Canal + RabbitMQ 可以提供一种高效且可靠的方式来捕获 mysql 数据库的变更,并将其发送到 RabbitMQ 中供其他服务消费。

Canal 是阿里巴巴开源的一个用于增量订阅和消费 MySQL 数据库 Binlog 的工具,它模拟 MySQL 主从复制机制,无需侵入业务逻辑即可捕获数据库变更。RabbitMQ 是一个流行的开源消息代理,支持多种协议并提供了丰富的特性来确保消息传递的可靠性。结合这两者,可以构建一个强大的实时数据变更监听和处理系统。

步骤

环境搭建

整合SpringBoot与Canal实现客户端

Canal整合RabbitMQ

SpringBoot整合RabbitMQ

环境搭建

1. 安装MySQL

确保你有一个正在运行的 MySQL 实例,并启用了 binlog 日志记录功能。这是 Canal 捕获数据库变更的基础。

# 修改 MySQL 配置文件 my.cnf 或 my.ini
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

重启 MySQL 服务使配置生效。

2. 安装Canal Server

下载最新版本的 Canal Server 并解压到合适的位置。根据官方文档进行python必要的配置,特别是 instance.properties 文件中的数据库连接信息。

3. 安装RabbitMQ

可以通过 docker 快速安装 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

访问 http://localhost:15672 登录管理界面,默认用户名/密码为 guest/guest。

整合SpringBoot与Canal实现客户端

创建SpringBoot项目

使用 Spring Initializr 创建一个新的 Spring Boot 项目,添加 Web, JPA, 和 AMQP(用于后续整合 RabbitMQ)依赖。

引入Canal依赖

在 pom.XML 中添加 Canal Client 的依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

编写Canal客户端代码

创建一个 Canal 客户端类,用来监听 MySQL 数据库的变化,并将变更事件转发给 RabbitMQ。

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.rabbitmq.client.Channel;

public class CanalClient {

    private final CanalConnector connector;
    private final Channel channel;

    public CanalClient(CanalConnector connector, Channel channel) {
        this.connector = connector;
        this.channel = channel;
    }

    public void start() {
        // Canal 连接配置
        connector.connect();
        connector.subscribe(".*\\..*"); // 订阅所有数据库和表
        connector.rollback();

        while (true) {
            int BATchSize = 1000;
            EntryBatch batch = connector.getWithoutAck(batchSize); // 获取一批次数据
            long batchId = batch.getId();
            int size = batch.getEntries().size();

            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                printEntry(batch.getEntries());
                connector.ack(batchId); // 提交确认
            }

            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }

        connector.disconnect();
    }

    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChanjavascriptge rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    sendToRabbitMQ(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    sendToRabbitMQ(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    sendToRabbitMQ(rowData.getBeforeColumnsList());

                    System.out.println("-------> after");
                    sendToRabbitMQ(rowData.getAfterColumnsList());
           China编程     }
            }
        }
    }

    private void sendToRabbitMQ(List<Column> columns) {
        StringBuilder message = new StringBuilder();
        for (Column column : columns) {
            message.append(column.getName()).append("=").append(column.getValue()).append(",");
        }
        try {
            channel.basicPublish("", "canal_exchange", null, message.toString().getBytes());
        } catch (IOException e) {
            e.printStackTrawww.chinasem.cnce();
        }
    }
}

Canal整合RabbitMQ

配置Canal Server

确保 Canal Server 已正确配置并启动,能够监听 MySQL 的 binlog 日志。修改 Canal Server 的配置文件以指向你的 MySQL 实例,并设置适当的过滤规则。

配置RabbitMQ Exchange

在 RabbitMQ 中创建一个名为 canal_exchange 的 exchange,类型可以根据需要选择,如 fanout, direct, topic 或 headers。

rabbitmqadmin declare exchange name=canal_exchange type=fanout

SpringBoot整合RabbitMQ

添加依赖

确保在 pom.xml 中已经包含了 RabbitMQ 的 Spring AMQP 依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置RabbitMQ连接信息

在 application.yml 或 application.properties 中配置 RabbitMQ 的连接参数。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

创建消费者

编写一个消费者类来接收来自 RabbitMQ 的消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class CanalMessageConsumer {

    @RabbitListener(queues = "canal_queue")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}

配置队列和绑定

确保在应用程序启动时自动创建所需的队列,并将它们绑定到之前创建的 exchange 上。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue canalQueue() {
        return new Queue("canal_queue", false);
    }

    @Bean
    public TopicExchange canalExchange() {
        return new TopicExchange("canal_exchange");
    }

    @Bean
    public Binding binding(Queue canalQueue, TopicExchange canalExchange) {
        return BindingBuilder.bind(canalQueue).to(canalExchange).with("#");
    }
}

总结

通过以上步骤,我们成功地将 Canal 与 RabbitMQ 整合到了 Spring Boot 应用程序中。这使得我们可以实时监听 MySQL 数据库的变更,并将这些变更作为消息发布到 RabbitMQ 中供其他微服务消费。这种方法不仅提高了系统的响应速度,也简化了数据同步的过程,降低了开发和维护成本。

到此这篇关于SpringBoot整合Canal+RabbitMQ监听数据变更详解的文章就介绍到这了,更多相关SpringBoot监听数据变更内容请搜索编程China编程(www.chinasem.cn)以前的文章或继www.chinasem.cn续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于SpringBoot整合Canal+RabbitMQ监听数据变更详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1152860

相关文章

HTML5的input标签的`type`属性值详解和代码示例

《HTML5的input标签的`type`属性值详解和代码示例》HTML5的`input`标签提供了多种`type`属性值,用于创建不同类型的输入控件,满足用户输入的多样化需求,从文本输入、密码输入、... 目录一、引言二、文本类输入类型2.1 text2.2 password2.3 textarea(严格

SpringBoot简单整合ElasticSearch实践

《SpringBoot简单整合ElasticSearch实践》Elasticsearch支持结构化和非结构化数据检索,通过索引创建和倒排索引文档,提高搜索效率,它基于Lucene封装,分为索引库、类型... 目录一:ElasticSearch支持对结构化和非结构化的数据进行检索二:ES的核心概念Index:

Python数据验证神器Pydantic库的使用和实践中的避坑指南

《Python数据验证神器Pydantic库的使用和实践中的避坑指南》Pydantic是一个用于数据验证和设置的库,可以显著简化API接口开发,文章通过一个实际案例,展示了Pydantic如何在生产环... 目录1️⃣ 崩溃时刻:当你的API接口又双叒崩了!2️⃣ 神兵天降:3行代码解决验证难题3️⃣ 深度

C++ move 的作用详解及陷阱最佳实践

《C++move的作用详解及陷阱最佳实践》文章详细介绍了C++中的`std::move`函数的作用,包括为什么需要它、它的本质、典型使用场景、以及一些常见陷阱和最佳实践,感兴趣的朋友跟随小编一起看... 目录C++ move 的作用详解一、一句话总结二、为什么需要 move?C++98/03 的痛点⚡C++

Java方法重载与重写之同名方法的双面魔法(最新整理)

《Java方法重载与重写之同名方法的双面魔法(最新整理)》文章介绍了Java中的方法重载Overloading和方法重写Overriding的区别联系,方法重载是指在同一个类中,允许存在多个方法名相同... 目录Java方法重载与重写:同名方法的双面魔法方法重载(Overloading):同门师兄弟的不同绝

MySQL中between and的基本用法、范围查询示例详解

《MySQL中betweenand的基本用法、范围查询示例详解》BETWEENAND操作符在MySQL中用于选择在两个值之间的数据,包括边界值,它支持数值和日期类型,示例展示了如何使用BETWEEN... 目录一、between and语法二、使用示例2.1、betwphpeen and数值查询2.2、be

python中的flask_sqlalchemy的使用及示例详解

《python中的flask_sqlalchemy的使用及示例详解》文章主要介绍了在使用SQLAlchemy创建模型实例时,通过元类动态创建实例的方式,并说明了如何在实例化时执行__init__方法,... 目录@orm.reconstructorSQLAlchemy的回滚关联其他模型数据库基本操作将数据添

Spring配置扩展之JavaConfig的使用小结

《Spring配置扩展之JavaConfig的使用小结》JavaConfig是Spring框架中基于纯Java代码的配置方式,用于替代传统的XML配置,通过注解(如@Bean)定义Spring容器的组... 目录JavaConfig 的概念什么是JavaConfig?为什么使用 JavaConfig?Jav

Java数组动态扩容的实现示例

《Java数组动态扩容的实现示例》本文主要介绍了Java数组动态扩容的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1 问题2 方法3 结语1 问题实现动态的给数组添加元素效果,实现对数组扩容,原始数组使用静态分配

Java中ArrayList与顺序表示例详解

《Java中ArrayList与顺序表示例详解》顺序表是在计算机内存中以数组的形式保存的线性表,是指用一组地址连续的存储单元依次存储数据元素的线性结构,:本文主要介绍Java中ArrayList与... 目录前言一、Java集合框架核心接口与分类ArrayList二、顺序表数据结构中的顺序表三、常用代码手动