Flume之使用Failover Sink Processor实现sink故障转移

2024-04-15 14:58

本文主要是介绍Flume之使用Failover Sink Processor实现sink故障转移,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


前言

  • Failover Sink Processor 维护着Sink组中Sinks的优先级表,根据优先级尝试将Event传输给不同的Sink直到Event成功发送。当优先级高的Sink不可用时,会将Event传输给下一优先级Sink,以此来确保每个Event都能被投递。当Sink不可用时,Failover Sink Processor和Load balancing Sink Processor一样,也会进行指数回退backoff,并可以设置最大回退时间(即在黑名单中的保存时间),在倒计时结束后会再次尝试访问之前挂掉的Sink

使用示例

1)flume1.properties

# flume1:此配置用于监控某个窗口将其追加内容输出到flume2和flume3中
# 并将两个Sink组成一个sink group,并将Sink Processor设置成Failover类型
# a1:Netcat Source->Memory Channel->Avro Sink# Agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2# Sink groups
a1.sinkgroups = g1
# 设置sink group中的sinks
a1.sinkgroups.g1.sinks = k1 k2
# 设置Failover sink processor(只有sink group才可以使用sink processor)
a1.sinkgroups.g1.processor.type = failover
# 设置Failover sink processor优先级表
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# 设置最大避让时间(ms)
a1.sinkgroups.g1.processor.maxpenalty = 10000# Sources
# 配置a1.sources.r1的各项属性参数,类型/绑定主机ip/端口号
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 44444# Channels
# 配置a1.channerls.c1的各项属性参数,缓存方式/最多缓存的Event个数/单次传输的Event个数
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Sinks
# sinks.k1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
# sinks.k2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4141# Bind
# 注意:source可以绑定多个channel,但是sink/sink group只能绑定单个channel
# r1->c1->g1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

2)flume2.properties

# flume2:此配置用于将来自指定Avro端口的数据输出到控制台
# a2:Avro Source->Memory Channel->Logger Sink# Agent
a2.sources = r1
a2.channels = c1
a2.sinks = k1# Sources
# a2.sources.r1
a2.sources.r1.type = avro
# 设置监听本地IP
a2.sources.r1.bind = 0.0.0.0
# 设置监听端口号
a2.sources.r1.port = 4141# Channels
# a2.channels.c1
# 使用内存作为缓存/最多缓存的Event个数/单次传输的Event个数
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Sinks
# 运行时设置参数 -Dflume.root.logger=INFO,console 即输出到控制台实时显示
a2.sinks.k1.type = logger
# 设置Event的Body中写入log的最大字节数(默认值为16)
a2.sinks.k1.maxBytesToLog = 256# Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

3)flume3.properties

# flume3:此配置用于将来自指定Avro端口的数据输出到控制台
# a3:Avro Source->Memory Channel->Logger Sink# Agent
a3.sources = r1
a3.channels = c1
a3.sinks = k1# Sources
# a3.sources.r1
a3.sources.r1.type = avro
# 设置监听本地IP
a3.sources.r1.bind = 0.0.0.0
# 设置监听端口号
a3.sources.r1.port = 4141# Channels
# a3.channels.c1
# 使用内存作为缓存/最多缓存的Event个数/单次传输的Event个数
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Sinks
# 运行时设置参数 -Dflume.root.logger=INFO,console 即输出到控制台实时显示
a3.sinks.k1.type = logger
# 设置Event的Body中写入log的最大字节数(默认值为16)
a3.sinks.k1.maxBytesToLog = 256# Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

4)启动命令

Flume Agent a1至a3分别运行在主机hadoop101、hadoop102、hadoop103上

./bin/flume-ng agent -n a1 -c conf -f flume1.properties
./bin/flume-ng agent -n a2 -c conf -f flume2.properties -Dflume.root.logger=INFO,console
./bin/flume-ng agent -n a3 -c conf -f flume3.properties -Dflume.root.logger=INFO,console

