104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()

本文主要是介绍104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct(),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

代码

ConcatLongStringUDF.java

GroupConcatDistinctUDAF.java

AreaTop3ProductSpark.java


本篇文章记录各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()。

代码

spark.product

ConcatLongStringUDF.java

package graduation.java.spark.product;import org.apache.spark.sql.api.java.UDF3;/*** FileName: ConcatLongStringUDF* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-4-1 下午8:27* Description:* 将两个字段拼接起来(使用指定的分隔符)*/
public class ConcatLongStringUDF implements UDF3<Long,String,String,String> {private static final  long serialVersionUID = 1L;@Overridepublic String call(Long v1, String v2, String split) throws Exception {return String.valueOf(v1) + split+v2;}
}

GroupConcatDistinctUDAF.java

package graduation.java.spark.product;import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;import java.util.Arrays;/*** FileName: GroupConcatDistinctUDAF* Author:   hadoop* Email:    3165845957@qq.com* Date:     19-4-1 下午8:39* Description:* 内部拼接去重函数(group_concat_distinct)*/
public class GroupConcatDistinctUDAF  extends UserDefinedAggregateFunction {//指定输入数据字段和类型private StructType inputScheam = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("cityInfo",DataTypes.StringType,true)));//指定缓冲数据的字段和类型private StructType bufferSchema = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bufferCityInfo",DataTypes.StringType,true)));//指定返回类型private DataType dataType = DataTypes.StringType;//指定是否是确定性private boolean deterministic = true;@Overridepublic StructType inputSchema() {return inputScheam;}@Overridepublic StructType bufferSchema() {return bufferSchema;}@Overridepublic DataType dataType() {return dataType;}@Overridepublic boolean deterministic() {return deterministic;}/*** 初始化,可以认为是在内部指定一个初始值* @param buffer*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0,"");}/*** 更新* 可以认为 一个一个的将组内的字段传递进来进行逻辑拼接* @param buffer* @param input*/@Overridepublic void update(MutableAggregationBuffer buffer, Row input) {// 缓冲中的是已经拼接的城市信息String bufferCityInfo = buffer.getString(0);//传递进来的城市信息String cityInfo = input.getString(0);//实现逻辑拼接//判断:之前没有拼接过的某个城市信息,那么这里才可以把它拼接进去if (!bufferCityInfo.contains(cityInfo)){if ("".equals(bufferCityInfo)){bufferCityInfo += cityInfo;}else {bufferCityInfo += "," + cityInfo;}buffer.update(0,bufferCityInfo);}}/*** 合并* update操作,可能是针对一个分组内的部分数据,在某个节点上发生的* 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {String bufferCityInfo1 = buffer1.getString(0);String bufferCityInfo2 = buffer2.getString(0);for (String cityInfo : bufferCityInfo2.split(",")){if (! bufferCityInfo1.contains(cityInfo)){if ("".equals(bufferCityInfo1)){bufferCityInfo1 += cityInfo;}else {bufferCityInfo1 += ","+ cityInfo;}}}buffer1.update(0,bufferCityInfo1);}@Overridepublic Object evaluate(Row row) {return row.getString(0);}
}

AreaTop3ProductSpark.java

        //3.注册字符串拼接函数sqlContext.udf().register("concat_long_string",new ConcatLongStringUDF(),DataTypes.StringType);sqlContext.udf().register("group_concat_distinct",new GroupConcatDistinctUDAF());

 

这篇关于104.Spark大型电商项目-各区域热门商品统计-开发自定义UDAF聚合函数之group_concat_distinct()的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/624367

相关文章

Spring Boot项目打包和运行的操作方法

《SpringBoot项目打包和运行的操作方法》SpringBoot应用内嵌了Web服务器,所以基于SpringBoot开发的web应用也可以独立运行,无须部署到其他Web服务器中,下面以打包dem... 目录一、打包为JAR包并运行1.打包为可执行的 JAR 包2.运行 JAR 包二、打包为WAR包并运行

Nginx部署React项目时重定向循环问题的解决方案

《Nginx部署React项目时重定向循环问题的解决方案》Nginx在处理React项目请求时出现重定向循环,通常是由于`try_files`配置错误或`root`路径配置不当导致的,本文给大家详细介... 目录问题原因1. try_files 配置错误2. root 路径错误解决方法1. 检查 try_f

一文教你如何解决Python开发总是import出错的问题

《一文教你如何解决Python开发总是import出错的问题》经常朋友碰到Python开发的过程中import包报错的问题,所以本文将和大家介绍一下可编辑安装(EditableInstall)模式,可... 目录摘要1. 可编辑安装(Editable Install)模式到底在解决什么问题?2. 原理3.

Kotlin运算符重载函数及作用场景

《Kotlin运算符重载函数及作用场景》在Kotlin里,运算符重载函数允许为自定义类型重新定义现有的运算符(如+-…)行为,从而让自定义类型能像内置类型那样使用运算符,本文给大家介绍Kotlin运算... 目录基本语法作用场景类对象数据类型接口注意事项在 Kotlin 里,运算符重载函数允许为自定义类型重

Python+PyQt5开发一个Windows电脑启动项管理神器

《Python+PyQt5开发一个Windows电脑启动项管理神器》:本文主要介绍如何使用PyQt5开发一款颜值与功能并存的Windows启动项管理工具,不仅能查看/删除现有启动项,还能智能添加新... 目录开篇:为什么我们需要启动项管理工具功能全景图核心技术解析1. Windows注册表操作2. 启动文件

解决Maven项目报错:failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.13.0的问题

《解决Maven项目报错:failedtoexecutegoalorg.apache.maven.plugins:maven-compiler-plugin:3.13.0的问题》这篇文章主要介... 目录Maven项目报错:failed to execute goal org.apache.maven.pl

使用Python开发Markdown兼容公式格式转换工具

《使用Python开发Markdown兼容公式格式转换工具》在技术写作中我们经常遇到公式格式问题,例如MathML无法显示,LaTeX格式错乱等,所以本文我们将使用Python开发Markdown兼容... 目录一、工具背景二、环境配置(Windows 10/11)1. 创建conda环境2. 获取XSLT

Android开发环境配置避坑指南

《Android开发环境配置避坑指南》本文主要介绍了Android开发环境配置过程中遇到的问题及解决方案,包括VPN注意事项、工具版本统一、Gerrit邮箱配置、Git拉取和提交代码、MergevsR... 目录网络环境:VPN 注意事项工具版本统一:android Studio & JDKGerrit的邮

Python开发文字版随机事件游戏的项目实例

《Python开发文字版随机事件游戏的项目实例》随机事件游戏是一种通过生成不可预测的事件来增强游戏体验的类型,在这篇博文中,我们将使用Python开发一款文字版随机事件游戏,通过这个项目,读者不仅能够... 目录项目概述2.1 游戏概念2.2 游戏特色2.3 目标玩家群体技术选择与环境准备3.1 开发环境3

Pandas中统计汇总可视化函数plot()的使用

《Pandas中统计汇总可视化函数plot()的使用》Pandas提供了许多强大的数据处理和分析功能,其中plot()函数就是其可视化功能的一个重要组成部分,本文主要介绍了Pandas中统计汇总可视化... 目录一、plot()函数简介二、plot()函数的基本用法三、plot()函数的参数详解四、使用pl