大数据技术之Flume 企业开发案例——聚合(7)

2024-08-28 21:52

本文主要是介绍大数据技术之Flume 企业开发案例——聚合(7),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

聚合

1)案例需求:

2)需求分析 

3)实现步骤:

准备工作

创建 flume1-logger-flume.conf

创建 flume2-netcat-flume.conf

创建 flume3-flume-logger.conf

执行配置文件


聚合

1)案例需求:

  • hadoop12 上的 Flume-1 监控文件 /opt/module/group.log
  • hadoop13 上的 Flume-2 监控某个端口的数据流,
  • Flume-1 与 Flume-2 将数据发送给 hadoop14 上的 Flume-3Flume-3 将最终数据打印到控制台。

2)需求分析 

多数据源汇总案例

 

3)实现步骤:

  1. 准备工作
    • 分发 Flume

    • [lzl@hadoop12 module]$ xsync flumexsync 是集群同步文件脚本,也就是在一台服务器分发文件给其他台服务器,脚本内容如下:
      #!/bin/bash
      #1. 判断参数个数
      if [ $# -lt 1 ]
      thenecho Not Enough Arguement!exit;
      fi
      #2. 遍历集群所有机器
      for host in hadoop12 hadoop13 hadoop14
      doecho ====================  $host  ====================#3. 遍历所有目录,挨个发送for file in $@do#4 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir=$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
      done
    • hadoop12hadoop13 以及 hadoop14/opt/module/flume/job 目录下创建一个 group3 文件夹。

      [lzl@hadoop12 job]$ mkdir group3
      [lzl@hadoop13 job]$ mkdir group3
      [lzl@hadoop14 job]$ mkdir group3
  2. 创建 flume1-logger-flume.conf
    • 配置 Source 用于监控 /opt/module/group.log 文件,配置 Sink 输出数据到下一级 Flume。

      hadoop12 上编辑配置文件

      [lzl@hadoop12 group3]$ vim flume1-logger-flume.conf

      添加如下内容

      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1# Describe/configure the source
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /opt/module/group.log
      a1.sources.r1.shell = /bin/bash -c# Describe the sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop14
      a1.sinks.k1.port = 4141# Describe the channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
  3. 创建 flume2-netcat-flume.conf
    • 配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume。

       

      hadoop13 上编辑配置文件

      [lzl@hadoop12 group3]$ vim flume2-netcat-flume.conf

      添加如下内容

      # Name the components on this agent
      a2.sources = r1
      a2.sinks = k1
      a2.channels = c1# Describe/configure the source
      a2.sources.r1.type = netcat
      a2.sources.r1.bind = hadoop13
      a2.sources.r1.port = 44444# Describe the sink
      a2.sinks.k1.type = avro
      a2.sinks.k1.hostname = hadoop14
      a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
      a2.channels.c1.type = memory
      a2.channels.c1.capacity = 1000
      a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a2.sources.r1.channels = c1
      a2.sinks.k1.channel = c1
  4. 创建 flume3-flume-logger.conf
    • 配置 source 用于接收 flume1flume2 发送过来的数据流,最终合并后 sink 到控制台。

      hadoop14 上编辑配置文件

      [lzl@hadoop14 group3]$ touch flume3-flume-logger.conf
      [lzl@hadoop14 group3]$ vim flume3-flume-logger.conf

      添加如下内容

      # Name the components on this agent
      a3.sources = r1
      a3.sinks = k1
      a3.channels = c1# Describe/configure the source
      a3.sources.r1.type = avro 
      a3.sources.r1.bind = hadoop14
      a3.sources.r1.port = 4141# Describe the sink
      a3.sinks.k1.type = logger# Describe the channel
      a3.channels.c1.type = memory
      a3.channels.c1.capacity = 1000
      a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a3.sources.r1.channels = c1
      a3.sinks.k1.channel = c1
  5. 执行配置文件
    • 分别开启对应配置文件:flume3-flume-logger.confflume2-netcat-flume.confflume1-logger-flume.conf

      [lzl@hadoop14 flume]$ bin/flume-ng agent --conf conf/ --name 
      a3 --conf-file job/group3/flume3-flume-logger.conf 
      -Dflume.root.logger=INFO,console
      [lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name 
      a2 --conf-file job/group3/flume1-logger-flume.conf
      [lzl@hadoop13 flume]$ bin/flume-ng agent --conf conf/ --name 
      a1 --conf-file job/group3/flume2-netcat-flume.conf
  6. hadoop13 上向 /opt/module 目录下的 group.log 追加内容

    [lzl@hadoop13 module]$ echo 'hello' >> group.log
  7. hadoop12 上向 44444 端口发送数据

    [lzl@hadoop12 flume]$ telnet hadoop13 44444
  8. 检查 hadoop14 上数据

 

这篇关于大数据技术之Flume 企业开发案例——聚合(7)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115984

相关文章

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

PyQt5 GUI 开发的基础知识

《PyQt5GUI开发的基础知识》Qt是一个跨平台的C++图形用户界面开发框架,支持GUI和非GUI程序开发,本文介绍了使用PyQt5进行界面开发的基础知识,包括创建简单窗口、常用控件、窗口属性设... 目录简介第一个PyQt程序最常用的三个功能模块控件QPushButton(按钮)控件QLable(纯文本

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Python实现PDF按页分割的技术指南

《Python实现PDF按页分割的技术指南》PDF文件处理是日常工作中的常见需求,特别是当我们需要将大型PDF文档拆分为多个部分时,下面我们就来看看如何使用Python创建一个灵活的PDF分割工具吧... 目录需求分析技术方案工具选择安装依赖完整代码实现使用说明基本用法示例命令输出示例技术亮点实际应用场景扩

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

基于Python开发一个图像水印批量添加工具

《基于Python开发一个图像水印批量添加工具》在当今数字化内容爆炸式增长的时代,图像版权保护已成为创作者和企业的核心需求,本方案将详细介绍一个基于PythonPIL库的工业级图像水印解决方案,有需要... 目录一、系统架构设计1.1 整体处理流程1.2 类结构设计(扩展版本)二、核心算法深入解析2.1 自