Datax 支持增量 postgresql writeMode update

2024-09-04 00:18

本文主要是介绍Datax 支持增量 postgresql writeMode update,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Datax 支持 postgresql update

  • datax介绍
    • 支持增量 postgresql update
    • 修改 PostgresqlWriter.java
    • 修改WriterUtil.java
    • 效果
    • 源码

datax介绍

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

支持增量 postgresql update

我们使用datax 希望支持postgresql 增量导入数据:地址:https://gitee.com/cecotw/DataX

链接:https://pan.baidu.com/s/1mbEvLsDZZNWMYrTTTeYkAw 密码:v97c

修改 PostgresqlWriter.java

删除限制:
删除pg写入模式限制

修改WriterUtil.java

添加postgresql 数据插入类型转换:
添加postgresql 数据插入类型转换

    public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")|| writeMode.trim().toLowerCase().startsWith("replace")|| writeMode.trim().toLowerCase().startsWith("update");if (!isWriteModeLegal) {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));}// && writeMode.trim().toLowerCase().startsWith("replace")String writeDataSqlTemplate;if (forceUseUpdate ||((dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) && writeMode.trim().toLowerCase().startsWith("update"))) {//update只在mysql下使用writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onDuplicateKeyUpdateString(columnHolders)).toString();} else {if (dataBaseType == DataBaseType.PostgreSQL) {writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onConFlictDoString(writeMode, columnHolders)).toString();} else {//这里是保护,如果其他错误的使用了update,需要更换为replaceif (writeMode.trim().toLowerCase().startsWith("update")) {writeMode = "replace";}writeDataSqlTemplate = new StringBuilder().append(writeMode).append(" INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").toString();}}return writeDataSqlTemplate;}

增加onConFlictDoString方法:
增加onConFlictDoString方法

    public static String onConFlictDoString(String conflict, List<String> columnHolders) {conflict = conflict.replace("update", "");StringBuilder sb = new StringBuilder();sb.append(" ON CONFLICT ");sb.append(conflict);sb.append(" DO ");if (columnHolders == null || columnHolders.size() < 1) {sb.append("NOTHING");return sb.toString();}sb.append(" UPDATE SET ");boolean first = true;for (String column : columnHolders) {if (!first) {sb.append(",");} else {first = false;}sb.append(column);sb.append("=excluded.");sb.append(column);}return sb.toString();}

效果

{"job": {"setting": {"speed": {"byte": 1048576},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "postgresqlreader","parameter": {"username": "postgres","password": "postgres","connection": [{"querySql":["SELECT seq,userid,name FROM user"],"jdbcUrl": ["jdbc:postgresql://127.0.0.1:5432/postgres"]}]}},"writer": {"name": "postgresqlwriter","parameter": {"username": "thsdb","password": "thsdb_outsev","column": ["seq","userid","name"],"connection": [{"jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/postgres","table": ["user1"]}],"writeMode": "update (seq,userid)"}}}]}
}

源码

  • 关于 DATAX改造后的代码 ,参考 这儿.(https://gitee.com/cecotw/DataX)

这篇关于Datax 支持增量 postgresql writeMode update的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1134450

相关文章

MySQL中On duplicate key update的实现示例

《MySQL中Onduplicatekeyupdate的实现示例》ONDUPLICATEKEYUPDATE是一种MySQL的语法,它在插入新数据时,如果遇到唯一键冲突,则会执行更新操作,而不是抛... 目录1/ ON DUPLICATE KEY UPDATE的简介2/ ON DUPLICATE KEY UP

PostgreSQL简介及实战应用

《PostgreSQL简介及实战应用》PostgreSQL是一种功能强大的开源关系型数据库管理系统,以其稳定性、高性能、扩展性和复杂查询能力在众多项目中得到广泛应用,本文将从基础概念讲起,逐步深入到高... 目录前言1. PostgreSQL基础1.1 PostgreSQL简介1.2 基础语法1.3 数据库

python中update()函数的用法和一些例子

《python中update()函数的用法和一些例子》update()方法是字典对象的方法,用于将一个字典中的键值对更新到另一个字典中,:本文主要介绍python中update()函数的用法和一些... 目录前言用法注意事项示例示例 1: 使用另一个字典来更新示例 2: 使用可迭代对象来更新示例 3: 使用

Oracle迁移PostgreSQL隐式类型转换配置指南

《Oracle迁移PostgreSQL隐式类型转换配置指南》Oracle迁移PostgreSQL时因类型差异易引发错误,需通过显式/隐式类型转换、转换关系管理及冲突处理解决,并配合验证测试确保数据一致... 目录一、问题背景二、解决方案1. 显式类型转换2. 隐式转换配置三、维护操作1. 转换关系管理2.

postgresql使用UUID函数的方法

《postgresql使用UUID函数的方法》本文给大家介绍postgresql使用UUID函数的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录PostgreSQL有两种生成uuid的方法。可以先通过sql查看是否已安装扩展函数,和可以安装的扩展函数

PostgreSQL中rank()窗口函数实用指南与示例

《PostgreSQL中rank()窗口函数实用指南与示例》在数据分析和数据库管理中,经常需要对数据进行排名操作,PostgreSQL提供了强大的窗口函数rank(),可以方便地对结果集中的行进行排名... 目录一、rank()函数简介二、基础示例:部门内员工薪资排名示例数据排名查询三、高级应用示例1. 每

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

postgresql数据库基本操作及命令详解

《postgresql数据库基本操作及命令详解》本文介绍了PostgreSQL数据库的基础操作,包括连接、创建、查看数据库,表的增删改查、索引管理、备份恢复及退出命令,适用于数据库管理和开发实践,感兴... 目录1. 连接 PostgreSQL 数据库2. 创建数据库3. 查看当前数据库4. 查看所有数据库

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指

k8s上运行的mysql、mariadb数据库的备份记录(支持x86和arm两种架构)

《k8s上运行的mysql、mariadb数据库的备份记录(支持x86和arm两种架构)》本文记录在K8s上运行的MySQL/MariaDB备份方案,通过工具容器执行mysqldump,结合定时任务实... 目录前言一、获取需要备份的数据库的信息二、备份步骤1.准备工作(X86)1.准备工作(arm)2.手