二百零三、Flume——Flume实时采集数据频率为1s的高频率Kafka数据直接写入ODS层表的HDFS文件路径下

本文主要是介绍二百零三、Flume——Flume实时采集数据频率为1s的高频率Kafka数据直接写入ODS层表的HDFS文件路径下,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、目的

在离线数仓中,需要用Flume去采集Kafka中的数据,然后写入HDFS中。

由于每种数据类型的频率、数据大小、数据规模不同,因此每种数据的采集需要不同的Flume配置文件。玩了几天Flume,感觉Flume的使用难点就是配置文件

二、使用场景

静态排队数据是数据频率为1s的数据类型代表,数据量很大、频率很高,因此搞定了静态排队数据的采集就搞定了这一类高频率数据的实时采集问题

1台雷达每日的静态排队数据规模是25MB,10台雷达的数据规模则是250MB

三、静态排队数据的配置文件


## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
a1.sources.s1.kafka.topics = topic_b_queue
a1.sources.s1.kafka.consumer.group.id = queue_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/queue
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/queue

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = queue
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 10240000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

四、Flume写入HDFS结果

Flume根据时间戳按照ODS层表的分区,将数据写入对应HDFS文件

五、ODS表刷新分区后查验数据

(一)刷新表分区

MSCK REPAIR TABLE ods_queue;

(二)查看表数据

select * from ods_queue;

六、注意点

(一)配置文件中的重点是红色标记的几点

a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 10240000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

(二)这几个重点参数的含义

序号Flume参数参数含义
1round是否启用时间上的”舍弃”,如果启用,则会影响除了%t的其他所有时间表达式                                       默认值:false
2roundValue多少时间单位创建一个新的文件夹
3roundUnit重新定义时间单位
4rollSize当临时文件达到该大小(单位:bytes)时,滚动成目标文件;默认值:1024byte            如果设置成0,则表示不根据临时文件大小来滚动文件
5rollCount当events数据达到该数量时候,将临时文件滚动成目标文件;默认值:10               如果设置成0,则表示不根据events数据来滚动文件
6rollInterval多久将临时文件滚动成最终目标文件,单位:秒;默认值:30s                    如果设置成0,则表示不根据时间来滚动文件;
7idleTimeout当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;            默认值:0
8minBlockReplicas写入HDFS文件块的最小副本数,一般配置成1才能正确滚动文件

更多Flume配置文件参数含义请看鄙人另一篇博客

一百九十一、Flume——Flume配置文件各参数含义(持续完善中)

http://t.csdnimg.cn/o5XbGicon-default.png?t=N7T8http://t.csdnimg.cn/o5XbG

就先这样吧,如果有问题的话后面再更新!!!

这篇关于二百零三、Flume——Flume实时采集数据频率为1s的高频率Kafka数据直接写入ODS层表的HDFS文件路径下的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/396439

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

深入浅出SpringBoot WebSocket构建实时应用全面指南

《深入浅出SpringBootWebSocket构建实时应用全面指南》WebSocket是一种在单个TCP连接上进行全双工通信的协议,这篇文章主要为大家详细介绍了SpringBoot如何集成WebS... 目录前言为什么需要 WebSocketWebSocket 是什么Spring Boot 如何简化 We

python设置环境变量路径实现过程

《python设置环境变量路径实现过程》本文介绍设置Python路径的多种方法:临时设置(Windows用`set`,Linux/macOS用`export`)、永久设置(系统属性或shell配置文件... 目录设置python路径的方法临时设置环境变量(适用于当前会话)永久设置环境变量(Windows系统

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

Spring Boot中的路径变量示例详解

《SpringBoot中的路径变量示例详解》SpringBoot中PathVariable通过@PathVariable注解实现URL参数与方法参数绑定,支持多参数接收、类型转换、可选参数、默认值及... 目录一. 基本用法与参数映射1.路径定义2.参数绑定&nhttp://www.chinasem.cnbs

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口