周期性清除Spark Streaming流状态的方法

2024-09-06 21:58

本文主要是介绍周期性清除Spark Streaming流状态的方法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在Spark Streaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子:


现在的问题是,PV并不是一直累加的,而是每天归零,重新统计数据。要达到在凌晨0点清除状态的目的,有以下两种方法。


编写脚本重启Streaming程序

用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本:

stream_app_name='com.xyz.streaming.MallForwardStreaming'
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`

if [ ${cnt} -eq 1 ]; then
pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`
kill -9 ${pid}
sleep 20
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
if [ ${cnt} -eq 0 ]; then
nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1
fi
fi

这种方式最简单,也不需要对程序本身做任何改动。但随着同时运行的Streaming任务越来越多,就会显得越来越累赘了。

给StreamingContext设置超时

在程序启动之前,先计算出当前时间点距离第二天凌晨0点的毫秒数:

def msTillTomorrow = {
val now = new Date()
val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
tomorrow.getTime - now.getTime
}

然后将Streaming程序的主要逻辑写在while(true)循环中,并且不像平常一样调用StreamingContext.awaitTermination()方法,而改用awaitTerminationOrTimeout()方法,即:

while (true) {
val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
ssc.checkpoint(CHECKPOINT_DIR)

// ...处理逻辑...

ssc.start()
ssc.awaitTerminationOrTimeout(msTillTomorrow)
ssc.stop(false, true)
Thread.sleep(BATCH_INTERVAL * 1000)
}

在经过msTillTomorrow毫秒之后,StreamingContext就会超时,再调用其stop()方法(注意两个参数,stopSparkContext表示是否停止关联的SparkContext,stopGracefully表示是否优雅停止),就可以停止并重启StreamingContext。


以上两种方法都是仍然采用Spark Streaming的机制进行状态计算的。如果其他条件允许的话,我们还可以抛弃mapWithState(),直接借助外部存储自己维护状态。比如将Redis的Key设计为product_pv:[product_id]:[date],然后在Spark Streaming的每个批次中使用incrby指令,就能方便地统计PV了,不必考虑定时的问题。


640?wx_fmt=gif

640?wx_fmt=jpeg

这篇关于周期性清除Spark Streaming流状态的方法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1143233

相关文章

IIS 7.0 及更高版本中的 FTP 状态代码

《IIS7.0及更高版本中的FTP状态代码》本文介绍IIS7.0中的FTP状态代码,方便大家在使用iis中发现ftp的问题... 简介尝试使用 FTP 访问运行 Internet Information Services (IIS) 7.0 或更高版本的服务器上的内容时,IIS 将返回指示响应状态的数字代

MySQL启动报错:InnoDB表空间丢失问题及解决方法

《MySQL启动报错:InnoDB表空间丢失问题及解决方法》在启动MySQL时,遇到了InnoDB:Tablespace5975wasnotfound,该错误表明MySQL在启动过程中无法找到指定的s... 目录mysql 启动报错:InnoDB 表空间丢失问题及解决方法错误分析解决方案1. 启用 inno

Python函数返回多个值的多种方法小结

《Python函数返回多个值的多种方法小结》在Python中,函数通常用于封装一段代码,使其可以重复调用,有时,我们希望一个函数能够返回多个值,Python提供了几种不同的方法来实现这一点,需要的朋友... 目录一、使用元组(Tuple):二、使用列表(list)三、使用字典(Dictionary)四、 使

Linux查看系统盘和SSD盘的容量、型号及挂载信息的方法

《Linux查看系统盘和SSD盘的容量、型号及挂载信息的方法》在Linux系统中,管理磁盘设备和分区是日常运维工作的重要部分,而lsblk命令是一个强大的工具,它用于列出系统中的块设备(blockde... 目录1. 查看所有磁盘的物理信息方法 1:使用 lsblk(推荐)方法 2:使用 fdisk -l(

使用Python获取JS加载的数据的多种实现方法

《使用Python获取JS加载的数据的多种实现方法》在当今的互联网时代,网页数据的动态加载已经成为一种常见的技术手段,许多现代网站通过JavaScript(JS)动态加载内容,这使得传统的静态网页爬取... 目录引言一、动态 网页与js加载数据的原理二、python爬取JS加载数据的方法(一)分析网络请求1

MySQL查看表的最后一个ID的常见方法

《MySQL查看表的最后一个ID的常见方法》在使用MySQL数据库时,我们经常会遇到需要查看表中最后一个id值的场景,无论是为了调试、数据分析还是其他用途,了解如何快速获取最后一个id都是非常实用的技... 目录背景介绍方法一:使用MAX()函数示例代码解释适用场景方法二:按id降序排序并取第一条示例代码解

Python中合并列表(list)的六种方法小结

《Python中合并列表(list)的六种方法小结》本文主要介绍了Python中合并列表(list)的六种方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录一、直接用 + 合并列表二、用 extend() js方法三、用 zip() 函数交叉合并四、用

Java 中的跨域问题解决方法

《Java中的跨域问题解决方法》跨域问题本质上是浏览器的一种安全机制,与Java本身无关,但Java后端开发者需要理解其来源以便正确解决,下面给大家介绍Java中的跨域问题解决方法,感兴趣的朋友一起... 目录1、Java 中跨域问题的来源1.1. 浏览器同源策略(Same-Origin Policy)1.

Java Stream.reduce()方法操作实际案例讲解

《JavaStream.reduce()方法操作实际案例讲解》reduce是JavaStreamAPI中的一个核心操作,用于将流中的元素组合起来产生单个结果,:本文主要介绍JavaStream.... 目录一、reduce的基本概念1. 什么是reduce操作2. reduce方法的三种形式二、reduce

MybatisX快速生成增删改查的方法示例

《MybatisX快速生成增删改查的方法示例》MybatisX是基于IDEA的MyBatis/MyBatis-Plus开发插件,本文主要介绍了MybatisX快速生成增删改查的方法示例,文中通过示例代... 目录1 安装2 基本功能2.1 XML跳转2.2 代码生成2.2.1 生成.xml中的sql语句头2