大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版)

2024-03-09 17:58

本文主要是介绍大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版)

一、简述:
Structured Streaming 是基于Spark SQL引擎构建的、可扩展且容错性高的流处理引擎。它以检查点和预写日志记录每个触发时间正处理数据的偏移范围,保证端到端数据的一致性。Spark2.3.0版本引入持续流失处理模型后,可将数据延迟降低到毫秒级。Structured Streaming默认处理模型是微批处理模型,它是将当前一批作业处理完成后,记录日志偏移量后才启动下一批作业,延迟超过100毫秒;持续处理模型将每个任务输入流进行标记记录,遇到任务标记后将偏移量异步报告给引擎,可实现流计算的毫秒级延迟。但持续处理只能做到“至少一次”的一致性。Spark Streaming 采用的数据抽象是DStream(一系列的RDD),Structured Streaming 采用的数据抽象是DataFrame,Spark SQL 只能处理静态数据,而Structured Streaming可处理结构化的流数据,它是Spark Streaming 和 Spark SQL的结合体。
二、Structured Streaming 程序

数据源端,模拟数据发送

[root@hadoop1 ~]# nc -lk 9999
hadoop spark
spark hive
hive spark

流计算端,编写程序

[root@hadoop1 temp]# vi sparksstructtreamwordcount.py
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql import SparkSessionif __name__ == "__main__":
#独立运行该程序,防止别的程序导入spark = SparkSession.builder.appName("structurestreamwordcount").getOrCreate()#遵循工厂设计模式,利用该统一接口创建一系列对象,appName是应用名称,唯一标识应用,getOrCreate检查是否有SparkSession,否则建立一个SparkSession,并设置SparkSession为全局默认的SparkSessionspark.sparkContext.setLogLevel('WARN')#设置日志级别,排除日志查看干扰lines = spark.readStream.format("socket").option("host","192.168.80.2").option("port",9999).load()#创建输入数据源,模式,地址,端口和载入数据words = lines.select(explode(split(lines.value," ")).alias("word"))wordsCounts = words.groupBy("word").count()#定义流计算过程query = wordsCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="5 seconds").start()#启动流计算并输出结果query.awaitTermination()#使得查询在后台持续运行,直到受到用户退出的指令

执行程序

[root@hadoop1 temp]# /home/hadoop/spark/bin/spark-submit /home/hadoop/temp/sparksstructtreamwordcount.py

结果显示
在这里插入图片描述

这篇关于大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/791482

相关文章

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

Python错误AttributeError: 'NoneType' object has no attribute问题的彻底解决方法

《Python错误AttributeError:NoneTypeobjecthasnoattribute问题的彻底解决方法》在Python项目开发和调试过程中,经常会碰到这样一个异常信息... 目录问题背景与概述错误解读:AttributeError: 'NoneType' object has no at

Python使用openpyxl读取Excel的操作详解

《Python使用openpyxl读取Excel的操作详解》本文介绍了使用Python的openpyxl库进行Excel文件的创建、读写、数据操作、工作簿与工作表管理,包括创建工作簿、加载工作簿、操作... 目录1 概述1.1 图示1.2 安装第三方库2 工作簿 workbook2.1 创建:Workboo

基于Python实现简易视频剪辑工具

《基于Python实现简易视频剪辑工具》这篇文章主要为大家详细介绍了如何用Python打造一个功能完备的简易视频剪辑工具,包括视频文件导入与格式转换,基础剪辑操作,音频处理等功能,感兴趣的小伙伴可以了... 目录一、技术选型与环境搭建二、核心功能模块实现1. 视频基础操作2. 音频处理3. 特效与转场三、高

Python实现中文文本处理与分析程序的示例详解

《Python实现中文文本处理与分析程序的示例详解》在当今信息爆炸的时代,文本数据的处理与分析成为了数据科学领域的重要课题,本文将使用Python开发一款基于Python的中文文本处理与分析程序,希望... 目录一、程序概述二、主要功能解析2.1 文件操作2.2 基础分析2.3 高级分析2.4 可视化2.5

一文解密Python进行监控进程的黑科技

《一文解密Python进行监控进程的黑科技》在计算机系统管理和应用性能优化中,监控进程的CPU、内存和IO使用率是非常重要的任务,下面我们就来讲讲如何Python写一个简单使用的监控进程的工具吧... 目录准备工作监控CPU使用率监控内存使用率监控IO使用率小工具代码整合在计算机系统管理和应用性能优化中,监

Python实现终端清屏的几种方式详解

《Python实现终端清屏的几种方式详解》在使用Python进行终端交互式编程时,我们经常需要清空当前终端屏幕的内容,本文为大家整理了几种常见的实现方法,有需要的小伙伴可以参考下... 目录方法一:使用 `os` 模块调用系统命令方法二:使用 `subprocess` 模块执行命令方法三:打印多个换行符模拟