Structured Streaming基础--学习笔记

2024-01-23 21:12

本文主要是介绍Structured Streaming基础--学习笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Structured streaming介绍

spark进行实时数据流计算时有两个工具:

  • Spark Streaming:编写rdd代码处理数据流,可以解决非结构化的流式数据
  • Structured Streaming:编写df代码处理数据流,可以解决结构化和半结构化的流式数据

1,数据相关介绍

有界数据和无界数据

①有界数据:

  • 有起始位置,有结束位置。比如文件数据 有起始行,有结束行
  • 有明确的数据容量大小。处理数据时就能知道处理的数据大小
  • 在处理数据时,按批次处理。数据处理完成程序就结束
  • 离线计算时处理的都是有界数据

②无界数据

  • 有起始位置,没有结束位置,知道数据的起始位置在哪里,但是数据到哪结束不知道(因为数据在不断产生,什么时候结束不知道)
  • 流式数据都是无界数据
  • 无界数据的总量是不确定的
  • 数据是不断产生的
  • 数据有时效性 (有效期)
  • 处理无界数据时,程序是持续运行的
  • 实时计算时处理的都是无界数据
  • 近期实时计算处理的微批数据

离线计算:

  • 离线计算就是在计算开始前已知所有输入数据,输入数据不会产生变化,且在解决一个问题后就要立即得出结果的前提下进行的计算。
  • 数据处理时间大于1个小时,一般离线计算的处理时间都是t+1天
  • mapreduce框架/spark框架

近实时计算:

  • 近实时计算就是在计算开始前将多条数据(流数据)放在一起处理,同时处理的是几条数据
  • 数据处理时间在5分钟到1小时范围内
  • spark框架

实时计算:

  • 实时计算就是一条一条的处理数据,处理的时间延迟很低
  • 数据处理时间小于5分钟
  • flink框架

2,Structured streaming基本使用

没有ncat服务的话,在线安装或离线导入

命令:yum install nc

执行前需要先启动ncat服务

命令:ncat -lk 8888

from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取socket工具中的流数据options = {#指定ip地址'host':'192.168.88.100',#指定socket的端口号'port':'8888'
}df1 = ss.readStream.load(format='socket',**options)
#查看里面的数据不能通过show()方法查看
df1.printSchema()
#展示数据
#start:启动流计算
#awaitTermiantion():使应用程序一直运行
df1.writeStream.start(format='console',outputMode='append').awaitTermination()

3,Structured Streaming编程模型

1、Input Table 输入数据表 无界表

2、Query 对数据进行查询计算

3、Result Table 保存计算结果

4、Output 输出结果

变成模型遵循ETL处理流程:
①E->读取流数据,转换成无界表
②T->使用sparkSql处理流数据,流计算,查询计算
③L->存储E的结果

from pyspark.sql import SparkSession,functions as Fss = SparkSession.builder.getOrCreate()options = {# 指定ip地址'host': '192.168.88.100',# 指定socket的端口号'port': '8888'
}df1 = ss.readStream.load(format='socket',**options)
df_split = df1.select(F.split('value',','

这篇关于Structured Streaming基础--学习笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/637545

相关文章

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

Go学习记录之runtime包深入解析

《Go学习记录之runtime包深入解析》Go语言runtime包管理运行时环境,涵盖goroutine调度、内存分配、垃圾回收、类型信息等核心功能,:本文主要介绍Go学习记录之runtime包的... 目录前言:一、runtime包内容学习1、作用:① Goroutine和并发控制:② 垃圾回收:③ 栈和

安装centos8设置基础软件仓库时出错的解决方案

《安装centos8设置基础软件仓库时出错的解决方案》:本文主要介绍安装centos8设置基础软件仓库时出错的解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录安装Centos8设置基础软件仓库时出错版本 8版本 8.2.200android4版本 javas

Android学习总结之Java和kotlin区别超详细分析

《Android学习总结之Java和kotlin区别超详细分析》Java和Kotlin都是用于Android开发的编程语言,它们各自具有独特的特点和优势,:本文主要介绍Android学习总结之Ja... 目录一、空安全机制真题 1:Kotlin 如何解决 Java 的 NullPointerExceptio

Linux基础命令@grep、wc、管道符的使用详解

《Linux基础命令@grep、wc、管道符的使用详解》:本文主要介绍Linux基础命令@grep、wc、管道符的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录grep概念语法作用演示一演示二演示三,带选项 -nwc概念语法作用wc,不带选项-c,统计字节数-

python操作redis基础

《python操作redis基础》Redis(RemoteDictionaryServer)是一个开源的、基于内存的键值对(Key-Value)存储系统,它通常用作数据库、缓存和消息代理,这篇文章... 目录1. Redis 简介2. 前提条件3. 安装 python Redis 客户端库4. 连接到 Re

SpringBoot基础框架详解

《SpringBoot基础框架详解》SpringBoot开发目的是为了简化Spring应用的创建、运行、调试和部署等,使用SpringBoot可以不用或者只需要很少的Spring配置就可以让企业项目快... 目录SpringBoot基础 – 框架介绍1.SpringBoot介绍1.1 概述1.2 核心功能2

Spring Boot集成SLF4j从基础到高级实践(最新推荐)

《SpringBoot集成SLF4j从基础到高级实践(最新推荐)》SLF4j(SimpleLoggingFacadeforJava)是一个日志门面(Facade),不是具体的日志实现,这篇文章主要介... 目录一、日志框架概述与SLF4j简介1.1 为什么需要日志框架1.2 主流日志框架对比1.3 SLF4

Spring Boot集成Logback终极指南之从基础到高级配置实战指南

《SpringBoot集成Logback终极指南之从基础到高级配置实战指南》Logback是一个可靠、通用且快速的Java日志框架,作为Log4j的继承者,由Log4j创始人设计,:本文主要介绍... 目录一、Logback简介与Spring Boot集成基础1.1 Logback是什么?1.2 Sprin

重新对Java的类加载器的学习方式

《重新对Java的类加载器的学习方式》:本文主要介绍重新对Java的类加载器的学习方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍1.1、简介1.2、符号引用和直接引用1、符号引用2、直接引用3、符号转直接的过程2、加载流程3、类加载的分类3.1、显示