flume采集数据到kafka和hive

2024-06-17 13:58
文章标签 数据 采集 hive kafka flume

本文主要是介绍flume采集数据到kafka和hive,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  1. 构建ftp服务
    在安装flume的机器上添加sftp服务
useradd flumetest
passwd flumetest

#ubuntu-查看所有用户
cat /etc/shadow

apt-get install vsftpd
#查看
service vsftpd status
#创建接受数据目录
mkdir /home/flumetest/alarm

在vsftpd服务配置文件中设置:

# Allowanonymous FTP? (Disabled by default)
anonymous_enable=NO#Uncomment this to enable any form of FTP write command.
write_enable=YES#chroot_local_user=YES
chroot_list_enable=YES
#(default follows)
chroot_list_file=/etc/vsftpd.chroot_list
  1. 配置kafka
    参考网址:http://kafka.apache.org/quickstart
#启动kafka,kafka节点都需要启动
nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null2>&1 &#创建topic
bin/kafka-topics.sh --create --zookeeper 116.62.*.*:2181--replication-factor 2 --partitions 2 --topic alarm#查看topic List
bin/kafka-topics.sh --list --zookeeper 116.62.*.*:2181启动consumer,查看数据
bin/kafka-console-consumer.sh --bootstrap-server 116.62.*.*:9092--topic alarm --from-beginning#删除topic
bin/kafka-topics.sh --zookeeper 116.62.*.*:2181 --delete --topic alarm
#如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion,此时你若想真正删除它,可以如下操作:
#(1)登录zookeeper客户端:命令:./bin/zookeeper-client
#(2)找到topic所在的目录:ls /brokers/topics
#(3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。

3 创建hive表

create table if not EXISTS alarm_no_partition(alm_timestring,alm_timeMs int,tag_NameID string,alm_Type int,priID int,alm_Ack_Timestring,alm_Ack_TimeMs int,alm_Group int,alm_Sub_Area int,tag_Data_Typestring,alm_Ack_Flg string,alm_Remove_Flg string,alm_Remove_Timestring,alm_Remove_TimeMs int,alarm_date string) clusteredby (priID) into 2 buckets stored as orc TBLPROPERTIES("transactional"="true");

4 配置flume
参考网址:http://flume.apache.org/FlumeUserGuide.html
配置flume-conf.properties文件:一个source,多个sink、

channel
agent1.sources= alarms1
agent1.channels= alarmc1 alarmc2 alarmc3
agent1.sinks= alarmk1 alarmk2 alarmk3#SpoolingDirectory
#setalarms1
agent1.sources.alarms1.type=spooldir
agent1.sources.alarms1.spoolDir=/home/flumetest/alarm/
agent1.sources.alarms1.channels=alarmc1alarmc2 alarmc3
agent1.sources.alarms1.fileHeader= false#setalarmk1
agent1.sinks.alarmk1.channel=alarmc1
agent1.sinks.alarmk1.type= org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.alarmk1.topic= alarm
agent1.sinks.alarmk1.kafka.bootstrap.servers= 116.62.*.*:9092;116.62.*.*:9092;116.62.*.*:9092
agent1.sinks.alarmk1.kafka.flumeBatchSize= 20
agent1.sinks.alarmk1.kafka.producer.acks= 1
agent1.sinks.alarmk1.kafka.producer.linger.ms= 1
agent1.sinks.alarmk1.kafka.producer.compression.type= snappy
#setalarmk2
agent1.sinks.alarmk2.channel=alarmc2
agent1.sinks.alarmk2.type=hive
agent1.sinks.alarmk2.hive.metastore= thrift://127.0.0.1:9083
agent1.sinks.alarmk2.hive.database= alarm
agent1.sinks.alarmk2.hive.table= alarm_no_partition
#agent1.sinks.alarmk2.hive.partition=%{alarm_date}
agent1.sinks.alarmk2.useLocalTimeStamp= false
#agent1.sinks.alarmk2.roundValue= 10
#agent1.sinks.alarmk2.roundUnit= minute
agent1.sinks.alarmk2.serializer= DELIMITED
agent1.sinks.alarmk2.serializer.delimiter=,
agent1.sinks.alarmk2.serializer.serdeSeparator='\t'
agent1.sinks.alarmk2.serializer.fieldnames=alm_time,alm_timems,tag_nameid,alm_type,priid,alm_ack_time,alm_ack_timems,alm_group,alm_sub_area,tag_data_type,alm_ack_flg,alm_remove_flg,alm_remove_time,alm_remove_timems,alarm_date
#setalarmk3
#setalarmk3
#agent1.sinks.alarmk3.channel=alarmc3
#agent1.sinks.alarmk3.type=hbase
#agent1.sinks.alarmk3.table=alarm_test
#agent1.sinks.alarmk3.columnFamily=
#agent1.sinks.alarmk3.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer#setalarmc1
agent1.channels.alarmc1.type= memory
agent1.channels.alarmc1.capacity= 1000 
agent1.channels.alarmc1.transactionCapacity= 100
#setalarmc2
agent1.channels.alarmc2.type= memory
agent1.channels.alarmc2.capacity= 1000 
agent1.channels.alarmc2.transactionCapacity= 100
#setalarmc3
agent1.channels.alarmc3.type= memory
agent1.channels.alarmc3.capacity= 1000 
agent1.channels.alarmc3.transactionCapacity= 100

