DataX实战应用

2024-05-14 15:18
文章标签 实战 应用 datax

本文主要是介绍DataX实战应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

前言

系统架构

关键实现

系统目前使用现状

DataX使用心得


前言

DataX是阿里开源数据同步工具,实现异构数据源的数据同步,Github地址:https://github.com/alibaba/DataX,企业存储离线数据到数仓,但是没办法对接业务,本次实践主要是运用DataX实现数据从数仓导入到MySQL,从而对接业务,另外,对数仓数据的流出进行管理。

一般从数仓数据导入到MySQL中,可以从hive查询存储到一个文件里面,如果是数据量比较大的情况下先将文件按一定行数切分为多个文件,然后遍历文件往MySQL中导入,这种方式虽然简单,缺点在于对于每一个导入需求,都需要写一个job,并且每次都会产生临时文件,mysql load会比较占用资源,之所以选择了DataX,因为它能实现hdfs导入MySQL,速度快,能实现增量全量,可以分表,能减少很多技术的实现成本。

系统架构

本次应用主要需实现一个Client和Server,依托于Hadoop集群,通过Schedule基础平台定时调度调用client实现数据的同步。Server是一个管理项目,后端使用Spring MVC工程,对接了元数据系统,直接从前端页面可以配置从Hive表导入到具体的MySQL表,client是一个jar包,通过HTTP和MQ和Server通信。

运行流程

1.Schedule通过java -jar client.jar启动client

2.client通过http接口调用到server,获取到本次要同步的一个job,通过job的信息生成一个json串,分配一个线程,client中通过python datax.py xxx.json调用DataX完成数据同步

3.client完成json文件的输出,输出到指定的文件夹下,文件名采用时间戳命名。

4. client中通过执行shell命令完成对DataX的调用

发布流程:

1.DataX部署,这里只使用到了mysqlwriter和hdfsreader组件,所以通过clone DataX源码,删除掉了其余不用的组件代码,手动编译进行部署,编译后大约占用200M空间,部署到hadoop集群的各个slave机器,正常情况下DataX只需要一次部署。

2.server发布,server通过jenkins发布为一个web工程,后端提供http接口,并监听MQ。

3.client发布,client通过jenkins发布到hadoop集群的slave机器。

关键实现

1.日志监控,schedule调起client任务一次对应的日志应该都要全部输出到schedule中,并且应该要捕捉到DataX的运行日志。关键代码:

//commond是运行DataX的命令:python datax/bin/datax.py xxx.json 
Process process = RUNTIME.exec(command);
try (BufferedReader input = new BufferedReader(new InputStreamReader(process.getInputStream()))) {String line;while ((line = input.readLine()) != null) {System.out.println(line);}
} catch (IOException e) {LOG.error("读取DataX中job运行日志异常", e);
}

2.作业告警,采用了Schedule调度系统的告警,DataX导入失败会触发调度系统的告警,然后通过电话和邮件对负责人进行通知。

3.Hive分区表,hive中对单个表进行分区存储,一般按照日期进行分区,在schedule调起client时可以传入一个日期参数,然后根据日期生成一个日期分区,在hdfsreader中的分区中带入即可:

 "path": "/user/hive/warehouse/order/20180831","defaultFS": "hdfs://localhost:2181",

4.MySQL增量和全量,DataX很贴心,mysqlwriter可以选择导入的模式。

