用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc())))

本文主要是介绍用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc()))),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

数据:

1,11,111
2,22,222
3,33,333
1,22,333
1,22,444

代码:

package com.emg.etp.analysis.preproces.nullphotoimport com.emg.etp.analysis.preproces.nullphoto.pojo.EcarData
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkStrategies
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._import scala.collection.mutable.ListBuffer/*** @Auther: sss* @Date: 2020/7/21 16:20* @Description:*/
object Tests {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("etpProcess").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array[Class[_]](EcarData.getClass))val spark = SparkSession.builder().config(conf).getOrCreate()val sc: SparkContext = spark.sparkContextimport spark.implicits._val rdd = sc.textFile("C:\\Users\\sss\\Desktop\\qqq\\aa.txt")val win = Window.partitionBy("id1").orderBy(desc("id3"))val rdd2 = rdd.map(line => {val data = line.split(",", -1)(data(0), data(1), data(2))})val data = rdd2.toDF("id1", "id2", "id3").withColumn("aa", lag("id3", 1).over(win))data.show()val df_difftime = data.withColumn("diff", when(isnull(col("id3") - col("aa")), 0).otherwise((col("id3") - col("aa"))))df_difftime.show()}
}

 结果:

 

这篇关于用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc())))的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/242710

相关文章

SpringMVC高效获取JavaBean对象指南

《SpringMVC高效获取JavaBean对象指南》SpringMVC通过数据绑定自动将请求参数映射到JavaBean,支持表单、URL及JSON数据,需用@ModelAttribute、@Requ... 目录Spring MVC 获取 JavaBean 对象指南核心机制:数据绑定实现步骤1. 定义 Ja

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

C++中RAII资源获取即初始化

《C++中RAII资源获取即初始化》RAII通过构造/析构自动管理资源生命周期,确保安全释放,本文就来介绍一下C++中的RAII技术及其应用,具有一定的参考价值,感兴趣的可以了解一下... 目录一、核心原理与机制二、标准库中的RAII实现三、自定义RAII类设计原则四、常见应用场景1. 内存管理2. 文件操

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

使用Python实现获取屏幕像素颜色值

《使用Python实现获取屏幕像素颜色值》这篇文章主要为大家详细介绍了如何使用Python实现获取屏幕像素颜色值,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 一、一个小工具,按住F10键,颜色值会跟着显示。完整代码import tkinter as tkimport pyau

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

python获取cmd环境变量值的实现代码

《python获取cmd环境变量值的实现代码》:本文主要介绍在Python中获取命令行(cmd)环境变量的值,可以使用标准库中的os模块,需要的朋友可以参考下... 前言全局说明在执行py过程中,总要使用到系统环境变量一、说明1.1 环境:Windows 11 家庭版 24H2 26100.4061

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据