Spark Streaming(二)—— Spark Streaming基本数据源

2024-06-19 04:38

本文主要是介绍Spark Streaming(二)—— Spark Streaming基本数据源,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 基本数据源
    • 1. 文件流(textFileStream)
    • 2. RDD队列流(queueStream,队列里是RDD)
    • 3. 套接字流(socketTextStream)

基本数据源

Spark Streaming 是一个流式计算引擎,就需要对接外部数据源来接收数据。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。

基本数据源有:文件系统、套接字连接、Akka的actor等。

1. 文件流(textFileStream)

监控文件系统的变化,如果有文件增加,读取新的内容
① 这些文件具有相同的格式
② 这些文件通过原子移动或重命名文件的方式在dataDirectory创建
③ 如果在文件中追加内容,这些追加的新数据不会被读取。

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevelobject FileStreaming {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//local[2]代表开启两个线程val conf = new SparkConf().setAppName("FileStreaming").setMaster("local[2]")//接收两个参数,第一个conf,第二个是采样时间间隔val ssc = new StreamingContext(conf, Seconds(3))//监控目录 如果文件系统发生变化 就读取进来val lines = ssc.textFileStream("H:\\tmp_files\\test_file_stream")lines.print()ssc.start()ssc.awaitTermination()}
}

2. RDD队列流(queueStream,队列里是RDD)

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDDobject RDDQueueStream {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("RDDQueueStream").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(1))//需要一个RDD队列val rddQueue = new Queue[RDD[Int]]()for( i <- 1 to 3){rddQueue += ssc.sparkContext.makeRDD(1 to 10)Thread.sleep(5000)}//从队列中接收数据 创建DStreamval inputDStream = ssc.queueStream(rddQueue)val result = inputDStream.map(x=>(x,x*2))result.print()ssc.start()ssc.awaitTermination()}
}

3. 套接字流(socketTextStream)

val lines = sc.socketTextStream("192.168.15.131",1234)
lines.print()

这篇关于Spark Streaming(二)—— Spark Streaming基本数据源的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1074029

相关文章

Redis 基本数据类型和使用详解

《Redis基本数据类型和使用详解》String是Redis最基本的数据类型,一个键对应一个值,它的功能十分强大,可以存储字符串、整数、浮点数等多种数据格式,本文给大家介绍Redis基本数据类型和... 目录一、Redis 入门介绍二、Redis 的五大基本数据类型2.1 String 类型2.2 Hash

Java Instrumentation从概念到基本用法详解

《JavaInstrumentation从概念到基本用法详解》JavaInstrumentation是java.lang.instrument包提供的API,允许开发者在类被JVM加载时对其进行修改... 目录一、什么是 Java Instrumentation主要用途二、核心概念1. Java Agent

Kotlin 协程之Channel的概念和基本使用详解

《Kotlin协程之Channel的概念和基本使用详解》文章介绍协程在复杂场景中使用Channel进行数据传递与控制,涵盖创建参数、缓冲策略、操作方式及异常处理,适用于持续数据流、多协程协作等,需注... 目录前言launch / async 适合的场景Channel 的概念和基本使用概念Channel 的

Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧

《Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧》本文将通过实际代码示例,深入讲解Python函数的基本用法、返回值特性、全局变量修改以及异常处理技巧,感兴趣的朋友跟随小编一起看看... 目录一、python函数定义与调用1.1 基本函数定义1.2 函数调用二、函数返回值详解2.1 有返

Python ORM神器之SQLAlchemy基本使用完全指南

《PythonORM神器之SQLAlchemy基本使用完全指南》SQLAlchemy是Python主流ORM框架,通过对象化方式简化数据库操作,支持多数据库,提供引擎、会话、模型等核心组件,实现事务... 目录一、什么是SQLAlchemy?二、安装SQLAlchemy三、核心概念1. Engine(引擎)

Python异步编程之await与asyncio基本用法详解

《Python异步编程之await与asyncio基本用法详解》在Python中,await和asyncio是异步编程的核心工具,用于高效处理I/O密集型任务(如网络请求、文件读写、数据库操作等),接... 目录一、核心概念二、使用场景三、基本用法1. 定义协程2. 运行协程3. 并发执行多个任务四、关键

Go语言连接MySQL数据库执行基本的增删改查

《Go语言连接MySQL数据库执行基本的增删改查》在后端开发中,MySQL是最常用的关系型数据库之一,本文主要为大家详细介绍了如何使用Go连接MySQL数据库并执行基本的增删改查吧... 目录Go语言连接mysql数据库准备工作安装 MySQL 驱动代码实现运行结果注意事项Go语言执行基本的增删改查准备工作

java实现多数据源切换方式

《java实现多数据源切换方式》本文介绍实现多数据源切换的四步方法:导入依赖、配置文件、启动类注解、使用@DS标记mapper和服务层,通过注解实现数据源动态切换,适用于实际开发中的多数据源场景... 目录一、导入依赖二、配置文件三、在启动类上配置四、在需要切换数据源的类上、方法上使用@DS注解结论一、导入

DNS查询的利器! linux的dig命令基本用法详解

《DNS查询的利器!linux的dig命令基本用法详解》dig命令可以查询各种类型DNS记录信息,下面我们将通过实际示例和dig命令常用参数来详细说明如何使用dig实用程序... dig(Domain Information Groper)是一款功能强大的 linux 命令行实用程序,通过查询名称服务器并输

Spring Boot配置和使用两个数据源的实现步骤

《SpringBoot配置和使用两个数据源的实现步骤》本文详解SpringBoot配置双数据源方法,包含配置文件设置、Bean创建、事务管理器配置及@Qualifier注解使用,强调主数据源标记、代... 目录Spring Boot配置和使用两个数据源技术背景实现步骤1. 配置数据源信息2. 创建数据源Be