【甘道夫】Sqoop1.4.4 实现将 Oracle10g 中的增量数据导入 Hive0.13.1 ,并更新Hive中的主表

本文主要是介绍【甘道夫】Sqoop1.4.4 实现将 Oracle10g 中的增量数据导入 Hive0.13.1 ,并更新Hive中的主表,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

需求
将Oracle中的业务基础表增量数据导入Hive中,与当前的全量表合并为最新的全量表。

***欢迎转载,请注明来源***   
http://blog.csdn.net/u010967382/article/details/38735381

设计
涉及的三张表:
  • 全量表保存了截止上一次同步时间的全量基础数据表
  • 增量表:增量临时表
  • 更新后的全量表:更新后的全量数据表

步骤:
  1. 通过Sqoop将Oracle中的表导入Hive,模拟全量表和增量表
  2. 通过Hive将“全量表+增量表”合并为“更新后的全量表”,覆盖当前的全量表

步骤1: 通过Sqoop将Oracle中表的导入Hive,模拟全量表和增量表
为了模拟场景,需要一张全量表,和一张增量表,由于数据源有限,所以两个表都来自Oracle中的OMP_SERVICE,全量表包含所有数据,在Hive中名称叫service_all,增量表包含部分时间段数据,在Hive中名称叫service_tmp。

(1)全量表导入:导出所有数据,只要部分字段,导入到Hive指定表里
为实现导入Hive功能,需要先配置HCatalog(HCatalog是Hive子模块)的环境变量,/etc/profile中新增:
export HCAT_HOME=/home/fulong/Hive/apache-hive-0.13.1-bin/hcatalog

执行以下命令导入数据:
fulong@FBI006:~/Sqoop/sqoop-1.4.4/bin$ ./sqoop import \
> --connect jdbc:oracle:thin:@192.168.0.147:1521:ORCLGBK  --username SP --password fulong \
> --table OMP_SERVICE \
> --columns "SERVICE_CODE,SERVICE_NAME,SERVICE_PROCESS,CREATE_TIME,ENABLE_ORG,ENABLE_PLATFORM,IF_DEL" \
> --hive-import --hive-table SERVICE_ALL

注意:用户名必须大写

(2)增量表导入:只导出所需时间范围内的数据,只要部分字段,导入到Hive指定表里
使用以下命令导入数据:
fulong@FBI006:~/Sqoop/sqoop-1.4.4/bin$ ./sqoop import \
> --connect jdbc:oracle:thin:@192.168.0.147:1521:ORCLGBK  --username SP --password fulong \
> --table OMP_SERVICE \
> --columns "SERVICE_CODE,SERVICE_NAME,SERVICE_PROCESS,CREATE_TIME,ENABLE_ORG,ENABLE_PLATFORM,IF_DEL" \
> --where "CREATE_TIME > to_date('2012/12/4 17:00:00','yyyy-mm-dd hh24:mi:ss') and CREATE_TIME < to_date('2012/12/4 18:00:00','yyyy-mm-dd hh24:mi:ss')" \
> --hive-import --hive-overwrite --hive-table SERVICE_TMP

注意:
  1. 由于使用了--hive-overwrite参数,所以该语句可反复执行,往service_tmp表中覆盖插入最新的增量数据;
  2. Sqoop还支持使用复杂Sql语句查询数据导入,相亲参见http://sqoop.apache.org/docs/1.4.4/SqoopUserGuide.html的“7.2.3.Free-form Query Imports”章节

(3)验证导入结果:列出所有表,统计行数,查看表结构
hive>  show tables;
OK
searchlog
searchlog_tmp
service_all
service_tmp
Time taken: 0.04 seconds, Fetched: 4 row(s)
hive>  select count(*) from service_all;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1407233914535_0013, Tracking URL = http://FBI003:8088/proxy/application_1407233914535_0013/
Kill Command = /home/fulong/Hadoop/hadoop-2.2.0/bin/hadoop job  -kill job_1407233914535_0013
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1
2014-08-21 16:51:47,389 Stage-1 map = 0%,  reduce = 0%
2014-08-21 16:51:59,816 Stage-1 map = 33%,  reduce = 0%, Cumulative CPU 1.36 sec
2014-08-21 16:52:01,996 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 2.45 sec
2014-08-21 16:52:07,877 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.96 sec
2014-08-21 16:52:17,639 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.29 sec
MapReduce Total cumulative CPU time: 5 seconds 290 msec
Ended Job = job_1407233914535_0013
MapReduce Jobs Launched:
Job 0: Map: 3  Reduce: 1   Cumulative CPU: 5.46 sec   HDFS Read: 687141 HDFS Write: 5 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 460 msec
OK
6803
Time taken: 59.386 seconds, Fetched: 1 row(s)
hive>  select count(*) from service_tmp;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1407233914535_0014, Tracking URL = http://FBI003:8088/proxy/application_1407233914535_0014/
Kill Command = /home/fulong/Hadoop/hadoop-2.2.0/bin/hadoop job  -kill job_1407233914535_0014
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1
2014-08-21 16:53:03,951 Stage-1 map = 0%,  reduce = 0%
2014-08-21 16:53:15,189 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 2.17 sec
2014-08-21 16:53:16,236 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.38 sec
2014-08-21 16:53:57,935 Stage-1 map = 100%,  reduce = 22%, Cumulative CPU 3.78 sec
2014-08-21 16:54:01,811 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.34 sec
MapReduce Total cumulative CPU time: 5 seconds 340 msec
Ended Job = job_1407233914535_0014
MapReduce Jobs Launched:
Job 0: Map: 3  Reduce: 1   Cumulative CPU: 5.66 sec   HDFS Read: 4720 HDFS Write: 3 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 660 msec
OK
13
Time taken: 75.856 seconds, Fetched: 1 row(s)
hive>  describe service_all;
OK
service_code            string
service_name            string
service_process         string
create_time             string
enable_org              string
enable_platform         string
if_del                  string
Time taken: 0.169 seconds, Fetched: 7 row(s)
hive>  describe service_tmp;
OK
service_code            string
service_name            string
service_process         string
create_time             string
enable_org              string
enable_platform         string
if_del                  string
Time taken: 0.117 seconds, Fetched: 7 row(s)

