spark的学习,lambda,map,filter,flatmap/按照字典表中的指定键或值排序

2023-12-20 23:08

本文主要是介绍spark的学习,lambda,map,filter,flatmap/按照字典表中的指定键或值排序,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

spark的学习,lambda,map,filter,flatmap
重点:需要明白各个不同函数作用后剩下的数据的情况,是保留全部列,还是当前作用的列;其次,还需明白不同函数他们的映射条件,通常都是二值变量作为条件:

经典写法1:

 df_crawler_merged_name_err = df_crawler_merged.rdd.filter(lambda _: not _legal_check(_["name"])).map(lambda _: Row(name=_["name"]))

经典写法2:

 a_data = json.loads(a_str)if a_data and a_data.get("shareHolderList", []):a_b_l = a_data.get("shareHolderList", [])shareholderName_s = filter(lambda _: len(_) > 0, [_.get("shareholderName", "") for _ in a_b_l if _])return shareholderName_sreturn []

经典写法3
比较复杂

 news_path_data_rows = news_data_2_gz_rawRdd\.map(lambda _: (news_data_2_aPathPattern.findall(_), contentPattern.findall(_)))\.filter(lambda _: len(_)>1 and len(_[0]) > 0 and len(_[1]) > 0 and json.loads(_[1][0]).get("content"))\.map(lambda _: Row(pth=_[0][0].replace('newsAnalysis/', ""), content=json.loads(_[1][0]).get("content")))\.map(lambda _: Row(result=mapping_tag_by_content(_["content"], _["pth"])))\.filter(lambda _: len(_["result"]) > 1)

经典写法4:filter里面含多个条件,类似地,其他映射函数也可以;

df_eid_person_text = spark.read.text("hdfs://sc-bd-10:9000/scdata/huangyu/person_new.csv")
df_eid_person = df_eid_person_text\.rdd\.map(lambda _: Row(**clean_person_row(_["value"])))\.filter(lambda _: _["new_eid"] and person_is_legal(_["person_name"]) and not filter_inv_name(_["person_name"])).toDF()
df_eid_person.createOrReplaceTempView("eid_person_table")

上述filter里面含有多个条件:

filter(lambda _: _["new_eid"] and person_is_legal(_["person_name"]) and not filter_inv_name(_["person_name"]))

按照字典表中的指定键或值排序:

 classifies = sorted(result.items(), key=lambda _: _[1], reverse=True)

上式中lambda表达式逗号后面的列表组成一个判断的对象(变量列表),[.get(“shareholderName”, “”) for in a_b_l if ],表示变量,首先是for _ in a_b_l ,再是if ,再是.get(“shareholderName”, “”),最后为一个列表;

#!/usr/bin/env python
# encoding: utf-8import sysreload(sys)
sys.setdefaultencoding('utf8')def _parse_ent_status(a_str):if not a_str:return ""a_data = json.loads(a_str)if a_data and a_data.get("basicList", []):a_b_l = a_data.get("basicList", [])return a_b_l[0].get("enterpriseStatus", "")return ""def parse_ent_status(a_str):try:return _parse_ent_status(a_str)except:return ""def statistic_status():parse_ent_status_udf = udf(parse_ent_status, StringType())df_crawler_status = df_crawler_merged.withColumn("status", parse_ent_status_udf(df_crawler_merged["results"]))df_crawler_status.createOrReplaceTempView("crawler_status")spark.sql("select DISTINCT(status) from crawler_status").show(2000, False)def _parse_ent_regCapCur(a_str):regCapCur = ""if not a_str:return ""a_data = json.loads(a_str)if a_data and a_data.get("basicList", []):a_b_l = a_data.get("basicList", [])regCapCur = a_b_l[0].get("regCapCur", "")for a_share in a_data.get("shareHolderList", []):regCapCur = regCapCur or a_share.get("regCapCur", "")return regCapCurdef parse_ent_regCapCur(a_str):try:return _parse_ent_regCapCur(a_str)except:return ""def statistic_regCapCur():a_udf = udf(parse_ent_regCapCur, StringType())df_crawler_field = df_crawler_merged.withColumn("regCapCur", a_udf(df_crawler_merged["results"]))df_crawler_field.createOrReplaceTempView("crawler_regCapCur")spark.sql("select regCapCur, name from crawler_regCapCur where LENGTH(regCapCur) > 10 or regCapCur like '%.%' or regCapCur like '%0%'").show(2000, False)# spark.sql("select DISTINCT(regCapCur) from crawler_regCapCur").show(2000, False)def _parse_position(a_str):if not a_str:return ""a_data = json.loads(a_str)a_person_list = [u"监" , u"其" , u"董" , u"经", u"负责人", u"代表" , u"理事长", u"人", u"投资人", u"支局长", u"长", u"工程师",]if a_data and a_data.get("personList", []):a_b_l = a_data.get("personList", [])# return u"、".join(set([_.get("position", u"") for _ in a_b_l if _]))position_filter = u"、".join(set([_.get("position", u"") for _ in a_b_l if _]))# 统计过滤if any(map(lambda _: _ in position_filter, a_person_list)):return ""else:return position_filterreturn ""def parse_position(a_str):try:return _parse_position(a_str)except:return ""def statistic_position():# position_str_rdd = df_crawler_merged.rdd.map(lambda _: Row(position_str=parse_position(_["results"]), name=_["name"]))position_str_rdd = df_crawler_merged.rdd.map(lambda _: Row(name=_["name"], position_str=_parse_position(_["results"])))position_str_rdd.toDF().createOrReplaceTempView("position_all")spark.sql("select name from position_all where LENGTH(position_str) > 1 ").show(10000000, False)def _parse_share_name(a_str):if not a_str:return []a_data = json.loads(a_str)if a_data and a_data.get("shareHolderList", []):a_b_l = a_data.get("shareHolderList", [])shareholderName_s = filter(lambda _: len(_) > 0, [_.get("shareholderName", "") for _ in a_b_l if _])return shareholderName_sreturn []def statistic_share_name():# todo 需要继续做position_str_rdd = df_crawler_merged.rdd.map(lambda _: Row(share_name=_parse_share_name(_["results"]))).flatMap(lambda x: x["share_name"]).map(lambda _: Row(name=_))position_str_rdd.toDF().createOrReplaceTempView("share_name_table")# df_name_all = spark.sql("select _c0 as name from base_info_named_eid UNION select name from crawler_merged")df_name_all = spark.sql("select _c0 as name from base_info_named_eid")df_name_all.createOrReplaceTempView("name_all")# df_share_name = spark.sql("select t1.name from share_name_table t1 where t1.name in (SELECT t2.name from name_all t2) or LENGTH(t1.name) > 5")df_share_name = spark.sql("select t1.name from share_name_table t1 where t1.name in (SELECT t2.name from name_all t2) and LENGTH(t1.name)>4")print df_share_name.count()# print df_share_name.show(10000, False)def statistic_company_name():df_crawler_merged_name_err = df_crawler_merged.rdd.filter(lambda _: not _legal_check(_["name"])).map(lambda _: Row(name=_["name"]))print df_crawler_merged_name_err.distinct().toDF().show(10000, False)

