【Java】SparkRDD算子案例:统计出每一个省份广告被点击次数的TOP3

2023-11-11 04:20

本文主要是介绍【Java】SparkRDD算子案例:统计出每一个省份广告被点击次数的TOP3,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

题目描述

统计出每一个省份广告被点击次数的TOP3
假设这些信息都存储在一个文件里,并且该文件的格式如下,时间戳,省份,城市,用户,广告,中间字段使用空格分割。

构造样例数据

1684484483 省份1 北京 1001 鞋子
1684484483 省份1 上海 1002 衣服
1684484483 省份3 广州 1003 电脑
1684484483 省份4 深圳 1004 手机
1684484483 省份4 成都 1005 眼镜
1684484483 省份6 天津 1001 鞋子
1684484483 省份8 重庆 1002 衣服
1684484483 省份8 杭州 1003 电脑
1684484483 省份8 南京 1004 手机
1684484483 省份10 厦门 1005 眼镜
1684484483 省份1 北京 1001 鞋子
1684484483 省份1 上海 1002 衣服
1684484483 省份3 广州 1003 电脑
1684484483 省份4 深圳 1004 手机
1684484483 省份4 成都 1005 眼镜
1684484483 省份6 天津 1001 鞋子
1684484483 省份8 重庆 1002 衣服
1684484483 省份8 杭州 1003 电脑
1684484483 省份8 南京 1004 手机
1684484483 省份10 厦门 1005 眼镜
1684484483 省份1 北京 1001 鞋子
1684484483 省份1 上海 1002 衣服
1684484483 省份3 广州 1003 电脑
1684484483 省份4 深圳 1004 手机
1684484483 省份4 成都 1005 眼镜
1684484483 省份6 天津 1001 鞋子
1684484483 省份8 重庆 1002 衣服
1684484483 省份8 杭州 1003 电脑
1684484483 省份8 南京 1004 手机
1684484483 省份10 厦门 1005 眼镜

Java Spark代码实现

package T051801;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;public class AdClickTop3 {public static void main(String[] args) {// 创建SparkConf和JavaSparkContextSparkConf conf = new SparkConf();// 设置应用名称conf.setAppName("AdClickTop3");// 设置运行模式// local:表示在本地单机上以单线程模式运行// local[*]:表示在本地单机上以多线程模式运行,线程数由系统自动决定// spark://HOST:PORT:表示连接到指定的 Spark 集群运行// mesos://HOST:PORT:表示连接到指定的 Mesos 集群运行// yarn:表示在 YARN 集群上运行conf.setMaster("local[*]");// 创建 JavaSpark 上下文对象JavaSparkContext sc = new JavaSparkContext(conf);// 读取数据文件JavaRDD<String> fileRDD = sc.textFile("ad.txt");// 按照空格分割数据取得省份和广告 ((省份, 广告), 1)JavaPairRDD<Tuple2<String, String>, Integer> pairRDD = fileRDD.mapToPair(s -> new Tuple2<>(new Tuple2<>(s.split(" ")[1], s.split(" ")[4]), 1));// 计算点击数 ((省份, 广告), 点击数和)JavaPairRDD<Tuple2<String, String>, Integer> reduceRDD = pairRDD.reduceByKey(Integer::sum);// 转换 key 的结构 ((省份, 广告), 点击数和) => (省份, (广告, 点击数和))JavaPairRDD<String, Tuple2<String, Integer>> provinceAdClicksRDD = reduceRDD.mapToPair((PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Tuple2<String, Integer>>) tuple -> {String province = tuple._1()._1();String ad = tuple._1()._2();int clicks = tuple._2();return new Tuple2<>(province, new Tuple2<>(ad, clicks));});// 按照省份进行分组,将同一省份的元素放到同一个 Iterable 中JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> provinceAdClicksListRDD = provinceAdClicksRDD.groupByKey();// 获取每个城市 Top3 点击广告JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> topTwoAdsByProvinceRDD = provinceAdClicksListRDD.mapValues((Function<Iterable<Tuple2<String, Integer>>, Iterable<Tuple2<String, Integer>>>) tuple2s -> {ArrayList<Tuple2<String, Integer>> tuple2s1 = new ArrayList<>();for (Tuple2<String, Integer> next : tuple2s) {tuple2s1.add(next);}// 降序排列tuple2s1.sort((o1, o2) -> o2._2() - o1._2());ArrayList<Tuple2<String, Integer>> t = new ArrayList<>();Iterator<Tuple2<String, Integer>> iterator1 = tuple2s1.iterator();// 遍历前 Top3 点击广告添加到 Listint i = 0;while (iterator1.hasNext() & i < 3) {t.add(iterator1.next());i++;}return t;});// 计算结果收集到 ListList<Tuple2<String, Iterable<Tuple2<String, Integer>>>> data = topTwoAdsByProvinceRDD.collect();// 输出结果data.forEach(System.out::println);sc.stop();}}

结果验证

在这里插入图片描述

这篇关于【Java】SparkRDD算子案例:统计出每一个省份广告被点击次数的TOP3的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/387525

相关文章

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

破茧 JDBC:MyBatis 在 Spring Boot 中的轻量实践指南

《破茧JDBC:MyBatis在SpringBoot中的轻量实践指南》MyBatis是持久层框架,简化JDBC开发,通过接口+XML/注解实现数据访问,动态代理生成实现类,支持增删改查及参数... 目录一、什么是 MyBATis二、 MyBatis 入门2.1、创建项目2.2、配置数据库连接字符串2.3、入

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

深度解析Spring Security 中的 SecurityFilterChain核心功能

《深度解析SpringSecurity中的SecurityFilterChain核心功能》SecurityFilterChain通过组件化配置、类型安全路径匹配、多链协同三大特性,重构了Spri... 目录Spring Security 中的SecurityFilterChain深度解析一、Security

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Apache Ignite 与 Spring Boot 集成详细指南

《ApacheIgnite与SpringBoot集成详细指南》ApacheIgnite官方指南详解如何通过SpringBootStarter扩展实现自动配置,支持厚/轻客户端模式,简化Ign... 目录 一、背景:为什么需要这个集成? 二、两种集成方式(对应两种客户端模型) 三、方式一:自动配置 Thick

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

Spring WebClient从入门到精通

《SpringWebClient从入门到精通》本文详解SpringWebClient非阻塞响应式特性及优势,涵盖核心API、实战应用与性能优化,对比RestTemplate,为微服务通信提供高效解决... 目录一、WebClient 概述1.1 为什么选择 WebClient?1.2 WebClient 与

Java.lang.InterruptedException被中止异常的原因及解决方案

《Java.lang.InterruptedException被中止异常的原因及解决方案》Java.lang.InterruptedException是线程被中断时抛出的异常,用于协作停止执行,常见于... 目录报错问题报错原因解决方法Java.lang.InterruptedException 是 Jav

深入浅出SpringBoot WebSocket构建实时应用全面指南

《深入浅出SpringBootWebSocket构建实时应用全面指南》WebSocket是一种在单个TCP连接上进行全双工通信的协议,这篇文章主要为大家详细介绍了SpringBoot如何集成WebS... 目录前言为什么需要 WebSocketWebSocket 是什么Spring Boot 如何简化 We