Pyspark DataFrame常用操作函数和示例

2024-09-06 19:12

本文主要是介绍Pyspark DataFrame常用操作函数和示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

针对类型:pyspark.sql.dataframe.DataFrame

目录

1.打印前几行

1.1 show()函数

1.2 take()函数

2. 读取文件

2.1 spark.read.csv

3. 获取某行某列的值(具体值)

4.查看列名

5.修改列名

5.1 修改单个列名

5.2 修改多个列名

5.2.1 链式调用 withColumnRenamed 方法

5.2.2 使用 selectExpr 方法

6. pandas类型转化为pyspark  pandas

7.选择特定的列,创建一个新的 DataFrame

8.列表套字典格式转化为pyspark DataFrame

9. 根据某列或者某列进行去重

10. pyspark 的两个dataframe合并

11.查看 pyspark dataframe中某列为空的数量

12.删除 pyspark dataframe中 第一行数据

13.pyspark dataframe用空格拼接两列得到新的列

14.将pyspark dataframe 保存到集群(分片)

16.将pyspark dataframe 保存为csv

实际场景1

实际场景2


1.打印前几行

1.1 show()函数

  • show() 函数会将指定数量的行(默认是 20 行)转换为字符串并打印到控制台。
  • 无返回值,直接打印数据到控制台。

用法:

df.show()  # 默认显示前 20 行
df.show(10)  # 显示前 10 行

1.2 take()函数

  • 用于获取 DataFrame 的前 N 行数据,返回一个包含 Row 对象的列表。
  • 返回一个包含 Row 对象的列表。
  • 返回一个包含前 N 行数据的列表,每行数据以 Row 对象的形式存在。你可以通过索引访问这些行,并进一步处理它们。
rows = df.take(5)  # 获取前 5 行数据
for row in rows:print(row)

2. 读取文件

2.1 spark.read.csv

df = spark.read.csv(path, sep="\t", header=False, inferSchema=True).toDF('id','time','label','feature')
  • inferSchema=True: 让 Spark 自动推断 CSV 文件中各列的数据类型
  • toDF: 这是一个 DataFrame 方法,用于为 DataFrame 的列指定新的列名。

3. 获取某行某列的值(具体值)

直接获取 DataFrame 的特定行(例如第 562962 行)并不是一个高效的操作,因为 Spark 是

分布式计算框架,数据被分割并在多个节点上并行处理

# 获取第一行
first_row = df.first()# 获取 feature 列的值
first_row['feature_1']
# 获取前两行
rows = df.take(2)# 获取第二行
second_row = rows[1]# 获取 feature 列的值
second_row['feature']

4.查看列名

df.columns

5.修改列名

5.1 修改单个列名

# 修改列名
df_renamed = df.withColumnRenamed("name", "new_name")

5.2 修改多个列名

5.2.1 链式调用 withColumnRenamed 方法

# 修改多个列名
df_renamed = df.withColumnRenamed("id", "new_id").withColumnRenamed("name", "new_name")

5.2.2 使用 selectExpr 方法

注意:使用 selectExpr 方法时,最后只会得到你修改的列,即,在函数参数中的列名

如果想使用该方法时,还想要原来的列名,就直接, 在参数中加入,"原列名 as 原列名"

# 使用 selectExpr 修改列名
df_renamed = df.selectExpr("id as new_id", "name as new_name")

6. pandas类型转化为pyspark  pandas

pandas.core.frame.DataFrame 类型转化为 pyspark.sql.dataframe.DataFrame
# 将 Pandas DataFrame 转换为 PySpark DataFrame
pyspark_df = spark.createDataFrame(pandas_df)

7.选择特定的列,创建一个新的 DataFrame

# 选择某几列并创建新的 DataFrame
new_df = df.select("name", "age")

8.列表套字典格式转化为pyspark DataFrame

# 示例列表套字典
data = [{"name": "Alice", "age": 25, "id": 1},{"name": "Bob", "age": 30, "id": 2},{"name": "Cathy", "age": 35, "id": 3}
]# 将列表套字典转换为 PySpark DataFrame
df = spark.createDataFrame(data)# 显示 DataFrame
df.show()

9. 根据某列或者某列进行去重

duyuv3_1_df = duyuv3_1_df.dropDuplicates(['md5', 'time', 'label'])

10. pyspark 的两个dataframe合并

merged_v3_1_df = duyuv3_1_df.join(passid_md5_df, on=['md5'], how='left')

11.查看 pyspark dataframe中某列为空的数量

null_passid_count = merged_v3_1_df.filter(merged_v3_1_df['passid'].isNull()).count()
print(f"passid is null:{null_passid_count}")

12.删除 pyspark dataframe中 第一行数据

data_df = data_df.filter(col("_c0") != data_df.first()[0])
  • data_df.first(): 获取 DataFrame 的第一行数据。

  • col("_c0"): 获取 DataFrame 的第一列(默认情况下,Spark 会将 CSV 文件的列命名为 _c0_c1_c2, ...)。

  • data_df.filter(col("_c0") != data_df.first()[0]): 过滤掉第一行数据。这里假设第一行的第一列值与后续行的第一列值不同,因此通过比较第一列的值来过滤掉第一行。

13.pyspark dataframe用空格拼接两列得到新的列

