Mongodb 开启oplog,java监听oplog并写入关系型数据库

2023-12-05 21:15

本文主要是介绍Mongodb 开启oplog,java监听oplog并写入关系型数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

开启Oplog

windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下:

replication:replSetName: localoplogSizeMB: 1024

在这里插入图片描述
双击mongo.exe
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{"ok" : 1,"operationTime" : Timestamp(1627503341, 1),"$clusterTime" : {"clusterTime" : Timestamp(1627503341, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}}
}

监听Oplog日志

pom

 	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.10</version><relativePath/></parent><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.7</version></dependency><dependency><groupId>com.vividsolutions</groupId><artifactId>jts</artifactId><version>1.13</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-spatial</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-java8</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>com.bedatadriven</groupId><artifactId>jackson-datatype-jts</artifactId><version>2.3</version></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>runtime</scope></dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName

代码

  import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate MongoTemplate mongoTemplate;@Resourceprivate EntityManager entityManager;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");MongoCollection<Document> oplog = db.getCollection("oplog.rs");BsonTimestamp startTS = getStartTimestamp();BsonTimestamp endTS = getEndTimestamp();Bson filter = Filters.and(Filters.gt("ts", startTS));MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();while (true) {if (cursor.hasNext()) {Document doc = cursor.next();String operation = doc.getString("op");if (!"n".equals(operation)) {String namespace = doc.getString("ns");String[] nsParts = StringUtils.split(namespace, ".");String collectionName = nsParts[1];String databaseName = nsParts[0];Document object = (Document) doc.get("o");log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);if ("i".equals(operation)) {insert((Document) doc.get("o"), databaseName, collectionName);} else if ("u".equals(operation)) {update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);} else if ("d".equals(operation)) {delete((Document) doc.get("o"), databaseName, collectionName);}}}}}private BsonTimestamp getStartTimestamp() {long currentSeconds = System.currentTimeMillis() / 1000;return new BsonTimestamp((int) currentSeconds, 1);}private BsonTimestamp getEndTimestamp() {return new BsonTimestamp(0, 1);}private void insert(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void update(Document object, Document update, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);String updateJson = JSON.serialize(update);Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");query.setParameter("json", json);query.setParameter("updateJson", updateJson);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void delete(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}
}

这篇关于Mongodb 开启oplog,java监听oplog并写入关系型数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/459166

相关文章

Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式

《Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式》本文详细介绍如何使用Java通过JDBC连接MySQL数据库,包括下载驱动、配置Eclipse环境、检测数据库连接等关键步骤,... 目录一、下载驱动包二、放jar包三、检测数据库连接JavaJava 如何使用 JDBC 连接 mys

SpringBoot线程池配置使用示例详解

《SpringBoot线程池配置使用示例详解》SpringBoot集成@Async注解,支持线程池参数配置(核心数、队列容量、拒绝策略等)及生命周期管理,结合监控与任务装饰器,提升异步处理效率与系统... 目录一、核心特性二、添加依赖三、参数详解四、配置线程池五、应用实践代码说明拒绝策略(Rejected

一文详解SpringBoot中控制器的动态注册与卸载

《一文详解SpringBoot中控制器的动态注册与卸载》在项目开发中,通过动态注册和卸载控制器功能,可以根据业务场景和项目需要实现功能的动态增加、删除,提高系统的灵活性和可扩展性,下面我们就来看看Sp... 目录项目结构1. 创建 Spring Boot 启动类2. 创建一个测试控制器3. 创建动态控制器注

Java操作Word文档的全面指南

《Java操作Word文档的全面指南》在Java开发中,操作Word文档是常见的业务需求,广泛应用于合同生成、报表输出、通知发布、法律文书生成、病历模板填写等场景,本文将全面介绍Java操作Word文... 目录简介段落页头与页脚页码表格图片批注文本框目录图表简介Word编程最重要的类是org.apach

Spring Boot中WebSocket常用使用方法详解

《SpringBoot中WebSocket常用使用方法详解》本文从WebSocket的基础概念出发,详细介绍了SpringBoot集成WebSocket的步骤,并重点讲解了常用的使用方法,包括简单消... 目录一、WebSocket基础概念1.1 什么是WebSocket1.2 WebSocket与HTTP

SpringBoot+Docker+Graylog 如何让错误自动报警

《SpringBoot+Docker+Graylog如何让错误自动报警》SpringBoot默认使用SLF4J与Logback,支持多日志级别和配置方式,可输出到控制台、文件及远程服务器,集成ELK... 目录01 Spring Boot 默认日志框架解析02 Spring Boot 日志级别详解03 Sp

java中反射Reflection的4个作用详解

《java中反射Reflection的4个作用详解》反射Reflection是Java等编程语言中的一个重要特性,它允许程序在运行时进行自我检查和对内部成员(如字段、方法、类等)的操作,本文将详细介绍... 目录作用1、在运行时判断任意一个对象所属的类作用2、在运行时构造任意一个类的对象作用3、在运行时判断

java如何解压zip压缩包

《java如何解压zip压缩包》:本文主要介绍java如何解压zip压缩包问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java解压zip压缩包实例代码结果如下总结java解压zip压缩包坐在旁边的小伙伴问我怎么用 java 将服务器上的压缩文件解压出来,

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

Spring WebFlux 与 WebClient 使用指南及最佳实践

《SpringWebFlux与WebClient使用指南及最佳实践》WebClient是SpringWebFlux模块提供的非阻塞、响应式HTTP客户端,基于ProjectReactor实现,... 目录Spring WebFlux 与 WebClient 使用指南1. WebClient 概述2. 核心依