spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql

2024-08-22 22:18

本文主要是介绍spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章地址:http://www.haha174.top/article/details/253627
1.简介

Spark Streaming 强大的地方在于,可以于spark core 和spark sql 整合使用,之前已经通过transform foreachRDD 等算子看到了 如何将DStream 种的RDD 使用spark core 执行批处理操作。现在就来看看 如何将spark sql 和spark Streaming 整合起来操作

2.案例

每隔10秒 ,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类top3 热门的商品
下面给出

public class Top3HotProduct {public static void main(String[] args) throws InterruptedException {SparkConf conf=new SparkConf().setAppName("Top3HotProduct").setMaster("local[2]");JavaStreamingContext jssc=new JavaStreamingContext(conf, Durations.seconds(1));//首先看一下,输入日志的格式//leo  product1 category1//首先获取输入数据JavaReceiverInputDStream<String> lines=jssc.socketTextStream("www.codeguoj.cn",9999);JavaPairDStream<String,Integer> categoryProductDStream=lines.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {String[] prudoctSplited=s.split(" ");return new Tuple2<>(prudoctSplited[2]+"-"+prudoctSplited[1],1);}});//然后执行window//到这里,就可以做到,每隔10秒钟,对最近60秒的数据,执行reduceByKey  操作//计算出来这60秒内,每个种类的每个商品的点击次数JavaPairDStream<String,Integer> categoryProductDStreamed=categoryProductDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}},Durations.seconds(60),Durations.seconds(10));//然后针对60秒内的每个种类的每个商品的点击次数categoryProductDStreamed.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {@Overridepublic void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {JavaRDD<Row> rowCategoryCount=    stringIntegerJavaPairRDD.map(new Function<Tuple2<String, Integer>, Row>() {@Overridepublic Row call(Tuple2<String, Integer> v1) throws Exception {String category=v1._1.split("-")[0];String product=v1._1.split("-")[1];int count=v1._2;return RowFactory.create(category,product,count);}});//DataSet  转换List<StructField> structFields=new ArrayList<>();structFields.add(DataTypes.createStructField("category",DataTypes.StringType,true));structFields.add(DataTypes.createStructField("product",DataTypes.StringType,true));structFields.add(DataTypes.createStructField("click_count",DataTypes.IntegerType,true));StructType structType=DataTypes.createStructType(structFields);SQLContext sqlContext=new SQLContext(rowCategoryCount.context());Dataset cataCountDS=sqlContext.createDataFrame(rowCategoryCount,structType);//  将60秒内的数据创建一个零时表cataCountDS.registerTempTable("product_click_log");Dataset cataSearchDS=   sqlContext.sql("SELECT category,product,click_count "+ "FROM ("+ "SELECT "+ "category,"+ "product,"+ "click_count,"+ "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "+ "FROM product_click_log"+ ") tmp "+ "WHERE rank<=3");cataSearchDS.show();}});jssc.start();jssc.awaitTermination();jssc.stop();jssc.close();}
}

欢迎关注,更多惊喜等着你

这里写图片描述

这篇关于spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097544

相关文章

Mysql中设计数据表的过程解析

《Mysql中设计数据表的过程解析》数据库约束通过NOTNULL、UNIQUE、DEFAULT、主键和外键等规则保障数据完整性,自动校验数据,减少人工错误,提升数据一致性和业务逻辑严谨性,本文介绍My... 目录1.引言2.NOT NULL——制定某列不可以存储NULL值2.UNIQUE——保证某一列的每一

解密SQL查询语句执行的过程

《解密SQL查询语句执行的过程》文章讲解了SQL语句的执行流程,涵盖解析、优化、执行三个核心阶段,并介绍执行计划查看方法EXPLAIN,同时提出性能优化技巧如合理使用索引、避免SELECT*、JOIN... 目录1. SQL语句的基本结构2. SQL语句的执行过程3. SQL语句的执行计划4. 常见的性能优

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

MySQL 强制使用特定索引的操作

《MySQL强制使用特定索引的操作》MySQL可通过FORCEINDEX、USEINDEX等语法强制查询使用特定索引,但优化器可能不采纳,需结合EXPLAIN分析执行计划,避免性能下降,注意版本差异... 目录1. 使用FORCE INDEX语法2. 使用USE INDEX语法3. 使用IGNORE IND

SQL Server安装时候没有中文选项的解决方法

《SQLServer安装时候没有中文选项的解决方法》用户安装SQLServer时界面全英文,无中文选项,通过修改安装设置中的国家或地区为中文中国,重启安装程序后界面恢复中文,解决了问题,对SQLSe... 你是不是在安装SQL Server时候发现安装界面和别人不同,并且无论如何都没有中文选项?这个问题也

2025版mysql8.0.41 winx64 手动安装详细教程

《2025版mysql8.0.41winx64手动安装详细教程》本文指导Windows系统下MySQL安装配置,包含解压、设置环境变量、my.ini配置、初始化密码获取、服务安装与手动启动等步骤,... 目录一、下载安装包二、配置环境变量三、安装配置四、启动 mysql 服务,修改密码一、下载安装包安装地

MySQL CTE (Common Table Expressions)示例全解析

《MySQLCTE(CommonTableExpressions)示例全解析》MySQL8.0引入CTE,支持递归查询,可创建临时命名结果集,提升复杂查询的可读性与维护性,适用于层次结构数据处... 目录基本语法CTE 主要特点非递归 CTE简单 CTE 示例多 CTE 示例递归 CTE基本递归 CTE 结

MySQL多实例管理如何在一台主机上运行多个mysql

《MySQL多实例管理如何在一台主机上运行多个mysql》文章详解了在Linux主机上通过二进制方式安装MySQL多实例的步骤,涵盖端口配置、数据目录准备、初始化与启动流程,以及排错方法,适用于构建读... 目录一、什么是mysql多实例二、二进制方式安装MySQL1.获取二进制代码包2.安装基础依赖3.清

详解MySQL中JSON数据类型用法及与传统JSON字符串对比

《详解MySQL中JSON数据类型用法及与传统JSON字符串对比》MySQL从5.7版本开始引入了JSON数据类型,专门用于存储JSON格式的数据,本文将为大家简单介绍一下MySQL中JSON数据类型... 目录前言基本用法jsON数据类型 vs 传统JSON字符串1. 存储方式2. 查询方式对比3. 索引

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分