网站用户行为分析项目之会话切割(四)=> 代码重构

2023-11-22 02:38

本文主要是介绍网站用户行为分析项目之会话切割(四)=> 代码重构,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 0x00 文章内容
  • 0x01 实现输出代码的重构
          • 1. 抽离输出代码
          • 2. 重构输出路径
          • 3. 重构输出文件类型
  • 0x02 校验结果
          • 1. 生成不同的输出文件类型
  • 0xFF 总结

0x00 文章内容

  1. 实现输出代码的重构
  2. 校验结果

0x01 实现输出代码的重构

1. 抽离输出代码

a. 因为SessionCutETL里的main方法写了比较多的代码,此时我们可以将第6步骤的输出代码进行抽离,全选,选中Refactor=>Extract=>Method
在这里插入图片描述
b. 我们这里选择第一个在这里插入图片描述
c. 填写方法名,点击OK
在这里插入图片描述
即可发现已经抽离出了方法。

2. 重构输出路径

a. 下面的代码中路径都是写死的,而且出现了共同的路径,我们可以进行统一

    val trackerLogOutputPath = "data/output/trackerLog"val trackerSessionOutputPath = "data/output/trackerSession"

修改如下:

    writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)private def writeOutputData(sc: SparkContext, baseOutputPath: String, parsedLogRDD: RDD[TrackerLog], cookieLabeledSessionRDD: RDD[TrackerSession]) = {val trackerLogOutputPath = s"${baseOutputPath}/trackerLog"val trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"

我们暂且这样重构先,此时需要重新执行一下代码,看一下改后是否能执行。

d. 执行验证,执行会报错,表示路径的文件已经存在,此时可以手动删除再执行,这里下一步是在代码中实现删除。

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory data/output/trackerLog already exists

e. 添加判断路径代码

    val fileSystem = FileSystem.get(sc.hadoopConfiguration)val path = new Path(trackerLogOutputPath)if (fileSystem.exists(path)) {fileSystem.delete(path, true)}

写完之后再抽离出去,取名deletePathIfExists方法。

f. 此时再重新执行代码,发现代码没有报错,也能得到想要的结果。

3. 重构输出文件类型

目前的情况是以Parquet的形式保存着,如果此时如果需求发生变化了,或者有其他格式的需求,如保存成TextFile格式。就要重新改代码了,如果需求又变了,或者写成其他组件,又要重新改,重新然后打包,重新然后上传,这样非常麻烦。试想,如果此时将需要修改的代码抽象成一个接口,就会大大方便了。

a. 本优化内容比较多,涉及两个类,代码量及过程比较多:
在这里插入图片描述
b. OutputComponent完整代码如下:

package com.shaonaiyi.sessionimport com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.{AvroParquetOutputFormat, AvroWriteSupport}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD/*** @Auther: shaonaiyi@163.com* @Date: 2019/12/30 21:09* @Description: 抽象输出文件组件*/
trait OutputComponent {def writeOutputData(sc: SparkContext, baseOutputPath: String,parsedLogRDD: RDD[TrackerLog],cookieLabeledSessionRDD: RDD[TrackerSession]) = {deletePathIfExists(sc, baseOutputPath)}private def deletePathIfExists(sc: SparkContext, trackerLogOutputPath: String) = {val fileSystem = FileSystem.get(sc.hadoopConfiguration)val path = new Path(trackerLogOutputPath)if (fileSystem.exists(path)) {fileSystem.delete(path, true)}}}object OutputComponent {def fromOutputFileType(fileType: String) = {if (fileType.equals("parquet")) {new ParquetFileOutput} else {new TextFileOutput}}
}/*** 写Parquet格式文件*/
class ParquetFileOutput extends OutputComponent {override def writeOutputData(sc: SparkContext, baseOutputPath: String,parsedLogRDD: RDD[TrackerLog],cookieLabeledSessionRDD: RDD[TrackerSession]): Unit = {super.writeOutputData(sc, baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)//6、保存数据//6.1、保存TrackerLog,对应的是parsedLogRDDval trackerLogOutputPath = s"${baseOutputPath}/trackerLog"AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerLog.SCHEMA$)parsedLogRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerLogOutputPath,classOf[Void], classOf[TrackerLog], classOf[AvroParquetOutputFormat[TrackerLog]])//6.2、保存TrackerSession,对应的是cookieLabeledSessionRDDval trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"AvroWriteSupport.setSchema(sc.hadoopConfiguration, TrackerSession.SCHEMA$)cookieLabeledSessionRDD.map((null, _)).saveAsNewAPIHadoopFile(trackerSessionOutputPath,classOf[Void], classOf[TrackerSession], classOf[AvroParquetOutputFormat[TrackerSession]])}}/*** 写TextFile格式文件*/
class TextFileOutput extends OutputComponent {override def writeOutputData(sc: SparkContext, baseOutputPath: String,parsedLogRDD: RDD[TrackerLog],cookieLabeledSessionRDD: RDD[TrackerSession]): Unit = {super.writeOutputData(sc, baseOutputPath, parsedLogRDD, cookieLabeledSessionRDD)//6、保存数据//6.1、保存TrackerLog,对应的是parsedLogRDDval trackerLogOutputPath = s"${baseOutputPath}/trackerLog"parsedLogRDD.saveAsTextFile(trackerLogOutputPath)//6.2、保存TrackerSession,对应的是cookieLabeledSessionRDDval trackerSessionOutputPath = s"${baseOutputPath}/trackerSession"cookieLabeledSessionRDD.saveAsTextFile(trackerSessionOutputPath)}}

编写思路:先写OutputComponent接口,然后写ParquetFileOutput类继承OutputComponent,最后写一个伴生类OutputComponent以方便调用,最后进行代码优化,将deletePathIfExists抽离出来。

代码讲解:

  1. OutputComponent是一个Trait类型,属于接口抽象类,目的将输出格式接口化,原本是只有一个Parquet格式的,现在再添加了一个TextFile格式,实现的功能其实就是与输出Parquet格式的代码相类似。ParquetFileOutputTextFileOutput均继承此类,所以需要实现writeOutputData方法,写好之后,需要再写一个伴生类来调用,并且判断输入的是哪种类型;除此之外,还简化了deletePathIfExists方法,统一用super进行了了调用。
  2. saveAsNewAPIHadoopFile需要的参数是key-value类型,所以需要转成key-value先。

c. SessionCutETL完整代码如下:

package com.shaonaiyi.sessionimport com.shaonaiyi.spark.session.{TrackerLog, TrackerSession}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** @Auther: shaonaiyi@163.com* @Date: 2019/9/12 10:09* @Description: 会话切割的程序主入口*/
object SessionCutETL {private val logTypeSet = Set("pageview", "click")def main(args: Array[String]): Unit = {var conf = new SparkConf()conf.setAppName("SessionCutETL")conf.setMaster("local")conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")var sc = new SparkContext(conf)//网站域名标签数据,此处只是演示,其实可以存放在数据库里val domainLabelMap = Map("www.baidu.com" -> "level1","www.taobao.com" -> "level2","jd.com" -> "level3","youku.com" -> "level4")//广播val domainLabelMapB = sc.broadcast(domainLabelMap)//    sc.setLogLevel("ERROR")// 1、加载日志源数据val rawRDD: RDD[String] = sc.textFile("data/rawdata/visit_log.txt")
//    rawRDD.collect().foreach(println)//2、解析rawRDD中每一行日志源数据
//    val parsedLogRDD: RDD[Option[TrackerLog]] = rawRDD.map( line => RawLogParserUtil.parse(line))
//    val parsedLogRDD1: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line))val parsedLogRDD: RDD[TrackerLog] = rawRDD.flatMap( line => RawLogParserUtil.parse(line)).filter(trackerLog => logTypeSet.contains(trackerLog.getLogType.toString))
//    parsedLogRDD.collect().foreach(println)
//    parsedLogRDD1.collect().foreach(println)//3、按照cookie进行分组val cookieGroupRDD: RDD[(String, Iterable[TrackerLog])] = parsedLogRDD.groupBy(trackerLog => trackerLog.getCookie.toString)
//   cookieGroupRDD.collect().foreach(println)//4、按user进行分组val sessionRDD: RDD[(String, TrackerSession)] = cookieGroupRDD.flatMapValues { case iter =>//处理每个user的日志val processor = new OneUserTrackerLogsProcessor(iter.toArray)processor.buildSessions(domainLabelMapB.value)}//5、给会话的cookie打标签val cookieLabelRDD: RDD[(String, String)] = sc.textFile("data/cookie_label.txt").map { case line =>val temp = line.split("\\|")(temp(0), temp(1)) // (cookie, cookie_label)}val joinRDD: RDD[(String,(TrackerSession, Option[String]))] = sessionRDD.leftOuterJoin(cookieLabelRDD)val cookieLabeledSessionRDD: RDD[TrackerSession] = joinRDD.map {case (cookie, (session, cookieLabelOpt)) =>if (cookieLabelOpt.nonEmpty) {session.setCookieLabel(cookieLabelOpt.get)} else {session.setCookieLabel("-")}session}//text & parquetOutputComponent.fromOutputFileType("text").writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)sc.stop()}}

代码讲解:删除原先的输出格式的代码,用OutputComponent调用来实现。

0x02 校验结果

1. 生成不同的输出文件类型

a. 执行SessionCutETL类,查看文件的输出类型,此时生成了TextFile格式文件。
在这里插入图片描述
b. 修改此行代码的"text"为parquet,重新执行,查看结果,此时生成了Parquet格式文件。

    //text & parquetOutputComponent.fromOutputFileType("parquet").writeOutputData(sc,"data/output", parsedLogRDD, cookieLabeledSessionRDD)

在这里插入图片描述

0xFF 总结

  1. 代码重构是一项非常重要的技能,增强代码的可读性。
  2. 下一篇文章还会继续优化,请留意本博客,关注、评论、加点赞。
  3. 网站用户行为分析项目系列:
    网站用户行为分析项目之会话切割(一)
    网站用户行为分析项目之会话切割(二)
    网站用户行为分析项目之会话切割(三)
    网站用户行为分析项目之会话切割(四)
    网站用户行为分析项目之会话切割(五)
    网站用户行为分析项目之会话切割(六)

作者简介:邵奈一
全栈工程师、市场洞察者、专栏编辑
| 公众号 | 微信 | 微博 | CSDN | 简书 |

福利:
邵奈一的技术博客导航
邵奈一 原创不易,如转载请标明出处。


这篇关于网站用户行为分析项目之会话切割(四)=> 代码重构的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/406873

相关文章

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

vite搭建vue3项目的搭建步骤

《vite搭建vue3项目的搭建步骤》本文主要介绍了vite搭建vue3项目的搭建步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1.确保Nodejs环境2.使用vite-cli工具3.进入项目安装依赖1.确保Nodejs环境

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

idea+spring boot创建项目的搭建全过程

《idea+springboot创建项目的搭建全过程》SpringBoot是Spring社区发布的一个开源项目,旨在帮助开发者快速并且更简单的构建项目,:本文主要介绍idea+springb... 目录一.idea四种搭建方式1.Javaidea命名规范2JavaWebTomcat的安装一.明确tomcat

pycharm跑python项目易出错的问题总结

《pycharm跑python项目易出错的问题总结》:本文主要介绍pycharm跑python项目易出错问题的相关资料,当你在PyCharm中运行Python程序时遇到报错,可以按照以下步骤进行排... 1. 一定不要在pycharm终端里面创建环境安装别人的项目子模块等,有可能出现的问题就是你不报错都安装

Java集合之Iterator迭代器实现代码解析

《Java集合之Iterator迭代器实现代码解析》迭代器Iterator是Java集合框架中的一个核心接口,位于java.util包下,它定义了一种标准的元素访问机制,为各种集合类型提供了一种统一的... 目录一、什么是Iterator二、Iterator的核心方法三、基本使用示例四、Iterator的工

Java 线程池+分布式实现代码

《Java线程池+分布式实现代码》在Java开发中,池通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,:本文主要介绍Java线程池+分布式实现代码,需要... 目录1. 线程池1.1 自定义线程池实现1.1.1 线程池核心1.1.2 代码示例1.2 总结流程2. J

Spring Boot分层架构详解之从Controller到Service再到Mapper的完整流程(用户管理系统为例)

《SpringBoot分层架构详解之从Controller到Service再到Mapper的完整流程(用户管理系统为例)》本文将以一个实际案例(用户管理系统)为例,详细解析SpringBoot中Co... 目录引言:为什么学习Spring Boot分层架构?第一部分:Spring Boot的整体架构1.1

JS纯前端实现浏览器语音播报、朗读功能的完整代码

《JS纯前端实现浏览器语音播报、朗读功能的完整代码》在现代互联网的发展中,语音技术正逐渐成为改变用户体验的重要一环,下面:本文主要介绍JS纯前端实现浏览器语音播报、朗读功能的相关资料,文中通过代码... 目录一、朗读单条文本:① 语音自选参数,按钮控制语音:② 效果图:二、朗读多条文本:① 语音有默认值:②