Spark Streaming(三)—— 高级数据源Flume

2024-06-19 04:38

本文主要是介绍Spark Streaming(三)—— 高级数据源Flume,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 高级数据源Flume
    • 1. Push方式
    • 2. 基于Custom Sink的Pull模式

高级数据源Flume

Spark Streaming 是一个流式计算引擎,就需要对接外部数据源来对接、接收数据。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。Spark Streaming的基本数据源(文件流、RDD队列流、套接字流)上篇已经介绍过了,而Spark Streaming的高级数据流主要有Kafka,Flume,Kinesis,Twitter等。本文主要介绍Flume作为高级数据源的使用。

1. Push方式

Flume将数据推送给Spark Streaming。在这种方式下,Spark Streaming可以很方便的建立一个Receiver,起到一个Avro agent的作用。Flume可以将数据推送到该Receiver。

Flume配置文件:

#定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1#具体定义source 采集该目录下的日志
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /root/training/logs#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100#具体定义sink
a4.sinks = k1
a4.sinks.k1.type = avro
a4.sinks.k1.channel = c1
a4.sinks.k1.hostname = 192.168.15.131
a4.sinks.k1.port = 1234#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

Spark Streaming Demo:
注意除了需要使用Flume的lib的jar包以外,还需要spark-streaming-flume_2.10-2.1.0.jar

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtilsobject MyFlumeStream {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("MyFlumeStream").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(3))//创建 flume event 从 flume中接收push来的数据 ---> 也是DStream//flume将数据push到了 ip 和 端口中  ip和端口在Flume配置文件中设置val flumeEventDstream = FlumeUtils.createStream(ssc, "192.168.15.131", 1234)val lineDStream = flumeEventDstream.map( e => {new String(e.event.getBody.array)})lineDStream.print()ssc.start()ssc.awaitTermination()}
}

测试:
1、启动Spark Streaming程序。
2、启动Flume。

bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console

3、拷贝日志文件到/root/training/logs目录。
4、观察输出,采集到数据。

2. 基于Custom Sink的Pull模式

比第一种有更好的健壮性和容错性。生产使用这个方式。

不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume Sink。Flume将数据推送到Sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从Sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的Sink。

Flume 配置文件

a1.channels = c1
a1.sinks = k1
a1.sources = r1a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/training/logsa1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.15.131
a1.sinks.k1.port = 1234#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Spark Streaming Full Demo:
注意除了需要使用Flume的lib的jar包以外,还需要
将Spark的jar包拷贝到Flume的lib目录下,spark-streaming-flume_2.10-2.1.0.jar也需要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath。

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevelobject FlumeLogPull {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("FlumeLogPull").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(1))val flumeEvent = FlumeUtils.createPollingStream(ssc, "192.168.15.131", 1234, StorageLevel.MEMORY_ONLY_SER)val lineDStream = flumeEvent.map( e => {new String(e.event.getBody.array)})lineDStream.print()ssc.start()ssc.awaitTermination()}
}

测试:
1、启动Flume。

bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console

2、启动Spark Streaming程序。
3、拷贝日志文件到/root/training/logs目录。
4、观察输出,采集到数据。

这篇关于Spark Streaming(三)—— 高级数据源Flume的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1074030

相关文章

JavaScript中的高级调试方法全攻略指南

《JavaScript中的高级调试方法全攻略指南》什么是高级JavaScript调试技巧,它比console.log有何优势,如何使用断点调试定位问题,通过本文,我们将深入解答这些问题,带您从理论到实... 目录观点与案例结合观点1观点2观点3观点4观点5高级调试技巧详解实战案例断点调试:定位变量错误性能分

从基础到高级详解Python数值格式化输出的完全指南

《从基础到高级详解Python数值格式化输出的完全指南》在数据分析、金融计算和科学报告领域,数值格式化是提升可读性和专业性的关键技术,本文将深入解析Python中数值格式化输出的相关方法,感兴趣的小伙... 目录引言:数值格式化的核心价值一、基础格式化方法1.1 三种核心格式化方式对比1.2 基础格式化示例

Android协程高级用法大全

《Android协程高级用法大全》这篇文章给大家介绍Android协程高级用法大全,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友跟随小编一起学习吧... 目录1️⃣ 协程作用域(CoroutineScope)与生命周期绑定Activity/Fragment 中手

深度解析Python yfinance的核心功能和高级用法

《深度解析Pythonyfinance的核心功能和高级用法》yfinance是一个功能强大且易于使用的Python库,用于从YahooFinance获取金融数据,本教程将深入探讨yfinance的核... 目录yfinance 深度解析教程 (python)1. 简介与安装1.1 什么是 yfinance?

MySQL数据类型与表操作全指南( 从基础到高级实践)

《MySQL数据类型与表操作全指南(从基础到高级实践)》本文详解MySQL数据类型分类(数值、日期/时间、字符串)及表操作(创建、修改、维护),涵盖优化技巧如数据类型选择、备份、分区,强调规范设计与... 目录mysql数据类型详解数值类型日期时间类型字符串类型表操作全解析创建表修改表结构添加列修改列删除列

java实现多数据源切换方式

《java实现多数据源切换方式》本文介绍实现多数据源切换的四步方法:导入依赖、配置文件、启动类注解、使用@DS标记mapper和服务层,通过注解实现数据源动态切换,适用于实际开发中的多数据源场景... 目录一、导入依赖二、配置文件三、在启动类上配置四、在需要切换数据源的类上、方法上使用@DS注解结论一、导入

Python 函数详解:从基础语法到高级使用技巧

《Python函数详解:从基础语法到高级使用技巧》本文基于实例代码,全面讲解Python函数的定义、参数传递、变量作用域及类型标注等知识点,帮助初学者快速掌握函数的使用技巧,感兴趣的朋友跟随小编一起... 目录一、函数的基本概念与作用二、函数的定义与调用1. 无参函数2. 带参函数3. 带返回值的函数4.

Java Stream 的 Collectors.toMap高级应用与最佳实践

《JavaStream的Collectors.toMap高级应用与最佳实践》文章讲解JavaStreamAPI中Collectors.toMap的使用,涵盖基础语法、键冲突处理、自定义Map... 目录一、基础用法回顾二、处理键冲突三、自定义 Map 实现类型四、处理 null 值五、复杂值类型转换六、处理

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

Spring Boot配置和使用两个数据源的实现步骤

《SpringBoot配置和使用两个数据源的实现步骤》本文详解SpringBoot配置双数据源方法,包含配置文件设置、Bean创建、事务管理器配置及@Qualifier注解使用,强调主数据源标记、代... 目录Spring Boot配置和使用两个数据源技术背景实现步骤1. 配置数据源信息2. 创建数据源Be