# 拼接特征列replace_df = replace_df.withColumn('merged_feature',when(col('featurev3').isNotNull() & col('feature_v3_1').isNotNull(),concat_ws(' ', col('featurev3'), col('feature_v3_1'))).when(col('featurev3').isNotNull(), col('featurev3')).when(col('feature_v3_1').isNotNull(), col('feature_v3_1')).otherwise(lit('')))

14.将pyspark dataframe 保存到集群(分片)

save_path =f'afs://szth.afs.****.com:9902/user/fsi/duyuv3_1_feature/result_duyuv3_1/'
rdd_combined_duyuv3_1 = feature_cgc.rdd.map(lambda x: "\t".join(x))
rdd_combined_duyuv3_1.saveAsTextFile(save_path)

16.将pyspark dataframe 保存为csv

output_path = "afs://szth.afs.baidu.com:9902/user/fsi/tongweiwei/duyuv3_1_feature/data.csv"
final_df.write.csv(output_path, header=True, mode="overwrite")

实际场景1

对某列的值进行按照空格进行切割,然后在对切割后的数据判断冒号前面的字符串判断是否在某一个字符串中,如果在则去除掉

from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws, col, when, lit, udfdef filter_feature(feature_str, filter_list):parts = feature_str.split()filtered_parts = [part for part in parts if str(part.split(':')[0]) not in filter_list.split(',')]return ' '.join(filtered_parts)filter_feature_udf = udf(filter_feature, StringType())df = duyuv3_df.withColumn("featurev3", filter_feature_udf(col("featurev3"), lit(duyuv3_str)))

实际场景2

对某列的值,按照空格进行切割后,按照冒号前面的进行排序

from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws, col, when, lit, udfdef sort_by_number(value):# 将输入字符串按空格分割为列表value = value.strip().split(" ")value_list = []# 遍历列表中的每个元素,提取数字部分并排序for val in value:try:feat_num = int(val.split(":")[0])value_list.append(val)except:continuesorted_pairs = sorted(value_list, key=lambda x: int(x.split(":")[0]))return " ".join(sorted_pairs)sort_by_number_udf = udf(sort_by_number, StringType())feature_cgc = replace_df.withColumn("sorted_feat",sort_by_number_udf(replace_df["merged_feature"]))

这篇关于Pyspark DataFrame常用操作函数和示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1142872

相关文章

Python位移操作和位运算的实现示例

《Python位移操作和位运算的实现示例》本文主要介绍了Python位移操作和位运算的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 位移操作1.1 左移操作 (<<)1.2 右移操作 (>>)注意事项:2. 位运算2.1

pandas中位数填充空值的实现示例

《pandas中位数填充空值的实现示例》中位数填充是一种简单而有效的方法,用于填充数据集中缺失的值,本文就来介绍一下pandas中位数填充空值的实现,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是中位数填充?为什么选择中位数填充?示例数据结果分析完整代码总结在数据分析和机器学习过程中,处理缺失数

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

Python的time模块一些常用功能(各种与时间相关的函数)

《Python的time模块一些常用功能(各种与时间相关的函数)》Python的time模块提供了各种与时间相关的函数,包括获取当前时间、处理时间间隔、执行时间测量等,:本文主要介绍Python的... 目录1. 获取当前时间2. 时间格式化3. 延时执行4. 时间戳运算5. 计算代码执行时间6. 转换为指

利用Python调试串口的示例代码

《利用Python调试串口的示例代码》在嵌入式开发、物联网设备调试过程中,串口通信是最基础的调试手段本文将带你用Python+ttkbootstrap打造一款高颜值、多功能的串口调试助手,需要的可以了... 目录概述:为什么需要专业的串口调试工具项目架构设计1.1 技术栈选型1.2 关键类说明1.3 线程模

Python ZIP文件操作技巧详解

《PythonZIP文件操作技巧详解》在数据处理和系统开发中,ZIP文件操作是开发者必须掌握的核心技能,Python标准库提供的zipfile模块以简洁的API和跨平台特性,成为处理ZIP文件的首选... 目录一、ZIP文件操作基础三板斧1.1 创建压缩包1.2 解压操作1.3 文件遍历与信息获取二、进阶技

Java中字符串转时间与时间转字符串的操作详解

《Java中字符串转时间与时间转字符串的操作详解》Java的java.time包提供了强大的日期和时间处理功能,通过DateTimeFormatter可以轻松地在日期时间对象和字符串之间进行转换,下面... 目录一、字符串转时间(一)使用预定义格式(二)自定义格式二、时间转字符串(一)使用预定义格式(二)自

Python正则表达式语法及re模块中的常用函数详解

《Python正则表达式语法及re模块中的常用函数详解》这篇文章主要给大家介绍了关于Python正则表达式语法及re模块中常用函数的相关资料,正则表达式是一种强大的字符串处理工具,可以用于匹配、切分、... 目录概念、作用和步骤语法re模块中的常用函数总结 概念、作用和步骤概念: 本身也是一个字符串,其中

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

usb接口驱动异常问题常用解决方案

《usb接口驱动异常问题常用解决方案》当遇到USB接口驱动异常时,可以通过多种方法来解决,其中主要就包括重装USB控制器、禁用USB选择性暂停设置、更新或安装新的主板驱动等... usb接口驱动异常怎么办,USB接口驱动异常是常见问题,通常由驱动损坏、系统更新冲突、硬件故障或电源管理设置导致。以下是常用解决