数仓数据异构方案

2024-08-28 13:36
文章标签 数据 数仓 方案 异构

本文主要是介绍数仓数据异构方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

数仓学习|几种常见的数据同步方式_1.数据仓库数据同步的方式有哪两种方式-CSDN博客

总结

流程

异构数据(不同来源的DB业务数据) - > ODS  - > Hive

1,直连

概念

即直接连接到数据库,select,把查询的数据存到本地(本地应该指中间件本地)文件,然后把文件load到数仓中

弊端

1,时间越来越长,随着数据量的不断增加

2,性能,直接查询数据库,影响数据库性能。如果有备库,不影响主库性能,但是量大查询性能依然慢

全量

中间件

sqoop

2,实时增量

概念

原理基于binlog日志解析

优势

避免了直连的弊端,实时行高

增量

中间件

DataX(开源版本,DataWorks商业版) canal  Kettle TimeTunnel (TT)

【知识】ETL大数据集成工具Sqoop、dataX、Kettle、Canal、StreamSets大比拼-腾讯云开发者社区-腾讯云

DataX官网地址

DataX/introduction.md at master · alibaba/DataX · GitHub 

目录
前言
(1)常见数据同步方式
(1.1)直连同步
(1.2)实时增量同步(日志解析)
(2)流式数据集成实现
前言
数据仓库的特性之一是集成,即首先把未经过加工处理的、不同来源的、不同形式的数据同步到ODS层,一般情况下,这些ODS层数据包括日志数据和业务DB数据。对于业务DB数据而言(比如存储在MySQL中),将数据采集并导入到数仓中(通常是Hive或者MaxCompute)是非常重要的一个环节。

那么,该如何将业务DB数据高效准确地同步到数仓中呢?

一般企业会使用两种方案:

直连同步
实时增量同步(数据库日志解析)


其中直连同步的基本思路是直连数据库进行SELECT,然后将查询的数据存储到本地文件作为中间存储,最后把文件Load到数仓中。这种方式非常的简单方便,但是随着业务的发展,会遇到一些瓶颈,具体见下文分析。

为了解决这些问题,一般会使用实时增量的方式进行数据同步,其基本原理是CDC (Change Data Capture) + Merge,即实时Binlog采集 + 离线处理Binlog还原业务数据这样一套解决方案。

(1)常见数据同步方式
(1.1)直连同步
直连同步是指通过定义好的规范接口API和基于动态链接库的方式直接连接业务库,比如ODBC/JDBC等规定了统一的标准接口,不同的数据库基于这套标准提供规范的驱动,从而支持完全相同的函数调用和SQL实现。比如经常使用的Sqoop就是采取这种方式进行批量数据同步的。

直连同步的方式配置十分简单,很容易上手操作,比较适合操作型业务系统的数据同步,但是会存在以下问题:

数据同步时间:随着业务规模的增长,数据同步花费的时间会越来越长,无法满足下游数仓生产的时间要求。
性能瓶颈(关键):直连数据库查询数据,对数据库影响非常大,容易造成慢查询,如果业务库没有采取主备策略,则会影响业务线上的正常服务,如果采取了主备策略,虽然可以避免对业务系统的性能影响,但当数据量较大时,性能依然会很差。


(1.2)实时增量同步(日志解析)
所谓日志解析,即解析数据库的变更日志,比如MySQL的Binlog日志,Oracle的归档日志文件。通过读取这些日志信息,收集变化的数据并将其解析到目标存储中即可完成数据的实时同步。这种读操作是在操作系统层面完成的,不需要通过数据库,因此不会给源数据库带来性能上的瓶颈。

数据库日志解析的同步方式可以实现实时与准实时的同步,延迟可以控制在毫秒级别的,其最大的优势就是性能好、效率高,不会对源数据库造成影响,目前,从业务系统到数据仓库中的实时增量同步,广泛采取这种方式。当然,这种方式也会存在一些问题,比如批量补数时造成大量数据更新,日志解析会处理较慢,造成数据延迟。除此之外,这种方式比较复杂,投入也较大,因为需要一个实时的抽取系统去抽取并解析日志,下文会对此进行详细解释。


如上图所示架构,在直连同步基础之上增加了流式同步的链路,经过流式计算引擎把相应的 Binlog 采集到 Kafka,同时会经过一个 Kafka 2Hive 的程序把它导入到原始数据,再经过一层 Merge,产出下游需要的 ODS 数据。

