Delta lake with Java--数据增删改查

2024-05-04 15:20

本文主要是介绍Delta lake with Java--数据增删改查,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java结合真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于Delta lake up and running配套的 GitGitHub - benniehaelen/delta-lake-up-and-running: Companion repository for the book 'Delta Lake Up and Running'

要实现的效果是新建表,导入数据,然后对表进行增删改查操作,具体代码如下:

package detal.lake.java;import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;import java.text.SimpleDateFormat;
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.HashMap;public class DeltaLakeCURD {//将字符串转换成java.sql.Timestamppublic static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {SimpleDateFormat sf = new SimpleDateFormat(dateFormat);java.util.Date date = null;try {date = sf.parse(strDate);} catch (Exception e) {e.printStackTrace();}java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());return dateSQL;}public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.databricks.delta.autoCompact.enabled", "true").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String savePath="file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi";String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";String tableName = "taxidb.YellowTaxis";spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");//定义表DeltaTable.createIfNotExists(spark).tableName(tableName).addColumn("RideId","INT").addColumn("VendorId","INT").addColumn("PickupTime","TIMESTAMP").addColumn("DropTime","TIMESTAMP").location(savePath).execute();//加载csv数据并导入delta表var df=spark.read().format("delta").table(tableName);var schema=df.schema();System.out.println(schema.simpleString());var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);System.out.println("记录总行数:"+df_for_append.count());System.out.println("导入数据,开始时间"+  sdf.format(new Date()));df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);System.out.println("导入数据,结束时间" + sdf.format(new Date()));DeltaTable deltaTable = DeltaTable.forName(spark,tableName);//插入数据List<Row> list = new ArrayList<Row>();list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField("RideId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("VendorId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("PickupTime", DataTypes.TimestampType, true));structFields.add(DataTypes.createStructField("DropTime", DataTypes.TimestampType, true));StructType structType = DataTypes.createStructType(structFields);var yellowTaxipDF=spark.createDataFrame(list,structType); //建立需要新增数据并转换成dataframeSystem.out.println("插入数据,开始时间"+  sdf.format(new Date()));yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);System.out.println("插入数据,结束时间"+  sdf.format(new Date()));System.out.println("插入后数据");deltaTable.toDF().select("*").where("RideId=-1").show(false);//更新数据System.out.println("更新前数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);System.out.println("更新数据,开始时间"+  sdf.format(new Date()));deltaTable.updateExpr("RideId = 999994",new HashMap<String, String>() {{put("VendorId", "250");}});System.out.println("更新数据,结束时间"+  sdf.format(new Date()));System.out.println("更新后数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);//查询数据System.out.println("查询数据,开始时间"+  sdf.format(new Date()));var selectDf= deltaTable.toDF().select("*").where("RideId=1");selectDf.show(false);System.out.println("查询数据,结束时间" + sdf.format(new Date()));//删除数据System.out.println("删除数据,开始时间"+  sdf.format(new Date()));deltaTable.delete("RideId=1");System.out.println("删除数据,结束时间"+  sdf.format(new Date()));deltaTable.toDF().select("*").where("RideId=1").show(false);}
}

里面涉及spark的TimestampType类型,如何将字符串输入到TimestampType列,找了几个小时才找到答案,具体参考了如下连接,原来直接将string转成java.sql.Timestamp即可,于是在网上找了一个方法,实现了转换,转换代码非原创,也是借鉴其他大牛的。

scala - How to create TimestampType column in spark from string - Stack Overflow

最后运行结果

这篇关于Delta lake with Java--数据增删改查的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/959511

相关文章

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

HTTP 与 SpringBoot 参数提交与接收协议方式

《HTTP与SpringBoot参数提交与接收协议方式》HTTP参数提交方式包括URL查询、表单、JSON/XML、路径变量、头部、Cookie、GraphQL、WebSocket和SSE,依据... 目录HTTP 协议支持多种参数提交方式,主要取决于请求方法(Method)和内容类型(Content-Ty

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

深入浅出Spring中的@Autowired自动注入的工作原理及实践应用

《深入浅出Spring中的@Autowired自动注入的工作原理及实践应用》在Spring框架的学习旅程中,@Autowired无疑是一个高频出现却又让初学者头疼的注解,它看似简单,却蕴含着Sprin... 目录深入浅出Spring中的@Autowired:自动注入的奥秘什么是依赖注入?@Autowired

Spring 依赖注入与循环依赖总结

《Spring依赖注入与循环依赖总结》这篇文章给大家介绍Spring依赖注入与循环依赖总结篇,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. Spring 三级缓存解决循环依赖1. 创建UserService原始对象2. 将原始对象包装成工

Java中如何正确的停掉线程

《Java中如何正确的停掉线程》Java通过interrupt()通知线程停止而非强制,确保线程自主处理中断,避免数据损坏,线程池的shutdown()等待任务完成,shutdownNow()强制中断... 目录为什么不强制停止为什么 Java 不提供强制停止线程的能力呢?如何用interrupt停止线程s

SpringBoot请求参数传递与接收示例详解

《SpringBoot请求参数传递与接收示例详解》本文给大家介绍SpringBoot请求参数传递与接收示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录I. 基础参数传递i.查询参数(Query Parameters)ii.路径参数(Path Va

SpringBoot路径映射配置的实现步骤

《SpringBoot路径映射配置的实现步骤》本文介绍了如何在SpringBoot项目中配置路径映射,使得除static目录外的资源可被访问,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一... 目录SpringBoot路径映射补:springboot 配置虚拟路径映射 @RequestMapp

Java MCP 的鉴权深度解析

《JavaMCP的鉴权深度解析》文章介绍JavaMCP鉴权的实现方式,指出客户端可通过queryString、header或env传递鉴权信息,服务器端支持工具单独鉴权、过滤器集中鉴权及启动时鉴权... 目录一、MCP Client 侧(负责传递,比较简单)(1)常见的 mcpServers json 配置

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映