MongoDB CDC 导入 Elasticsearch

2024-08-29 08:20

本文主要是介绍MongoDB CDC 导入 Elasticsearch,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、docker-compose

version: '3'
services:mongo:image: "mongo:4.0-xenial"command: --replSet rs0 --smallfiles --oplogSize 128ports:- "27017:27017"environment:- MONGO_INITDB_ROOT_USERNAME=mongouser- MONGO_INITDB_ROOT_PASSWORD=mongopwelasticsearch:image: elastic/elasticsearch:7.6.0environment:- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeports:- "9200:9200"- "9300:9300"ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0ports:- "5601:5601"

二、进入 MongoDB 容器,初始化副本集和数据

docker-compose exec mongo /usr/bin/mongo -u mongouser -p mongopw
// 1. 初始化副本集
rs.initiate();
rs.status();// 2. 切换数据库
use mgdb;// 3. 初始化数据
db.orders.insertMany([{order_id: 101,order_date: ISODate("2020-07-30T10:08:22.001Z"),customer_id: 1001,price: NumberDecimal("50.50"),product: {name: 'scooter',description: 'Small 2-wheel scooter'},order_status: false},{order_id: 102, order_date: ISODate("2020-07-30T10:11:09.001Z"),customer_id: 1002,price: NumberDecimal("15.00"),product: {name: 'car battery',description: '12V car battery'},order_status: false},{order_id: 103,order_date: ISODate("2020-07-30T12:00:30.001Z"),customer_id: 1003,price: NumberDecimal("25.25"),product: {name: 'hammer',description: '16oz carpenter hammer'},order_status: false}
]);db.customers.insertMany([{ customer_id: 1001, name: 'Jark', address: 'Hangzhou' },{ customer_id: 1002, name: 'Sally',address: 'Beijing'},{ customer_id: 1003,name: 'Edward',address: 'Shanghai'}
]);

三、下载以下 jar 包到 <FLINK_HOME>/lib/

下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
  • flink-sql-connector-mongodb-cdc-2.4.0.jar

 四、然后启动 Flink 集群,再启动 SQL CLI.

-- Flink SQL
-- 设置间隔时间为3秒                       
Flink SQL> SET execution.checkpointing.interval = 3s;-- 设置本地时区为 Asia/Shanghai
Flink SQL> SET table.local-time-zone = Asia/Shanghai;Flink SQL> CREATE TABLE orders (_id STRING,order_id INT,order_date TIMESTAMP_LTZ(3),customer_id INT,price DECIMAL(10, 5),product ROW<name STRING, description STRING>,order_status BOOLEAN,PRIMARY KEY (_id) NOT ENFORCED) WITH ('connector' = 'mongodb-cdc','hosts' = 'localhost:27017','username' = 'mongouser','password' = 'mongopw','database' = 'mgdb','collection' = 'orders');Flink SQL> CREATE TABLE customers (_id STRING,customer_id INT,name STRING,address STRING,PRIMARY KEY (_id) NOT ENFORCED) WITH ('connector' = 'mongodb-cdc','hosts' = 'localhost:27017','username' = 'mongouser','password' = 'mongopw','database' = 'mgdb','collection' = 'customers');Flink SQL> CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP_LTZ(3),customer_id INT,price DECIMAL(10, 5),product ROW<name STRING, description STRING>,order_status BOOLEAN,customer_name STRING,customer_address STRING,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'enriched_orders');Flink SQL> INSERT INTO enriched_ordersSELECT o.order_id,o.order_date,o.customer_id,o.price,o.product,o.order_status,c.name,c. addressFROM orders AS oLEFT JOIN customers AS c ON o.customer_id = c.customer_id;

五、修改 MongoDB 里面的数据,观察 elasticsearch 里的结果 

