Spark Streaming(三)—— 高级数据源Flume

2024-06-19 04:38

本文主要是介绍Spark Streaming(三)—— 高级数据源Flume,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 高级数据源Flume
    • 1. Push方式
    • 2. 基于Custom Sink的Pull模式

高级数据源Flume

Spark Streaming 是一个流式计算引擎,就需要对接外部数据源来对接、接收数据。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。Spark Streaming的基本数据源(文件流、RDD队列流、套接字流)上篇已经介绍过了,而Spark Streaming的高级数据流主要有Kafka,Flume,Kinesis,Twitter等。本文主要介绍Flume作为高级数据源的使用。

1. Push方式

Flume将数据推送给Spark Streaming。在这种方式下,Spark Streaming可以很方便的建立一个Receiver,起到一个Avro agent的作用。Flume可以将数据推送到该Receiver。

Flume配置文件:

#定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1#具体定义source 采集该目录下的日志
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /root/training/logs#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100#具体定义sink
a4.sinks = k1
a4.sinks.k1.type = avro
a4.sinks.k1.channel = c1
a4.sinks.k1.hostname = 192.168.15.131
a4.sinks.k1.port = 1234#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

Spark Streaming Demo:
注意除了需要使用Flume的lib的jar包以外,还需要spark-streaming-flume_2.10-2.1.0.jar

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtilsobject MyFlumeStream {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("MyFlumeStream").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(3))//创建 flume event 从 flume中接收push来的数据 ---> 也是DStream//flume将数据push到了 ip 和 端口中  ip和端口在Flume配置文件中设置val flumeEventDstream = FlumeUtils.createStream(ssc, "192.168.15.131", 1234)val lineDStream = flumeEventDstream.map( e => {new String(e.event.getBody.array)})lineDStream.print()ssc.start()ssc.awaitTermination()}
}

测试:
1、启动Spark Streaming程序。
2、启动Flume。

bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console

3、拷贝日志文件到/root/training/logs目录。
4、观察输出,采集到数据。

2. 基于Custom Sink的Pull模式

比第一种有更好的健壮性和容错性。生产使用这个方式。

不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume Sink。Flume将数据推送到Sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从Sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的Sink。

Flume 配置文件

a1.channels = c1
a1.sinks = k1
a1.sources = r1a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/training/logsa1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.15.131
a1.sinks.k1.port = 1234#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Spark Streaming Full Demo:
注意除了需要使用Flume的lib的jar包以外,还需要
将Spark的jar包拷贝到Flume的lib目录下,spark-streaming-flume_2.10-2.1.0.jar也需要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath。

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevelobject FlumeLogPull {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("FlumeLogPull").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(1))val flumeEvent = FlumeUtils.createPollingStream(ssc, "192.168.15.131", 1234, StorageLevel.MEMORY_ONLY_SER)val lineDStream = flumeEvent.map( e => {new String(e.event.getBody.array)})lineDStream.print()ssc.start()ssc.awaitTermination()}
}

测试:
1、启动Flume。

bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console

2、启动Spark Streaming程序。
3、拷贝日志文件到/root/training/logs目录。
4、观察输出,采集到数据。

这篇关于Spark Streaming(三)—— 高级数据源Flume的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1074030

相关文章

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

Python中你不知道的gzip高级用法分享

《Python中你不知道的gzip高级用法分享》在当今大数据时代,数据存储和传输成本已成为每个开发者必须考虑的问题,Python内置的gzip模块提供了一种简单高效的解决方案,下面小编就来和大家详细讲... 目录前言:为什么数据压缩如此重要1. gzip 模块基础介绍2. 基本压缩与解压缩操作2.1 压缩文

Java中的for循环高级用法

《Java中的for循环高级用法》本文系统解析Java中传统、增强型for循环、StreamAPI及并行流的实现原理与性能差异,并通过大量代码示例展示实际开发中的最佳实践,感兴趣的朋友一起看看吧... 目录前言一、基础篇:传统for循环1.1 标准语法结构1.2 典型应用场景二、进阶篇:增强型for循环2.

使用Python进行GRPC和Dubbo协议的高级测试

《使用Python进行GRPC和Dubbo协议的高级测试》GRPC(GoogleRemoteProcedureCall)是一种高性能、开源的远程过程调用(RPC)框架,Dubbo是一种高性能的分布式服... 目录01 GRPC测试安装gRPC编写.proto文件实现服务02 Dubbo测试1. 安装Dubb

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性

mysql中的group by高级用法详解

《mysql中的groupby高级用法详解》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,本文给大家介绍mysql中的groupby... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

PyTorch高级特性与性能优化方式

《PyTorch高级特性与性能优化方式》:本文主要介绍PyTorch高级特性与性能优化方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、自动化机制1.自动微分机制2.动态计算图二、性能优化1.内存管理2.GPU加速3.多GPU训练三、分布式训练1.分布式数据

Spring Boot集成SLF4j从基础到高级实践(最新推荐)

《SpringBoot集成SLF4j从基础到高级实践(最新推荐)》SLF4j(SimpleLoggingFacadeforJava)是一个日志门面(Facade),不是具体的日志实现,这篇文章主要介... 目录一、日志框架概述与SLF4j简介1.1 为什么需要日志框架1.2 主流日志框架对比1.3 SLF4

Spring Boot集成Logback终极指南之从基础到高级配置实战指南

《SpringBoot集成Logback终极指南之从基础到高级配置实战指南》Logback是一个可靠、通用且快速的Java日志框架,作为Log4j的继承者,由Log4j创始人设计,:本文主要介绍... 目录一、Logback简介与Spring Boot集成基础1.1 Logback是什么?1.2 Sprin

MySQL复合查询从基础到多表关联与高级技巧全解析

《MySQL复合查询从基础到多表关联与高级技巧全解析》本文主要讲解了在MySQL中的复合查询,下面是关于本文章所需要数据的建表语句,感兴趣的朋友跟随小编一起看看吧... 目录前言:1.基本查询回顾:1.1.查询工资高于500或岗位为MANAGER的雇员,同时还要满足他们的姓名首字母为大写的J1.2.按照部门