用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc())))

本文主要是介绍用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc()))),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

数据:

1,11,111
2,22,222
3,33,333
1,22,333
1,22,444

代码:

package com.emg.etp.analysis.preproces.nullphotoimport com.emg.etp.analysis.preproces.nullphoto.pojo.EcarData
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkStrategies
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._import scala.collection.mutable.ListBuffer/*** @Auther: sss* @Date: 2020/7/21 16:20* @Description:*/
object Tests {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("etpProcess").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array[Class[_]](EcarData.getClass))val spark = SparkSession.builder().config(conf).getOrCreate()val sc: SparkContext = spark.sparkContextimport spark.implicits._val rdd = sc.textFile("C:\\Users\\sss\\Desktop\\qqq\\aa.txt")val win = Window.partitionBy("id1").orderBy(desc("id3"))val rdd2 = rdd.map(line => {val data = line.split(",", -1)(data(0), data(1), data(2))})val data = rdd2.toDF("id1", "id2", "id3").withColumn("aa", lag("id3", 1).over(win))data.show()val df_difftime = data.withColumn("diff", when(isnull(col("id3") - col("aa")), 0).otherwise((col("id3") - col("aa"))))df_difftime.show()}
}

 结果:

 

这篇关于用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc())))的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/242710

相关文章

Python使用Tenacity一行代码实现自动重试详解

《Python使用Tenacity一行代码实现自动重试详解》tenacity是一个专为Python设计的通用重试库,它的核心理念就是用简单、清晰的方式,为任何可能失败的操作添加重试能力,下面我们就来看... 目录一切始于一个简单的 API 调用Tenacity 入门:一行代码实现优雅重试精细控制:让重试按我

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Python获取浏览器Cookies的四种方式小结

《Python获取浏览器Cookies的四种方式小结》在进行Web应用程序测试和开发时,获取浏览器Cookies是一项重要任务,本文我们介绍四种用Python获取浏览器Cookies的方式,具有一定的... 目录什么是 Cookie?1.使用Selenium库获取浏览器Cookies2.使用浏览器开发者工具

Java获取当前时间String类型和Date类型方式

《Java获取当前时间String类型和Date类型方式》:本文主要介绍Java获取当前时间String类型和Date类型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录Java获取当前时间String和Date类型String类型和Date类型输出结果总结Java获取

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

一文详解如何使用Java获取PDF页面信息

《一文详解如何使用Java获取PDF页面信息》了解PDF页面属性是我们在处理文档、内容提取、打印设置或页面重组等任务时不可或缺的一环,下面我们就来看看如何使用Java语言获取这些信息吧... 目录引言一、安装和引入PDF处理库引入依赖二、获取 PDF 页数三、获取页面尺寸(宽高)四、获取页面旋转角度五、判断