大数据技术之Flume应用案例(2)

2024-08-24 15:52

本文主要是介绍大数据技术之Flume应用案例(2),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

 监控端口数据官方案例

步骤 1: 准备环境

步骤 2: 配置 Flume Agent

步骤 3: 启动 Flume Agent

步骤 4: 发送数据到 Flume

步骤 5: 查看 HDFS 中的数据

注意事项

示例说明

实时监控单个追加文件案例

需求分析

实现步骤

(1)确保环境变量配置正确

(2)创建 flume-file-hdfs.conf 文件

(3)运行 Flume

(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志

(5)在 HDFS 上查看文件

实时监控目录下多个新文件案例

需求分析

实现步骤

(1)创建 flume-dir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 upload 文件夹中添加文件

(4)查看 HDFS 上的数据

实时监控目录下的多个追加文件案例

需求分析

实现步骤

(1)创建 flume-taildir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 files 文件夹中追加内容

(4)查看 HDFS 上的数据

Taildir Source 说明


 监控端口数据官方案例

Flume 可以用来监控网络端口数据,这对于收集来自不同系统的日志或数据非常有用。下面是一个使用 Flume 监控网络端口数据的官方示例,我们将使用 Flume 的 netcat source 来接收数据,并将其写入到 HDFS 中。

步骤 1: 准备环境

确保已经安装并配置好了 Flume 和 Hadoop。这里假设你已经在上一步中完成了 Flume 的安装。

步骤 2: 配置 Flume Agent

创建一个名为 flume-conf.properties 的配置文件,该文件将定义一个 Flume Agent 的配置。

配置文件 flume-conf.properties

# 定义 agent 名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# 配置 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume
a1.sinks.k1.hdfs.filePrefix = flume-logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 512
a1.sinks.k1.hdfs.rollCount = 20
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true# 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置 agent 的 source、channel 和 sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

步骤 3: 启动 Flume Agent

使用以下命令启动 Flume Agent:

$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file ./flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

这里 $FLUME_HOME 是 Flume 的安装目录。

步骤 4: 发送数据到 Flume

你可以使用 netcat 工具或其他类似工具发送数据到 Flume 监听的端口。例如,如果你在另一台机器上或同一台机器的不同终端窗口中,可以使用 netcat 发送数据:

echo "This is a test message" | nc localhost 44444

步骤 5: 查看 HDFS 中的数据

一旦数据被发送到 Flume,Flume 将其写入到 HDFS 中。你可以使用 Hadoop 命令来查看数据:

hadoop fs -ls /flume
hadoop fs -cat /flume/flume-logs-*

注意事项

  • 确保 Hadoop 的 hdfs-site.xml 和 core-site.xml 配置文件已经正确配置。
  • 如果你的 Hadoop 集群使用了安全模式,确保你已经配置了正确的 Kerberos 凭证。
  • 如果你使用的是分布式 Flume,确保所有的 Flume 节点都能够访问 HDFS。

示例说明

  • Netcat Source (a1.sources.r1):配置了 netcat source 来监听 localhost 的 44444 端口。
  • HDFS Sink (a1.sinks.k1):配置了 HDFS sink 将数据写入到 HDFS 的 /flume 目录下。
  • Memory Channel (a1.channels.c1):使用内存 channel 作为 source 和 sink 之间的缓冲区。

实时监控单个追加文件案例

需求分析

  • 实时读取本地文件到HDFS案例
  • Hive日志文件位于 /opt/module/hive/logs/hive.log
  • Flume监控该文件
  • 数据最终存储到HDFS

实现步骤

(1)确保环境变量配置正确

确认 /etc/profile.d/my_env.sh 文件中包含以下内容:

JAVA_HOME=/opt/module/jdk1.8.0_212
HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH JAVA_HOME HADOOP_HOME
(2)创建 flume-file-hdfs.conf 文件

创建文件 flume-file-hdfs.conf,并添加如下内容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2# Configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c# Configure the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop12:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0# Configure the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(3)运行 Flume
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
[lzl@hadoop12 hadoop-2.7.2]$ sbin/start-dfs.sh
[lzl@hadoop13 hadoop-2.7.2]$ sbin/start-yarn.sh
[lzl@hadoop12 hive]$ bin/hive
(5)在 HDFS 上查看文件
hadoop fs -ls /flume

实时监控目录下多个新文件案例

需求分析

  • 使用 Flume 监听整个目录的文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/upload

实现步骤

(1)创建 flume-dir-hdfs.conf 文件

创建文件 flume-dir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
(3)向 upload 文件夹中添加文件
[lzl@hadoop12 flume]$ mkdir upload
[lzl@hadoop12 upload]$ touch lzl.txt
[lzl@hadoop12 upload]$ touch lzl.tmp
[lzl@hadoop12 upload]$ touch lzl.log
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload

 

实时监控目录下的多个追加文件案例

需求分析

  • 使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/files

实现步骤

(1)创建 flume-taildir-hdfs.conf 文件

创建文件 flume-taildir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3)向 files 文件夹中追加内容
  • 在 /opt/module/flume 目录下创建 files 文件夹