步骤2:通过Hive将“全量表+增量表”合并为“更新后的全量表”,覆盖当前的全量表

合并新表的逻辑如下:
  • 整个tmp表进入最终表中
  • all表的数据中不包含在tmp表service_code范围内的数据全部进入新表
执行以下sql语句可以合并得到更新后的全量表:
hive> select * from service_tmp  union all  select a.* from service_all a left outer join service_tmp b on a.service_code = b.service_code where b.service_code is null;

我们需要直接将查询结果更新回全量表中:
hive> insert overwrite table service_all select * from service_tmp union all select a.* from service_all a left outer join service_tmp b on a.service_code = b.service_code where b.service_code is null;

注意,将查询结果插入表有以下两类语法:
  • INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 FROM from_statement;
  • INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;
INSERT OVERWRITE 将会覆盖现有数据,由于当前场景需要更新全量表,所以使用了覆盖模式;
INSERT INTO 不会覆盖现有数据,是追加数据

到此为止,Hive中的service_all表已经更新为最新的数据!
在真实场景中,需要结合shell+cron实现该过程的定时执行。

这篇关于【甘道夫】Sqoop1.4.4 实现将 Oracle10g 中的增量数据导入 Hive0.13.1 ,并更新Hive中的主表的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1004661

相关文章

HTML5 getUserMedia API网页录音实现指南示例小结

《HTML5getUserMediaAPI网页录音实现指南示例小结》本教程将指导你如何利用这一API,结合WebAudioAPI,实现网页录音功能,从获取音频流到处理和保存录音,整个过程将逐步... 目录1. html5 getUserMedia API简介1.1 API概念与历史1.2 功能与优势1.3

Java实现删除文件中的指定内容

《Java实现删除文件中的指定内容》在日常开发中,经常需要对文本文件进行批量处理,其中,删除文件中指定内容是最常见的需求之一,下面我们就来看看如何使用java实现删除文件中的指定内容吧... 目录1. 项目背景详细介绍2. 项目需求详细介绍2.1 功能需求2.2 非功能需求3. 相关技术详细介绍3.1 Ja

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解

PostgreSQL中MVCC 机制的实现

《PostgreSQL中MVCC机制的实现》本文主要介绍了PostgreSQL中MVCC机制的实现,通过多版本数据存储、快照隔离和事务ID管理实现高并发读写,具有一定的参考价值,感兴趣的可以了解一下... 目录一 MVCC 基本原理python1.1 MVCC 核心概念1.2 与传统锁机制对比二 Postg

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

C++中零拷贝的多种实现方式

《C++中零拷贝的多种实现方式》本文主要介绍了C++中零拷贝的实现示例,旨在在减少数据在内存中的不必要复制,从而提高程序性能、降低内存使用并减少CPU消耗,零拷贝技术通过多种方式实现,下面就来了解一下... 目录一、C++中零拷贝技术的核心概念二、std::string_view 简介三、std::stri

C++高效内存池实现减少动态分配开销的解决方案

《C++高效内存池实现减少动态分配开销的解决方案》C++动态内存分配存在系统调用开销、碎片化和锁竞争等性能问题,内存池通过预分配、分块管理和缓存复用解决这些问题,下面就来了解一下... 目录一、C++内存分配的性能挑战二、内存池技术的核心原理三、主流内存池实现:TCMalloc与Jemalloc1. TCM

OpenCV实现实时颜色检测的示例

《OpenCV实现实时颜色检测的示例》本文主要介绍了OpenCV实现实时颜色检测的示例,通过HSV色彩空间转换和色调范围判断实现红黄绿蓝颜色检测,包含视频捕捉、区域标记、颜色分析等功能,具有一定的参考... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间

Python实现精准提取 PDF中的文本,表格与图片

《Python实现精准提取PDF中的文本,表格与图片》在实际的系统开发中,处理PDF文件不仅限于读取整页文本,还有提取文档中的表格数据,图片或特定区域的内容,下面我们来看看如何使用Python实... 目录安装 python 库提取 PDF 文本内容:获取整页文本与指定区域内容获取页面上的所有文本内容获取