大数据技术之Flume应用案例(2)

2024-08-24 15:52

本文主要是介绍大数据技术之Flume应用案例(2),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

 监控端口数据官方案例

步骤 1: 准备环境

步骤 2: 配置 Flume Agent

步骤 3: 启动 Flume Agent

步骤 4: 发送数据到 Flume

步骤 5: 查看 HDFS 中的数据

注意事项

示例说明

实时监控单个追加文件案例

需求分析

实现步骤

(1)确保环境变量配置正确

(2)创建 flume-file-hdfs.conf 文件

(3)运行 Flume

(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志

(5)在 HDFS 上查看文件

实时监控目录下多个新文件案例

需求分析

实现步骤

(1)创建 flume-dir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 upload 文件夹中添加文件

(4)查看 HDFS 上的数据

实时监控目录下的多个追加文件案例

需求分析

实现步骤

(1)创建 flume-taildir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 files 文件夹中追加内容

(4)查看 HDFS 上的数据

Taildir Source 说明


 监控端口数据官方案例

Flume 可以用来监控网络端口数据,这对于收集来自不同系统的日志或数据非常有用。下面是一个使用 Flume 监控网络端口数据的官方示例,我们将使用 Flume 的 netcat source 来接收数据,并将其写入到 HDFS 中。

步骤 1: 准备环境

确保已经安装并配置好了 Flume 和 Hadoop。这里假设你已经在上一步中完成了 Flume 的安装。

步骤 2: 配置 Flume Agent

创建一个名为 flume-conf.properties 的配置文件,该文件将定义一个 Flume Agent 的配置。

配置文件 flume-conf.properties

# 定义 agent 名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# 配置 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume
a1.sinks.k1.hdfs.filePrefix = flume-logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 512
a1.sinks.k1.hdfs.rollCount = 20
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true# 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置 agent 的 source、channel 和 sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

步骤 3: 启动 Flume Agent

使用以下命令启动 Flume Agent:

$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file ./flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

这里 $FLUME_HOME 是 Flume 的安装目录。

步骤 4: 发送数据到 Flume

你可以使用 netcat 工具或其他类似工具发送数据到 Flume 监听的端口。例如,如果你在另一台机器上或同一台机器的不同终端窗口中,可以使用 netcat 发送数据:

echo "This is a test message" | nc localhost 44444

步骤 5: 查看 HDFS 中的数据

一旦数据被发送到 Flume,Flume 将其写入到 HDFS 中。你可以使用 Hadoop 命令来查看数据:

hadoop fs -ls /flume
hadoop fs -cat /flume/flume-logs-*

注意事项

  • 确保 Hadoop 的 hdfs-site.xml 和 core-site.xml 配置文件已经正确配置。
  • 如果你的 Hadoop 集群使用了安全模式,确保你已经配置了正确的 Kerberos 凭证。
  • 如果你使用的是分布式 Flume,确保所有的 Flume 节点都能够访问 HDFS。

示例说明

  • Netcat Source (a1.sources.r1):配置了 netcat source 来监听 localhost 的 44444 端口。
  • HDFS Sink (a1.sinks.k1):配置了 HDFS sink 将数据写入到 HDFS 的 /flume 目录下。
  • Memory Channel (a1.channels.c1):使用内存 channel 作为 source 和 sink 之间的缓冲区。

实时监控单个追加文件案例

需求分析

  • 实时读取本地文件到HDFS案例
  • Hive日志文件位于 /opt/module/hive/logs/hive.log
  • Flume监控该文件
  • 数据最终存储到HDFS

实现步骤

(1)确保环境变量配置正确

确认 /etc/profile.d/my_env.sh 文件中包含以下内容:

JAVA_HOME=/opt/module/jdk1.8.0_212
HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH JAVA_HOME HADOOP_HOME
(2)创建 flume-file-hdfs.conf 文件

创建文件 flume-file-hdfs.conf,并添加如下内容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2# Configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c# Configure the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop12:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0# Configure the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(3)运行 Flume
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
[lzl@hadoop12 hadoop-2.7.2]$ sbin/start-dfs.sh
[lzl@hadoop13 hadoop-2.7.2]$ sbin/start-yarn.sh
[lzl@hadoop12 hive]$ bin/hive
(5)在 HDFS 上查看文件
hadoop fs -ls /flume

实时监控目录下多个新文件案例

需求分析

  • 使用 Flume 监听整个目录的文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/upload

实现步骤

(1)创建 flume-dir-hdfs.conf 文件

创建文件 flume-dir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
(3)向 upload 文件夹中添加文件
[lzl@hadoop12 flume]$ mkdir upload
[lzl@hadoop12 upload]$ touch lzl.txt
[lzl@hadoop12 upload]$ touch lzl.tmp
[lzl@hadoop12 upload]$ touch lzl.log
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload

 

实时监控目录下的多个追加文件案例

需求分析

  • 使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/files

实现步骤

(1)创建 flume-taildir-hdfs.conf 文件

创建文件 flume-taildir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3)向 files 文件夹中追加内容
  • 在 /opt/module/flume 目录下创建 files 文件夹
[lzl@hadoop12 flume]$ mkdir files
  • 向 files 文件夹中添加文件
[lzl@hadoop12 files]$ echo hello >> file1.txt
[lzl@hadoop12 files]$ echo lzl>> file2.txt
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload2
Taildir Source 说明
  • Position File: Taildir Source 维护了一个 JSON 格式的 positionFile,它会定期地往 positionFile 中更新每个文件读取到的最新位置,因此能够实现断点续传。
  • Position File 格式:
    {"inode": 2496272,"pos": 12,"file": "/opt/module/flume/files/file1.txt"
    }
    {"inode": 2496275,"pos": 12,"file": "/opt/module/flume/files/file2.txt"
    }
  • Note: Linux 中存储文件元数据的区域称为 inode,每个 inode 都有一个编号,操作系统用 inode 编号来识别不同的文件。Unix/Linux 系统内部不使用文件名,而是使用 inode 编号来识别文件。

这篇关于大数据技术之Flume应用案例(2)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1102929

相关文章

一文教你Python如何快速精准抓取网页数据

《一文教你Python如何快速精准抓取网页数据》这篇文章主要为大家详细介绍了如何利用Python实现快速精准抓取网页数据,文中的示例代码简洁易懂,具有一定的借鉴价值,有需要的小伙伴可以了解下... 目录1. 准备工作2. 基础爬虫实现3. 高级功能扩展3.1 抓取文章详情3.2 保存数据到文件4. 完整示例

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

Java Stream流使用案例深入详解

《JavaStream流使用案例深入详解》:本文主要介绍JavaStream流使用案例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录前言1. Lambda1.1 语法1.2 没参数只有一条语句或者多条语句1.3 一个参数只有一条语句或者多

MySQL 中的 JSON 查询案例详解

《MySQL中的JSON查询案例详解》:本文主要介绍MySQL的JSON查询的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录mysql 的 jsON 路径格式基本结构路径组件详解特殊语法元素实际示例简单路径复杂路径简写操作符注意MySQL 的 J

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

C语言中位操作的实际应用举例

《C语言中位操作的实际应用举例》:本文主要介绍C语言中位操作的实际应用,总结了位操作的使用场景,并指出了需要注意的问题,如可读性、平台依赖性和溢出风险,文中通过代码介绍的非常详细,需要的朋友可以参... 目录1. 嵌入式系统与硬件寄存器操作2. 网络协议解析3. 图像处理与颜色编码4. 高效处理布尔标志集合

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读