flume采集数据到kafka和hive

2024-06-17 13:58
文章标签 数据 采集 hive kafka flume

本文主要是介绍flume采集数据到kafka和hive,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  1. 构建ftp服务
    在安装flume的机器上添加sftp服务
useradd flumetest
passwd flumetest

#ubuntu-查看所有用户
cat /etc/shadow

apt-get install vsftpd
#查看
service vsftpd status
#创建接受数据目录
mkdir /home/flumetest/alarm

在vsftpd服务配置文件中设置:

# Allowanonymous FTP? (Disabled by default)
anonymous_enable=NO#Uncomment this to enable any form of FTP write command.
write_enable=YES#chroot_local_user=YES
chroot_list_enable=YES
#(default follows)
chroot_list_file=/etc/vsftpd.chroot_list
  1. 配置kafka
    参考网址:http://kafka.apache.org/quickstart
#启动kafka,kafka节点都需要启动
nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null2>&1 &#创建topic
bin/kafka-topics.sh --create --zookeeper 116.62.*.*:2181--replication-factor 2 --partitions 2 --topic alarm#查看topic List
bin/kafka-topics.sh --list --zookeeper 116.62.*.*:2181启动consumer,查看数据
bin/kafka-console-consumer.sh --bootstrap-server 116.62.*.*:9092--topic alarm --from-beginning#删除topic
bin/kafka-topics.sh --zookeeper 116.62.*.*:2181 --delete --topic alarm
#如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion,此时你若想真正删除它,可以如下操作:
#(1)登录zookeeper客户端:命令:./bin/zookeeper-client
#(2)找到topic所在的目录:ls /brokers/topics
#(3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。

3 创建hive表

create table if not EXISTS alarm_no_partition(alm_timestring,alm_timeMs int,tag_NameID string,alm_Type int,priID int,alm_Ack_Timestring,alm_Ack_TimeMs int,alm_Group int,alm_Sub_Area int,tag_Data_Typestring,alm_Ack_Flg string,alm_Remove_Flg string,alm_Remove_Timestring,alm_Remove_TimeMs int,alarm_date string) clusteredby (priID) into 2 buckets stored as orc TBLPROPERTIES("transactional"="true");

4 配置flume
参考网址:http://flume.apache.org/FlumeUserGuide.html
配置flume-conf.properties文件:一个source,多个sink、

channel
agent1.sources= alarms1
agent1.channels= alarmc1 alarmc2 alarmc3
agent1.sinks= alarmk1 alarmk2 alarmk3#SpoolingDirectory
#setalarms1
agent1.sources.alarms1.type=spooldir
agent1.sources.alarms1.spoolDir=/home/flumetest/alarm/
agent1.sources.alarms1.channels=alarmc1alarmc2 alarmc3
agent1.sources.alarms1.fileHeader= false#setalarmk1
agent1.sinks.alarmk1.channel=alarmc1
agent1.sinks.alarmk1.type= org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.alarmk1.topic= alarm
agent1.sinks.alarmk1.kafka.bootstrap.servers= 116.62.*.*:9092;116.62.*.*:9092;116.62.*.*:9092
agent1.sinks.alarmk1.kafka.flumeBatchSize= 20
agent1.sinks.alarmk1.kafka.producer.acks= 1
agent1.sinks.alarmk1.kafka.producer.linger.ms= 1
agent1.sinks.alarmk1.kafka.producer.compression.type= snappy
#setalarmk2
agent1.sinks.alarmk2.channel=alarmc2
agent1.sinks.alarmk2.type=hive
agent1.sinks.alarmk2.hive.metastore= thrift://127.0.0.1:9083
agent1.sinks.alarmk2.hive.database= alarm
agent1.sinks.alarmk2.hive.table= alarm_no_partition
#agent1.sinks.alarmk2.hive.partition=%{alarm_date}
agent1.sinks.alarmk2.useLocalTimeStamp= false
#agent1.sinks.alarmk2.roundValue= 10
#agent1.sinks.alarmk2.roundUnit= minute
agent1.sinks.alarmk2.serializer= DELIMITED
agent1.sinks.alarmk2.serializer.delimiter=,
agent1.sinks.alarmk2.serializer.serdeSeparator='\t'
agent1.sinks.alarmk2.serializer.fieldnames=alm_time,alm_timems,tag_nameid,alm_type,priid,alm_ack_time,alm_ack_timems,alm_group,alm_sub_area,tag_data_type,alm_ack_flg,alm_remove_flg,alm_remove_time,alm_remove_timems,alarm_date
#setalarmk3
#setalarmk3
#agent1.sinks.alarmk3.channel=alarmc3
#agent1.sinks.alarmk3.type=hbase
#agent1.sinks.alarmk3.table=alarm_test
#agent1.sinks.alarmk3.columnFamily=
#agent1.sinks.alarmk3.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer#setalarmc1
agent1.channels.alarmc1.type= memory
agent1.channels.alarmc1.capacity= 1000 
agent1.channels.alarmc1.transactionCapacity= 100
#setalarmc2
agent1.channels.alarmc2.type= memory
agent1.channels.alarmc2.capacity= 1000 
agent1.channels.alarmc2.transactionCapacity= 100
#setalarmc3
agent1.channels.alarmc3.type= memory
agent1.channels.alarmc3.capacity= 1000 
agent1.channels.alarmc3.transactionCapacity= 100

启动flume

nohup bin/flume-ng agent -cconf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,LOGFILE-Dflume.log.dir=logs >> /dev/null 2>&1

5 加载数据
向目录中添加文件加载完成文件后缀添加.COMPLETED

这篇关于flume采集数据到kafka和hive的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1069638

相关文章

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

C# LiteDB处理时间序列数据的高性能解决方案

《C#LiteDB处理时间序列数据的高性能解决方案》LiteDB作为.NET生态下的轻量级嵌入式NoSQL数据库,一直是时间序列处理的优选方案,本文将为大家大家简单介绍一下LiteDB处理时间序列数... 目录为什么选择LiteDB处理时间序列数据第一章:LiteDB时间序列数据模型设计1.1 核心设计原则

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十

使用SpringBoot+InfluxDB实现高效数据存储与查询

《使用SpringBoot+InfluxDB实现高效数据存储与查询》InfluxDB是一个开源的时间序列数据库,特别适合处理带有时间戳的监控数据、指标数据等,下面详细介绍如何在SpringBoot项目... 目录1、项目介绍2、 InfluxDB 介绍3、Spring Boot 配置 InfluxDB4、I