大数据(8q)流计算updateStateByKey

2023-11-11 02:40

本文主要是介绍大数据(8q)流计算updateStateByKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • updateStateByKey示例
  • updateStateByKey源码
  • Option知识补充
    • getOrElse
    • isEmpty

前言

  • 本文属于Spark Streaming分支章节
  • 流式处理中,分为有状态冇状态
  • 有状态:记录之前数据流处理的信息
  • updateStateByKey有状态Transformation

updateStateByKey示例

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable// 创建SparkContext和SparkStreamingContext
val c0: SparkConf = new SparkConf().setAppName("a0").setMaster("local[2]")
val sc: SparkContext = new SparkContext(c0)
val ssc: StreamingContext = new StreamingContext(sc, Seconds(9))
// 创建RDD队列,并放入QueueInputDStream
val rddQueue: mutable.Queue[RDD[String]] = new mutable.Queue[RDD[String]]()
val iDS: InputDStream[String] = ssc.queueStream(rddQueue, oneAtATime = false)
//===========================================================================
// 数据预处理
val dS: DStream[(String, Int)] = iDS.map((_, 1))
// 无状态
dS.reduceByKey(_ + _).print()
//设置检查点路径,用于保存状态
ssc.checkpoint("checkpoint")
// 根据 Key 来更新状态
dS.updateStateByKey(// seq是一个DStream内所有RDD相同Key连成的Value队列(seq: Seq[Int], state: Option[Int]) => {Option(seq.sum + state.getOrElse(0))}
).print()
//===========================================================================
// 启动任务:循环输入文本,按空格切分
ssc.start()
while (true) {rddQueue += sc.makeRDD(scala.io.StdIn.readLine.split(" "))
}
ssc.awaitTermination()

结果打印

updateStateByKey源码

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true)
}
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {val cleanedFunc = ssc.sc.clean(updateFunc)val newUpdateFunc = (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => {cleanedFunc(it)}new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, None)
}

Option知识补充

  • Option译作选项,用来表示一个值是可选的(有值或无值)
  • Option[T]是一个类型为T的可选值的容器:
    若值存在,Option[T]就是一个Some[T]
    若值不存在,Option[T]就是对象None
val myMap: Map[String, String] = Map("key1" -> "value1")
val value1: Option[String] = myMap.get("key1")
val value2: Option[String] = myMap.get("key2")
println(value1)  // Some("value1")
println(value2)  // None

getOrElse

val a: Option[Int] = Some(5)
val b: Option[Int] = None
println(a.getOrElse(0))  // 5
println(b.getOrElse(0))  // 0

isEmpty

val a: Option[Int] = Some(5)
val b: Option[Int] = None
println(a.isEmpty)  // false
println(b.isEmpty)  // true

这篇关于大数据(8q)流计算updateStateByKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/386990

相关文章

使用Python开发一个Ditto剪贴板数据导出工具

《使用Python开发一个Ditto剪贴板数据导出工具》在日常工作中,我们经常需要处理大量的剪贴板数据,下面将介绍如何使用Python的wxPython库开发一个图形化工具,实现从Ditto数据库中读... 目录前言运行结果项目需求分析技术选型核心功能实现1. Ditto数据库结构分析2. 数据库自动定位3

pandas数据的合并concat()和merge()方式

《pandas数据的合并concat()和merge()方式》Pandas中concat沿轴合并数据框(行或列),merge基于键连接(内/外/左/右),concat用于纵向或横向拼接,merge用于... 目录concat() 轴向连接合并(1) join='outer',axis=0(2)join='o

批量导入txt数据到的redis过程

《批量导入txt数据到的redis过程》用户通过将Redis命令逐行写入txt文件,利用管道模式运行客户端,成功执行批量删除以Product*匹配的Key操作,提高了数据清理效率... 目录批量导入txt数据到Redisjs把redis命令按一条 一行写到txt中管道命令运行redis客户端成功了批量删除k

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口