Flume之Multiplexing Channel Selector使用示例

2024-04-15 14:58

本文主要是介绍Flume之Multiplexing Channel Selector使用示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


前言

  • Multiplexing Channe Selector 的作用就是根据 EventHeader 中的某个或几个字段的值将其映射到指定的 Channel ,便于之后 Channel Processor 将Event发送至对应的Channel中去。在Flume中,Multiplexing Channel Selector一般都与 Interceptor 拦截器搭配使用,因为新鲜的Event数据中Header为空,需要Interceptor去填充所需字段

具体配置

1)flume1.properties

# flume1:此配置用于监控单个或多个指定文件将其追加内容生成的Event先通过自定义的TypeInterceptor
# 根据Body中的内容向其Header中添加type字段,然后使用Multiplexing Channel Selector将不同
# type的Event传输到不同的Channel中,最后分别输出到flume2和flume3的控制台
# a1:TailDir Source-> TypeInterceptor -> Multiplexing Channel Selector ->
#   Memory Channel -> Avro Sink# Agent
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2# Sources
# a1.sources.r1
a1.sources.r1.type = TAILDIR
# 设置Json文件存储路径(最好使用绝对路径)
# 用于记录文件inode/文件的绝对路径/每个文件的最后读取位置等信息
a1.sources.r1.positionFile = /opt/module/flume-1.8.0/.position/taildir_position.json
# 指定监控的文件组
a1.sources.r1.filegroups = f1
# 配置文件组中的被监控文件
# 设置f2组的监控文件,注意:使用的是正则表达式,而不是Linux通配符
a1.sources.r1.filegroups.f1 = /tmp/logs/^.*log$# Interceptor
# a1.sources.r1.interceptors
# 配置Interceptor链,Interceptor调用顺序与配置循序相同
a1.sources.r1.interceptors = typeInterceptor
# 指定使用的自定义Interceptor全类名,并使用其中的静态内部类Builder
# 要想使用自定义Interceptor,必须将实现的类打包成jar包放入$FLUME_HOME/lib文件夹中
# flume运行Java程序时会将此路径加入到ClassPath中
a1.sources.r1.interceptors.typeInterceptor.type = com.tomandersen.interceptors.TypeInterceptor$Builder# Channels
# a1.channels.c1
# 使用内存作为缓存/最多缓存的Event个数/单次传输的Event个数
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# a1.channels.c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Channel Selector
# a1.sources.r1.selector
# 使用Multiple Channel Selector
a1.sources.r1.selector.type = multiplexing
# 设置匹配Header的字段
a1.sources.r1.selector.header = type
# 设置不同字段的值映射至各个Channel,其余的Event默认丢弃
a1.sources.r1.selector.mapping.Startup = c1
a1.sources.r1.selector.mapping.Event = c2# Sinks
# a1.sinks.k1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
# a1.sinks.k2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4141# Bind
# r1->TypeInterceptor->Multiplexing Channel Selector->c1->k1
# r1->TypeInterceptor->Multiplexing Channel Selector->c2->k2
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

2)flume2.properties

# flume2:此配置用于将来自指定Avro端口的数据输出到控制台中
# a2:Avro Source->Memory Channel->Logger Sink# Agent
a2.sources = r1
a2.channels = c1
a2.sinks = k1# Sources
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4141# Channels
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Sinks
# 运行时设置参数 -Dflume.root.logger=INFO,console 即输出到控制台实时显示
a2.sinks.k1.type = logger
# 设置Event的Body中写入log的最大字节数(默认值为16)
a2.sinks.k1.maxBytesToLog = 256# Bind
r1->c1->k1
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

3)flume3.properties

# flume3:此配置用于将来自指定Avro端口的数据输出到控制台中
# a3:Avro Source->Memory Channel->Logger Sink# Agent
a3.sources = r1
a3.channels = c1
a3.sinks = k1# Sources
a3.sources.r1.type = avro
a3.sources.r1.bind = 0.0.0.0
a3.sources.r1.port = 4141# Channels
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Sinks
# 运行时设置参数 -Dflume.root.logger=INFO,console 即输出到控制台实时显示
a3.sinks.k1.type = logger
# 设置Event的Body中写入log的最大字节数(默认值为16)
a3.sinks.k1.maxBytesToLog = 256# Bind
r1->c1->k1
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

4)启动命令

Flume Agent a1至a3分别运行在主机hadoop101、hadoop102、hadoop103上

./bin/flume-ng agent -n a1 -c conf -f flume1.properties
./bin/flume-ng agent -n a2 -c conf -f flume2.properties -Dflume.root.logger=INFO,console
./bin/flume-ng agent -n a3 -c conf -f flume3.properties -Dflume.root.logger=INFO,console

5)实现功能

Agent a1监听本地指定文件,将监听到的数据组装成Event通过自定义的 TypeInterceptor 来根据其Body中的内容向Header中添加不同的type字段键值,然后通过 Multiplexing Channel Selector将不同type的Event发送给不同的Channel,并最终分别在a2和a3的控制台上输出


End~

这篇关于Flume之Multiplexing Channel Selector使用示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/906134

相关文章

Python并行处理实战之如何使用ProcessPoolExecutor加速计算

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu... 目录简介完整代码示例代码解释1. 导入必要的模块2. 定义处理函数3. 主函数4. 生成数字列表5.

Python中help()和dir()函数的使用

《Python中help()和dir()函数的使用》我们经常需要查看某个对象(如模块、类、函数等)的属性和方法,Python提供了两个内置函数help()和dir(),它们可以帮助我们快速了解代... 目录1. 引言2. help() 函数2.1 作用2.2 使用方法2.3 示例(1) 查看内置函数的帮助(

Linux脚本(shell)的使用方式

《Linux脚本(shell)的使用方式》:本文主要介绍Linux脚本(shell)的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述语法详解数学运算表达式Shell变量变量分类环境变量Shell内部变量自定义变量:定义、赋值自定义变量:引用、修改、删

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

LiteFlow轻量级工作流引擎使用示例详解

《LiteFlow轻量级工作流引擎使用示例详解》:本文主要介绍LiteFlow是一个灵活、简洁且轻量的工作流引擎,适合用于中小型项目和微服务架构中的流程编排,本文给大家介绍LiteFlow轻量级工... 目录1. LiteFlow 主要特点2. 工作流定义方式3. LiteFlow 流程示例4. LiteF

使用Python开发一个现代化屏幕取色器

《使用Python开发一个现代化屏幕取色器》在UI设计、网页开发等场景中,颜色拾取是高频需求,:本文主要介绍如何使用Python开发一个现代化屏幕取色器,有需要的小伙伴可以参考一下... 目录一、项目概述二、核心功能解析2.1 实时颜色追踪2.2 智能颜色显示三、效果展示四、实现步骤详解4.1 环境配置4.

使用jenv工具管理多个JDK版本的方法步骤

《使用jenv工具管理多个JDK版本的方法步骤》jenv是一个开源的Java环境管理工具,旨在帮助开发者在同一台机器上轻松管理和切换多个Java版本,:本文主要介绍使用jenv工具管理多个JD... 目录一、jenv到底是干啥的?二、jenv的核心功能(一)管理多个Java版本(二)支持插件扩展(三)环境隔

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用