python保存csv的代码:

def get_origin_content(file_path):with codecs.open(filename=file_path, mode="r", encoding="utf8") as f:try:data = json.loads(f.read())except Exception as e:data = ""return datadef label_save(save_path, content, sentence_lst):if not os.path.exists(save_path):os.makedirs(save_path)with codecs.open(save_path, "a", "utf8") as f:f.write(json.dumps({"content": content, "sentenceLst": sentence_lst}, ensure_ascii=False, indent=2))

这篇关于spark的学习,lambda,map,filter,flatmap/按照字典表中的指定键或值排序的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/517858

相关文章

Java Map排序如何按照值按照键排序

《JavaMap排序如何按照值按照键排序》该文章主要介绍Java中三种Map(HashMap、LinkedHashMap、TreeMap)的默认排序行为及实现按键排序和按值排序的方法,每种方法结合实... 目录一、先理清 3 种 Map 的默认排序行为二、按「键」排序的实现方式1. 方式 1:用 TreeM

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Python 常用数据类型详解之字符串、列表、字典操作方法

《Python常用数据类型详解之字符串、列表、字典操作方法》在Python中,字符串、列表和字典是最常用的数据类型,它们在数据处理、程序设计和算法实现中扮演着重要角色,接下来通过本文给大家介绍这三种... 目录一、字符串(String)(一)创建字符串(二)字符串操作1. 字符串连接2. 字符串重复3. 字

Python中的sort方法、sorted函数与lambda表达式及用法详解

《Python中的sort方法、sorted函数与lambda表达式及用法详解》文章对比了Python中list.sort()与sorted()函数的区别,指出sort()原地排序返回None,sor... 目录1. sort()方法1.1 sort()方法1.2 基本语法和参数A. reverse参数B.

Python一次性将指定版本所有包上传PyPI镜像解决方案

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随... 目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

深入解析C++ 中std::map内存管理

《深入解析C++中std::map内存管理》文章详解C++std::map内存管理,指出clear()仅删除元素可能不释放底层内存,建议用swap()与空map交换以彻底释放,针对指针类型需手动de... 目录1️、基本清空std::map2️、使用 swap 彻底释放内存3️、map 中存储指针类型的对象

Unity新手入门学习殿堂级知识详细讲解(图文)

《Unity新手入门学习殿堂级知识详细讲解(图文)》Unity是一款跨平台游戏引擎,支持2D/3D及VR/AR开发,核心功能模块包括图形、音频、物理等,通过可视化编辑器与脚本扩展实现开发,项目结构含A... 目录入门概述什么是 UnityUnity引擎基础认知编辑器核心操作Unity 编辑器项目模式分类工程

Python中的filter() 函数的工作原理及应用技巧

《Python中的filter()函数的工作原理及应用技巧》Python的filter()函数用于筛选序列元素,返回迭代器,适合函数式编程,相比列表推导式,内存更优,尤其适用于大数据集,结合lamb... 目录前言一、基本概念基本语法二、使用方式1. 使用 lambda 函数2. 使用普通函数3. 使用 N