启动flume

nohup bin/flume-ng agent -cconf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,LOGFILE-Dflume.log.dir=logs >> /dev/null 2>&1

5 加载数据
向目录中添加文件加载完成文件后缀添加.COMPLETED

这篇关于flume采集数据到kafka和hive的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1069638

相关文章

MySQL快速复制一张表的四种核心方法(包括表结构和数据)

《MySQL快速复制一张表的四种核心方法(包括表结构和数据)》本文详细介绍了四种复制MySQL表(结构+数据)的方法,并对每种方法进行了对比分析,适用于不同场景和数据量的复制需求,特别是针对超大表(1... 目录一、mysql 复制表(结构+数据)的 4 种核心方法(面试结构化回答)方法 1:CREATE

详解C++ 存储二进制数据容器的几种方法

《详解C++存储二进制数据容器的几种方法》本文主要介绍了详解C++存储二进制数据容器,包括std::vector、std::array、std::string、std::bitset和std::ve... 目录1.std::vector<uint8_t>(最常用)特点:适用场景:示例:2.std::arra

SpringBoot整合Kafka启动失败的常见错误问题总结(推荐)

《SpringBoot整合Kafka启动失败的常见错误问题总结(推荐)》本文总结了SpringBoot项目整合Kafka启动失败的常见错误,包括Kafka服务器连接问题、序列化配置错误、依赖配置问题、... 目录一、Kafka服务器连接问题1. Kafka服务器无法连接2. 开发环境与生产环境网络不通二、序

MySQL中的DELETE删除数据及注意事项

《MySQL中的DELETE删除数据及注意事项》MySQL的DELETE语句是数据库操作中不可或缺的一部分,通过合理使用索引、批量删除、避免全表删除、使用TRUNCATE、使用ORDERBY和LIMI... 目录1. 基本语法单表删除2. 高级用法使用子查询删除删除多表3. 性能优化策略使用索引批量删除避免

MySQL 数据库进阶之SQL 数据操作与子查询操作大全

《MySQL数据库进阶之SQL数据操作与子查询操作大全》本文详细介绍了SQL中的子查询、数据添加(INSERT)、数据修改(UPDATE)和数据删除(DELETE、TRUNCATE、DROP)操作... 目录一、子查询:嵌套在查询中的查询1.1 子查询的基本语法1.2 子查询的实战示例二、数据添加:INSE

Linux服务器数据盘移除并重新挂载的全过程

《Linux服务器数据盘移除并重新挂载的全过程》:本文主要介绍在Linux服务器上移除并重新挂载数据盘的整个过程,分为三大步:卸载文件系统、分离磁盘和重新挂载,每一步都有详细的步骤和注意事项,确保... 目录引言第一步:卸载文件系统第二步:分离磁盘第三步:重新挂载引言在 linux 服务器上移除并重新挂p

使用MyBatis TypeHandler实现数据加密与解密的具体方案

《使用MyBatisTypeHandler实现数据加密与解密的具体方案》在我们日常的开发工作中,经常会遇到一些敏感数据需要存储,比如用户的手机号、身份证号、银行卡号等,为了保障数据安全,我们通常会对... 目录1. 核心概念:什么是 TypeHandler?2. 实战场景3. 代码实现步骤步骤 1:定义 E

使用C#导出Excel数据并保存多种格式的完整示例

《使用C#导出Excel数据并保存多种格式的完整示例》在现代企业信息化管理中,Excel已经成为最常用的数据存储和分析工具,从员工信息表、销售数据报表到财务分析表,几乎所有部门都离不开Excel,本文... 目录引言1. 安装 Spire.XLS2. 创建工作簿和填充数据3. 保存为不同格式4. 效果展示5

Python多任务爬虫实现爬取图片和GDP数据

《Python多任务爬虫实现爬取图片和GDP数据》本文主要介绍了基于FastAPI开发Web站点的方法,包括搭建Web服务器、处理图片资源、实现多任务爬虫和数据可视化,同时,还简要介绍了Python爬... 目录一. 基于FastAPI之Web站点开发1. 基于FastAPI搭建Web服务器2. Web服务

MySQL 批量插入的原理和实战方法(快速提升大数据导入效率)

《MySQL批量插入的原理和实战方法(快速提升大数据导入效率)》在日常开发中,我们经常需要将大量数据批量插入到MySQL数据库中,本文将介绍批量插入的原理、实现方法,并结合Python和PyMySQ... 目录一、批量插入的优势二、mysql 表的创建示例三、python 实现批量插入1. 安装 PyMyS