db.orders.insert({order_id: 104,order_date: ISODate("2020-07-30T12:00:30.001Z"),customer_id: 1004,price: NumberDecimal("25.25"),product: {name: 'rocks',description: 'box of assorted rocks'},order_status: false
});db.customers.insert({customer_id: 1004,name: 'Jacob',address: 'Shanghai'
});db.orders.updateOne({ order_id: 104 },{ $set: { order_status: true } }
);db.orders.deleteOne({ order_id : 104 }
);

这篇关于MongoDB CDC 导入 Elasticsearch的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1117340

相关文章

基于MongoDB实现文件的分布式存储

《基于MongoDB实现文件的分布式存储》分布式文件存储的方案有很多,今天分享一个基于mongodb数据库来实现文件的存储,mongodb支持分布式部署,以此来实现文件的分布式存储,需要的朋友可以参考... 目录一、引言二、GridFS 原理剖析三、Spring Boot 集成 GridFS3.1 添加依赖

MySQL Workbench工具导出导入数据库方式

《MySQLWorkbench工具导出导入数据库方式》:本文主要介绍MySQLWorkbench工具导出导入数据库方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝... 目录mysql Workbench工具导出导入数据库第一步 www.chinasem.cn数据库导出第二步

将图片导入Python的turtle库的详细过程

《将图片导入Python的turtle库的详细过程》在Python编程的世界里,turtle库以其简单易用、图形化交互的特点,深受初学者喜爱,随着项目的复杂度增加,仅仅依靠线条和颜色来绘制图形可能已经... 目录开篇引言正文剖析1. 理解基础:Turtle库的工作原理2. 图片格式与支持3. 实现步骤详解第

POI从入门到实战轻松完成EasyExcel使用及Excel导入导出功能

《POI从入门到实战轻松完成EasyExcel使用及Excel导入导出功能》ApachePOI是一个流行的Java库,用于处理MicrosoftOffice格式文件,提供丰富API来创建、读取和修改O... 目录前言:Apache POIEasyPoiEasyExcel一、EasyExcel1.1、核心特性

关于MongoDB图片URL存储异常问题以及解决

《关于MongoDB图片URL存储异常问题以及解决》:本文主要介绍关于MongoDB图片URL存储异常问题以及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录MongoDB图片URL存储异常问题项目场景问题描述原因分析解决方案预防措施js总结MongoDB图

Elasticsearch 在 Java 中的使用教程

《Elasticsearch在Java中的使用教程》Elasticsearch是一个分布式搜索和分析引擎,基于ApacheLucene构建,能够实现实时数据的存储、搜索、和分析,它广泛应用于全文... 目录1. Elasticsearch 简介2. 环境准备2.1 安装 Elasticsearch2.2 J

Java导入、导出excel用法步骤保姆级教程(附封装好的工具类)

《Java导入、导出excel用法步骤保姆级教程(附封装好的工具类)》:本文主要介绍Java导入、导出excel的相关资料,讲解了使用Java和ApachePOI库将数据导出为Excel文件,包括... 目录前言一、引入Apache POI依赖二、用法&步骤2.1 创建Excel的元素2.3 样式和字体2.

浅析Python中的绝对导入与相对导入

《浅析Python中的绝对导入与相对导入》这篇文章主要为大家详细介绍了Python中的绝对导入与相对导入的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1 Imports快速介绍2 import语句的语法2.1 基本使用2.2 导入声明的样式3 绝对import和相对i

ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法

《ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法》本文介绍了Elasticsearch的基本概念,包括文档和字段、索引和映射,还详细描述了如何通过Docker... 目录1、ElasticSearch概念2、ElasticSearch、Kibana和IK分词器部署

Go Mongox轻松实现MongoDB的时间字段自动填充

《GoMongox轻松实现MongoDB的时间字段自动填充》这篇文章主要为大家详细介绍了Go语言如何使用mongox库,在插入和更新数据时自动填充时间字段,从而提升开发效率并减少重复代码,需要的可以... 目录前言时间字段填充规则Mongox 的安装使用 Mongox 进行插入操作使用 Mongox 进行更