Flink CDC 1.18.1 Oracle 数据同步到postgresql

2024-03-25 08:04

本文主要是介绍Flink CDC 1.18.1 Oracle 数据同步到postgresql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、下载flink-1.18.1-bin-scala_2.12.tgz,linux通过:

  wget https://archive.apache.org/dist/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz

2、oracle11g客户端安装,下载:

instantclient-basic-linux.x64-11.2.0.4.0.zip
instantclient-sdk-linux.x64-11.2.0.4.0.zip
instantclient-sqlplus-linux.x64-11.2.0.4.0.zip

以上文件,在ORACLE网站下载。

3、配置oracle客户端:

[root@wn1 ~]# ls /usr/local/
[root@wn1 ~]# cp instantclient-* /usr/local
[root@wn1 ~]# cd /usr/local
[root@wn1 ~]# unzip instantclient-basic-linux.x64-11.2.0.4.0.zip
[root@wn1 ~]# unzip instantclient-sdk-linux.x64-11.2.0.4.0.zip
[root@wn1 ~]# unzip instantclient-sqlplus-linux.x64-11.2.0.4.0.zip
[root@wn1 ~]# mv instantclient_11_2 oracle_11
[root@wn1 ~]# rm instantclient-*
[root@wn1 ~]# vi /etc/profile
#增加以下内容
export ORACLE_HOME=/usr/local/oracle_11
export PATH=.:${PATH}:$ORACLE_HOME
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$ORACLE_HOME#保存退出,执行
[root@wn1 ~]# source /etc/profile#修改ld配置
[root@wn1 ~]#  vi /etc/ld.so.conf.d/oracle.conf
#写入内容
/usr/local/oracle_11
#保存退出,执行
[root@wn1 ~]#ldconfig
#配置oracle连接参数
[root@wn1 ~]# mkdir -p network/admin/
[root@wn1 ~]# cd network/admin/
#找一个tnsnames.ora文件,直接上传到服务器
[root@wn1 ~]# cp /root/tnsnames.ora ./#测试连接
[root@wn1 ~]# sqlplus sys/manager@//192.168.56.1/orcl
SQL*Plus: Release 11.2.0.4.0 Production on Sun Mar 24 18:04:15 2024Copyright (c) 1982, 2013, Oracle.  All rights reserved.Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing optionsSQL>

4、配置oracle数据库,启用归档日志,这步需要参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/legacy-flink-cdc-sources/oracle-cdc/

5、下载oracle cdc 连接器

wget https://maven.aliyun.com/repository/public/com/ververica/flink-sql-connector-oracle-cdc/3.0.1/flink-sql-connector-oracle-cdc-3.0.1.jar

解压:

tar zxvf flink-1.18.1-bin-scala_2.12.tgz

将flink-sql-connector-oracle-cdc-3.0.1.jar复制到flink-1.18.1/lib目录中

6、下载 flink-connector-jdbc-3.1.1-1.17.jar,postgresql-42.7.3.jar

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar

https://jdbc.postgresql.org/download/postgresql-42.7.3.jar

将jar包复制到flink-1.18.1/lib目录中

7、安装postgresql就不说了,相信你已经有了数据库了

8、修改Flink的配置文件 /home/flink/flink-1.18.1/conf/flink-conf.yaml ,主要是各种服务的绑定地址,默认为localhost,统统改为0.0.0.0,如:rest.address: 0.0.0.0 #localhost
9、启动

[flink@cn1 bin]$ ./start-cluster.sh
[flink@cn1 bin]$ ./sql-client.sh
Flink SQL>
#创建ORACLE源表SET execution.checkpointing.interval = 3s;create table SYS_DIC_DEPT
(DEPT_CODE       STRING,DEPT_NAME       STRING,DEPT_ADDR       STRING,DEPT_MEMO       STRING,DEPT_FLAG       STRING,DEPT_GZYZLTJFLAG STRING,DEPT_UPPER       STRING,PRIMARY KEY (DEPT_CODE) NOT ENFORCED
)WITH ('connector' = 'oracle-cdc','hostname' = '192.168.56.1','port' = '1521','username' = 'username','password' = '123456','database-name' = 'ORCL','schema-name' = 'schema-name','table-name' = 'table-name','debezium.log.mining.strategy'='online_catalog','debezium.log.mining.continuos.mine'='true','debezium.snapshot.mode' = 'initial','debezium.database.tablename.case.insensitive'='true');Flink SQL> select * from SYS_DIC_DEPT;

看不数据,请检查ORACLE的字段是否全部大写

10、创建PG Sink:

Flink SQL>
create table sys_dic_dept_sink
(dept_code       STRING PRIMARY KEY,dept_name       STRING,dept_addr       STRING,dept_memo       STRING,dept_flag       STRING,dept_gzyzltjflag STRING,dept_upper       STRING,PRIMARY KEY (dept_code) NOT ENFORCED
)
with(
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.56.90:5432/postgres?currentSchema=public', 
'username' = 'postgres',
'password' = '123456',  
'table-name' = 'sys_dic_dept'
);

11、抽数据

Flink SQL> insert into sys_dic_dept_sink select * from SYS_DIC_DEPT;

12、查看任务执行 http://192.168.56.90:8081/#/job/running

这篇关于Flink CDC 1.18.1 Oracle 数据同步到postgresql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/844345

相关文章

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

postgresql使用UUID函数的方法

《postgresql使用UUID函数的方法》本文给大家介绍postgresql使用UUID函数的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录PostgreSQL有两种生成uuid的方法。可以先通过sql查看是否已安装扩展函数,和可以安装的扩展函数

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

PostgreSQL中rank()窗口函数实用指南与示例

《PostgreSQL中rank()窗口函数实用指南与示例》在数据分析和数据库管理中,经常需要对数据进行排名操作,PostgreSQL提供了强大的窗口函数rank(),可以方便地对结果集中的行进行排名... 目录一、rank()函数简介二、基础示例:部门内员工薪资排名示例数据排名查询三、高级应用示例1. 每

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法