[lzl@hadoop12 flume]$ mkdir files
  • 向 files 文件夹中添加文件
[lzl@hadoop12 files]$ echo hello >> file1.txt
[lzl@hadoop12 files]$ echo lzl>> file2.txt
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload2
Taildir Source 说明
  • Position File: Taildir Source 维护了一个 JSON 格式的 positionFile,它会定期地往 positionFile 中更新每个文件读取到的最新位置,因此能够实现断点续传。
  • Position File 格式:
    {"inode": 2496272,"pos": 12,"file": "/opt/module/flume/files/file1.txt"
    }
    {"inode": 2496275,"pos": 12,"file": "/opt/module/flume/files/file2.txt"
    }
  • Note: Linux 中存储文件元数据的区域称为 inode,每个 inode 都有一个编号,操作系统用 inode 编号来识别不同的文件。Unix/Linux 系统内部不使用文件名,而是使用 inode 编号来识别文件。

这篇关于大数据技术之Flume应用案例(2)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1102929

相关文章

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

深入浅出SpringBoot WebSocket构建实时应用全面指南

《深入浅出SpringBootWebSocket构建实时应用全面指南》WebSocket是一种在单个TCP连接上进行全双工通信的协议,这篇文章主要为大家详细介绍了SpringBoot如何集成WebS... 目录前言为什么需要 WebSocketWebSocket 是什么Spring Boot 如何简化 We

Java Stream流之GroupBy的用法及应用场景

《JavaStream流之GroupBy的用法及应用场景》本教程将详细介绍如何在Java中使用Stream流的groupby方法,包括基本用法和一些常见的实际应用场景,感兴趣的朋友一起看看吧... 目录Java Stream流之GroupBy的用法1. 前言2. 基础概念什么是 GroupBy?Stream

python中列表应用和扩展性实用详解

《python中列表应用和扩展性实用详解》文章介绍了Python列表的核心特性:有序数据集合,用[]定义,元素类型可不同,支持迭代、循环、切片,可执行增删改查、排序、推导式及嵌套操作,是常用的数据处理... 目录1、列表定义2、格式3、列表是可迭代对象4、列表的常见操作总结1、列表定义是处理一组有序项目的

C#中的Converter的具体应用

《C#中的Converter的具体应用》C#中的Converter提供了一种灵活的类型转换机制,本文详细介绍了Converter的基本概念、使用场景,具有一定的参考价值,感兴趣的可以了解一下... 目录Converter的基本概念1. Converter委托2. 使用场景布尔型转换示例示例1:简单的字符串到

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Spring Boot Actuator应用监控与管理的详细步骤

《SpringBootActuator应用监控与管理的详细步骤》SpringBootActuator是SpringBoot的监控工具,提供健康检查、性能指标、日志管理等核心功能,支持自定义和扩展端... 目录一、 Spring Boot Actuator 概述二、 集成 Spring Boot Actuat

PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例

《PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例》词嵌入解决NLP维度灾难,捕捉语义关系,PyTorch的nn.Embedding模块提供灵活实现,支持参数配置、预训练及变长... 目录一、词嵌入(Word Embedding)简介为什么需要词嵌入?二、PyTorch中的nn.Em

Python实现PDF按页分割的技术指南

《Python实现PDF按页分割的技术指南》PDF文件处理是日常工作中的常见需求,特别是当我们需要将大型PDF文档拆分为多个部分时,下面我们就来看看如何使用Python创建一个灵活的PDF分割工具吧... 目录需求分析技术方案工具选择安装依赖完整代码实现使用说明基本用法示例命令输出示例技术亮点实际应用场景扩