上述的数据集成方式优势是非常明显的,把数据传输的时间放到了 T+0 这一天去做,在第二天的时候只需要去做一次 merge 就可以了。非常节省时间和计算资源。

两种数据同步方式比较:


(2)流式数据集成实现
实现思路

首先,采用Flink负责把Kafka上的Binlog数据拉取到HDFS上,生成增量表。

然后,对每张ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的全量数据读取到Hive上,这一过程底层采用直连MySQL去Select数据的方式,可以使用Sqoop进行一次性全量导入,生成一张全量表。

最后,对每张ODS表,每天基于全量数据和当天增量产生的Binlog做Merge,从而还原出业务数据。

Binlog是流式产生的,通过对Binlog的实时采集,把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对MySQL的访问压力上,都会有明显地改善。Binlog本身记录了数据变更的类型(Insert/Update/Delete),通过一些语义方面的处理,完全能够做到精准的数据还原。

关于Binlog解析部分,可以使用canal工具,采集到Kafka之后,可以使用Flink解析kafka数据并写入到HDFS上,解析kafka的数据可以使用Flink的DataStreamAPI,也可以使用FlinkSQL的canal-json数据源格式进行解析,使用FlinkSQL相对来说是比较简单的。下面是canal-json格式的kafka数据源。

CREATE TABLE region (
  id BIGINT,
  region_name STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 

数据解析完成之后,下面的就是合并还原完整数据的过程,关于合并还原数据,一种比较常见的方式就是全外连接(FULL OUTER JOIN)。具体如下:

生成增量表与全量表的Merge任务,当天的增量数据与昨天的全量数据进行全外连接,该Merge任务的基本逻辑是:

INSERT OVERWRITE TABLE user_order PARTITION(ds='20211012')
SELECT  CASE    WHEN n.id IS NULL THEN o.id 
                ELSE n.id 
        END
        ,CASE    WHEN n.id IS NULL THEN o.create_time 
                 ELSE n.create_time 
         END
        ,CASE    WHEN n.id IS NULL THEN o.modified_time
                 ELSE n.modified_time 
         END
        ,CASE    WHEN n.id IS NULL THEN o.user_id 
                 ELSE n.user_id 
         END
        
        ,CASE    WHEN n.id IS NULL THEN o.sku_code 
                 ELSE n.sku_code 
         END
        ,CASE    WHEN n.id IS NULL THEN o.pay_fee
                 ELSE n.pay_fee 
         END
FROM    (
            SELECT  *
            FROM    user_order_delta
            WHERE   ds = '20211012'
            AND     id IS NOT NULL
            AND     user_id IS NOT NULL
        ) n
FULL OUTER JOIN (-- 全外连接进行数据merge
                    SELECT  *
                    FROM    user_order
                    WHERE   ds = '20211011'
                    AND     id IS NOT NULL
                    AND     user_id IS NOT NULL
                
                ) o
ON      o.id = n.id
AND     o.user_id = n.user_id
;

经过上述步骤,即可将数据还原完整。


————————————————

                            版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
                        
原文链接:https://blog.csdn.net/weixin_45366499/article/details/120752717

这篇关于数仓数据异构方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1114915

相关文章

批量导入txt数据到的redis过程

《批量导入txt数据到的redis过程》用户通过将Redis命令逐行写入txt文件,利用管道模式运行客户端,成功执行批量删除以Product*匹配的Key操作,提高了数据清理效率... 目录批量导入txt数据到Redisjs把redis命令按一条 一行写到txt中管道命令运行redis客户端成功了批量删除k

Redis客户端连接机制的实现方案

《Redis客户端连接机制的实现方案》本文主要介绍了Redis客户端连接机制的实现方案,包括事件驱动模型、非阻塞I/O处理、连接池应用及配置优化,具有一定的参考价值,感兴趣的可以了解一下... 目录1. Redis连接模型概述2. 连接建立过程详解2.1 连php接初始化流程2.2 关键配置参数3. 最大连

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

MySQL 迁移至 Doris 最佳实践方案(最新整理)

《MySQL迁移至Doris最佳实践方案(最新整理)》本文将深入剖析三种经过实践验证的MySQL迁移至Doris的最佳方案,涵盖全量迁移、增量同步、混合迁移以及基于CDC(ChangeData... 目录一、China编程JDBC Catalog 联邦查询方案(适合跨库实时查询)1. 方案概述2. 环境要求3.