"writer":{"name": "mysqlwriter","parameter": {"writeMode": "insert",  // 写入模式可以选择insert/replace/update"username": "root","password": "root","column": ["id","name"],"session": ["set session sql_mode='ANSI'"],"preSql": ["delete from test" // 预执行SQL,可以选择一个delete语句],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk","table": ["test"]}]}
}

其中parameter中的writeMode可以选择insert,update,replace。并且有preSql选项,也就是在导入之前可以执行一些SQL,对我们全量数据和增量数据提供了有效的方案。

全量数据导入:通过preSql传入一条delete语句,然后writerMode选择insert执行导入(甚至如果有比较高的权限,可以直接用truncate语句,但是一般dba才有的这样权限),在delete的时候需要注意,如果数据量太大的情况下,一条delete就足以让dba找上门拉,所以添加索引就有必要拉,根据索引删除就会快一些。

增量数据导入:选择writeMode使用update,这里dataX是通过on duplicate key update语句来实现数据的增量实现,也就是说表里面有唯一键或者主键冲突时,覆盖原来的数据,这就很容易理解了,比如订单数据,订单号肯定是一个唯一键,如果导入一个已存在的订单,数据肯定会覆盖原来的数据。

5.为什么要用MQ,这里client在调用DataX执行完成之后,应该要通知到server,当前的任务执行完毕,server会更新当前任务的状态,对于简单检查任务的状态时,就不用去查看schedule中的日志了。

系统目前使用现状

1.稳定性强,系统上线2个月以来,有大约20个作业在运行,其中有5个小时运行的作业,从来没有出现过问题。

2.速度快,目前同步数据量最大的一个表大概是1.4亿的数据量,同步的时间在1100s左右,也就是18分钟到19分钟这样一个时间,每秒最高写入量达到14w+,平均每秒写入量大约在13w左右的速度,速度保持一个平均的趋势,下面是一个运行结果的截图

DataX使用心得

1.DataX是一个高可用的数据同步工具,稳定性强,速度快,上手快(不知道二次开发会不会困难,有机会可以试试,但是目前的功能已经能满足很大一部分需求)。

2.事务的支持不足,在github上看到的DataX支持的一个线程中的事务,在测试过程中没有体会到这种小事务能带来多大的收益,在有脏数据的情况下,如果当前线程的数据出现问题执行回滚,其实对于整体来说数据是不完整的,还是有一部分数据已经导入到MySQL 了,所以单个线程的事务其实对于数据量较大的数据来说收益不大,除非作业是通过一个线程来跑的。

但是,另一方面,如果在大量数据导入执行一个事务,对MySQL来说无疑是一个灾难,所以最好的办法是保证系统结构稳定。

这里其实有一个办法,如果是全量的数据,在job跑失败之后重新跑,能证数据的准确性,但是跑失败的那段时间数据是有缺失的;如果是增量数据导入,重新跑一下job也能保证数据没有问题。但是在那段小时间内还是有问题的。

确实是没有什么好的办法去支持完整的事务。

欢迎交流。

这篇关于DataX实战应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/989099

相关文章

Python标准库之数据压缩和存档的应用详解

《Python标准库之数据压缩和存档的应用详解》在数据处理与存储领域,压缩和存档是提升效率的关键技术,Python标准库提供了一套完整的工具链,下面小编就来和大家简单介绍一下吧... 目录一、核心模块架构与设计哲学二、关键模块深度解析1.tarfile:专业级归档工具2.zipfile:跨平台归档首选3.

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

使用IDEA部署Docker应用指南分享

《使用IDEA部署Docker应用指南分享》本文介绍了使用IDEA部署Docker应用的四步流程:创建Dockerfile、配置IDEADocker连接、设置运行调试环境、构建运行镜像,并强调需准备本... 目录一、创建 dockerfile 配置文件二、配置 IDEA 的 Docker 连接三、配置 Do

深入浅出SpringBoot WebSocket构建实时应用全面指南

《深入浅出SpringBootWebSocket构建实时应用全面指南》WebSocket是一种在单个TCP连接上进行全双工通信的协议,这篇文章主要为大家详细介绍了SpringBoot如何集成WebS... 目录前言为什么需要 WebSocketWebSocket 是什么Spring Boot 如何简化 We

java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)

《java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)》:本文主要介绍java中pdf模版填充表单踩坑的相关资料,OpenPDF、iText、PDFBox是三... 目录准备Pdf模版方法1:itextpdf7填充表单(1)加入依赖(2)代码(3)遇到的问题方法2:pd

Java Stream流之GroupBy的用法及应用场景

《JavaStream流之GroupBy的用法及应用场景》本教程将详细介绍如何在Java中使用Stream流的groupby方法,包括基本用法和一些常见的实际应用场景,感兴趣的朋友一起看看吧... 目录Java Stream流之GroupBy的用法1. 前言2. 基础概念什么是 GroupBy?Stream

python中列表应用和扩展性实用详解

《python中列表应用和扩展性实用详解》文章介绍了Python列表的核心特性:有序数据集合,用[]定义,元素类型可不同,支持迭代、循环、切片,可执行增删改查、排序、推导式及嵌套操作,是常用的数据处理... 目录1、列表定义2、格式3、列表是可迭代对象4、列表的常见操作总结1、列表定义是处理一组有序项目的

C#中的Converter的具体应用

《C#中的Converter的具体应用》C#中的Converter提供了一种灵活的类型转换机制,本文详细介绍了Converter的基本概念、使用场景,具有一定的参考价值,感兴趣的可以了解一下... 目录Converter的基本概念1. Converter委托2. 使用场景布尔型转换示例示例1:简单的字符串到

Spring Boot Actuator应用监控与管理的详细步骤

《SpringBootActuator应用监控与管理的详细步骤》SpringBootActuator是SpringBoot的监控工具,提供健康检查、性能指标、日志管理等核心功能,支持自定义和扩展端... 目录一、 Spring Boot Actuator 概述二、 集成 Spring Boot Actuat

PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例

《PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例》词嵌入解决NLP维度灾难,捕捉语义关系,PyTorch的nn.Embedding模块提供灵活实现,支持参数配置、预训练及变长... 目录一、词嵌入(Word Embedding)简介为什么需要词嵌入?二、PyTorch中的nn.Em