Spark Streaming(二)—— Spark Streaming基本数据源

2024-06-19 04:38

本文主要是介绍Spark Streaming(二)—— Spark Streaming基本数据源,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 基本数据源
    • 1. 文件流(textFileStream)
    • 2. RDD队列流(queueStream,队列里是RDD)
    • 3. 套接字流(socketTextStream)

基本数据源

Spark Streaming 是一个流式计算引擎,就需要对接外部数据源来接收数据。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。

基本数据源有:文件系统、套接字连接、Akka的actor等。

1. 文件流(textFileStream)

监控文件系统的变化,如果有文件增加,读取新的内容
① 这些文件具有相同的格式
② 这些文件通过原子移动或重命名文件的方式在dataDirectory创建
③ 如果在文件中追加内容,这些追加的新数据不会被读取。

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevelobject FileStreaming {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//local[2]代表开启两个线程val conf = new SparkConf().setAppName("FileStreaming").setMaster("local[2]")//接收两个参数,第一个conf,第二个是采样时间间隔val ssc = new StreamingContext(conf, Seconds(3))//监控目录 如果文件系统发生变化 就读取进来val lines = ssc.textFileStream("H:\\tmp_files\\test_file_stream")lines.print()ssc.start()ssc.awaitTermination()}
}

2. RDD队列流(queueStream,队列里是RDD)

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDDobject RDDQueueStream {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("RDDQueueStream").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(1))//需要一个RDD队列val rddQueue = new Queue[RDD[Int]]()for( i <- 1 to 3){rddQueue += ssc.sparkContext.makeRDD(1 to 10)Thread.sleep(5000)}//从队列中接收数据 创建DStreamval inputDStream = ssc.queueStream(rddQueue)val result = inputDStream.map(x=>(x,x*2))result.print()ssc.start()ssc.awaitTermination()}
}

3. 套接字流(socketTextStream)

val lines = sc.socketTextStream("192.168.15.131",1234)
lines.print()

这篇关于Spark Streaming(二)—— Spark Streaming基本数据源的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1074029

相关文章

Python ORM神器之SQLAlchemy基本使用完全指南

《PythonORM神器之SQLAlchemy基本使用完全指南》SQLAlchemy是Python主流ORM框架,通过对象化方式简化数据库操作,支持多数据库,提供引擎、会话、模型等核心组件,实现事务... 目录一、什么是SQLAlchemy?二、安装SQLAlchemy三、核心概念1. Engine(引擎)

Python异步编程之await与asyncio基本用法详解

《Python异步编程之await与asyncio基本用法详解》在Python中,await和asyncio是异步编程的核心工具,用于高效处理I/O密集型任务(如网络请求、文件读写、数据库操作等),接... 目录一、核心概念二、使用场景三、基本用法1. 定义协程2. 运行协程3. 并发执行多个任务四、关键

Go语言连接MySQL数据库执行基本的增删改查

《Go语言连接MySQL数据库执行基本的增删改查》在后端开发中,MySQL是最常用的关系型数据库之一,本文主要为大家详细介绍了如何使用Go连接MySQL数据库并执行基本的增删改查吧... 目录Go语言连接mysql数据库准备工作安装 MySQL 驱动代码实现运行结果注意事项Go语言执行基本的增删改查准备工作

java实现多数据源切换方式

《java实现多数据源切换方式》本文介绍实现多数据源切换的四步方法:导入依赖、配置文件、启动类注解、使用@DS标记mapper和服务层,通过注解实现数据源动态切换,适用于实际开发中的多数据源场景... 目录一、导入依赖二、配置文件三、在启动类上配置四、在需要切换数据源的类上、方法上使用@DS注解结论一、导入

DNS查询的利器! linux的dig命令基本用法详解

《DNS查询的利器!linux的dig命令基本用法详解》dig命令可以查询各种类型DNS记录信息,下面我们将通过实际示例和dig命令常用参数来详细说明如何使用dig实用程序... dig(Domain Information Groper)是一款功能强大的 linux 命令行实用程序,通过查询名称服务器并输

Spring Boot配置和使用两个数据源的实现步骤

《SpringBoot配置和使用两个数据源的实现步骤》本文详解SpringBoot配置双数据源方法,包含配置文件设置、Bean创建、事务管理器配置及@Qualifier注解使用,强调主数据源标记、代... 目录Spring Boot配置和使用两个数据源技术背景实现步骤1. 配置数据源信息2. 创建数据源Be

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

C#连接SQL server数据库命令的基本步骤

《C#连接SQLserver数据库命令的基本步骤》文章讲解了连接SQLServer数据库的步骤,包括引入命名空间、构建连接字符串、使用SqlConnection和SqlCommand执行SQL操作,... 目录建议配合使用:如何下载和安装SQL server数据库-CSDN博客1. 引入必要的命名空间2.

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

Java中的数组与集合基本用法详解

《Java中的数组与集合基本用法详解》本文介绍了Java数组和集合框架的基础知识,数组部分涵盖了一维、二维及多维数组的声明、初始化、访问与遍历方法,以及Arrays类的常用操作,对Java数组与集合相... 目录一、Java数组基础1.1 数组结构概述1.2 一维数组1.2.1 声明与初始化1.2.2 访问