5)实现功能

  • Aent a1将指定端口的监听数据输出到a2或者a3的控制台

  • 当Event从Channel中传输给Sink Group之前,首先会根据配置Failover sink processor优先级表尝试将此Event发送给优先级最高的可用Sink,如果成功则继续处理下一个Event。如果在发送过程中,当前Sink宕机,则将其加入黑名单,一定时间内不再尝试将Event发往此Sink,并且退避时间呈指数增长,直到最大退避时间maxpenalty,以此来实现Sink的故障转移


End~

这篇关于Flume之使用Failover Sink Processor实现sink故障转移的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/906135

相关文章

C++中零拷贝的多种实现方式

《C++中零拷贝的多种实现方式》本文主要介绍了C++中零拷贝的实现示例,旨在在减少数据在内存中的不必要复制,从而提高程序性能、降低内存使用并减少CPU消耗,零拷贝技术通过多种方式实现,下面就来了解一下... 目录一、C++中零拷贝技术的核心概念二、std::string_view 简介三、std::stri

Python常用命令提示符使用方法详解

《Python常用命令提示符使用方法详解》在学习python的过程中,我们需要用到命令提示符(CMD)进行环境的配置,:本文主要介绍Python常用命令提示符使用方法的相关资料,文中通过代码介绍的... 目录一、python环境基础命令【Windows】1、检查Python是否安装2、 查看Python的安

C++高效内存池实现减少动态分配开销的解决方案

《C++高效内存池实现减少动态分配开销的解决方案》C++动态内存分配存在系统调用开销、碎片化和锁竞争等性能问题,内存池通过预分配、分块管理和缓存复用解决这些问题,下面就来了解一下... 目录一、C++内存分配的性能挑战二、内存池技术的核心原理三、主流内存池实现:TCMalloc与Jemalloc1. TCM

OpenCV实现实时颜色检测的示例

《OpenCV实现实时颜色检测的示例》本文主要介绍了OpenCV实现实时颜色检测的示例,通过HSV色彩空间转换和色调范围判断实现红黄绿蓝颜色检测,包含视频捕捉、区域标记、颜色分析等功能,具有一定的参考... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间

Python并行处理实战之如何使用ProcessPoolExecutor加速计算

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu... 目录简介完整代码示例代码解释1. 导入必要的模块2. 定义处理函数3. 主函数4. 生成数字列表5.

Python中help()和dir()函数的使用

《Python中help()和dir()函数的使用》我们经常需要查看某个对象(如模块、类、函数等)的属性和方法,Python提供了两个内置函数help()和dir(),它们可以帮助我们快速了解代... 目录1. 引言2. help() 函数2.1 作用2.2 使用方法2.3 示例(1) 查看内置函数的帮助(

Linux脚本(shell)的使用方式

《Linux脚本(shell)的使用方式》:本文主要介绍Linux脚本(shell)的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述语法详解数学运算表达式Shell变量变量分类环境变量Shell内部变量自定义变量:定义、赋值自定义变量:引用、修改、删

Python实现精准提取 PDF中的文本,表格与图片

《Python实现精准提取PDF中的文本,表格与图片》在实际的系统开发中,处理PDF文件不仅限于读取整页文本,还有提取文档中的表格数据,图片或特定区域的内容,下面我们来看看如何使用Python实... 目录安装 python 库提取 PDF 文本内容:获取整页文本与指定区域内容获取页面上的所有文本内容获取

基于Python实现一个Windows Tree命令工具

《基于Python实现一个WindowsTree命令工具》今天想要在Windows平台的CMD命令终端窗口中使用像Linux下的tree命令,打印一下目录结构层级树,然而还真有tree命令,但是发现... 目录引言实现代码使用说明可用选项示例用法功能特点添加到环境变量方法一:创建批处理文件并添加到PATH1

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.