网站日志实时分析之Flink处理实时热门和PVUV统计

2024-09-06 20:38

本文主要是介绍网站日志实时分析之Flink处理实时热门和PVUV统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

实时热门统计

操作步骤:

  • 先从Kafka读取消费数据

  • 使用map算子对数据进行预处理

  • 过滤数据,只留住pv数据

  • 使用timewindow,每隔10秒创建一个20秒的window

  • 然后将窗口自定义预聚合,并且兹定于窗口函数,按指定输入输出case操作数据

  • 上面操作时候返回的是DataStream,那么就根据timestampEnd进行keyby

  • 使用底层API操作,对每个时间窗口内的数据进行排序,取top

package com.ongbo.hotAnalysisimport java.sql.Timestamp
import java.util.Propertiesimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBuffer/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
//定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)object HotItems {def main(args: Array[String]): Unit = {//1:创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//设置为事件事件env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//2:读取数据/*kafka源*/val properties = new Properties()properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")properties.setProperty("group.id","web-consumer-group")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset","latest")val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))
//    val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv").map(data =>{System.out.println("data:"+data)val dataArray = data.split(",")
//        if(dataArray(0).equals("ij"))UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L)//3:transform处理数据val processStream = dataStream//筛选出埋点pv数据.filter(_.behavior.equals("pv"))//先对itemID进行分组.keyBy(_.itemId)//然后设置timeWindow,size为1小时,步长为5分钟的滑动窗口.timeWindow(Time.seconds(20), Time.seconds(10))//窗口聚合,按道理说应该不用窗口聚合,但是因为达到的数据可能时间顺序会扰乱,所以聚合后要keyby.aggregate(new CountAgg(), new WindowResult()).keyBy(_.windowEnd)      //按照窗口分组.process(new TopNHotItems(10))//sink:输出数据processStream.print("processStream::")
//    dataStream.print()//执行env.execute("hot Items Job")}
}/*自定义预聚合函数*/
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{//累加器初始值override def createAccumulator(): Long = 0//每来一次就加一override def add(in: UserBehavior, acc: Long): Long = acc+1//override def getResult(acc: Long): Long = accoverride def merge(acc: Long, acc1: Long): Long = acc + acc1
}//自定义窗口函数,输出ItemViewCount
class WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =  {out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))}
}//自定义处理函数
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {private var itemState: ListState[ItemViewCount] = _override def open(parameters: Configuration): Unit = {itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))}override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {//把每条数据存入状态列表itemState.add(value)//注册一个定时器ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)}//定时器触发时,对所有的数据排序,并输出结果override def onTimer(timestamp: Long, ctx: _root_.org.apache.flink.streaming.api.functions.KeyedProcessFunction[Long, _root_.com.ongbo.hotAnalysis.ItemViewCount, _root_.scala.Predef.String]#OnTimerContext, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {//将所有state中的数据取出,放到一个list Buffer中val allItems: ListBuffer[ItemViewCount] = new ListBuffer()import scala.collection.JavaConversions._for(item <- itemState.get()){allItems += item}//按照点计量count大小排序,sortBy默认是升序,并且取前三个val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)//清空状态itemState.clear()//格式化输出排名结果val result : StringBuilder = new StringBuilderresult.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")//输出每一个商品信息for(i<- sortedItems.indices){val currentItem = sortedItems(i)result.append("No").append(i+1).append(":").append("  商品ID:").append(currentItem.itemId).append("  浏览量:").append(currentItem.count).append("\n")}result.append("============================\n")//控制输出频率Thread.sleep(1000)out.collect(result.toString())}
}
/*自定义预聚合函数计算平均数*/
class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{override def createAccumulator(): (Long, Int) = (0L,0)override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)override def getResult(acc: (Long, Int)): Double = acc._1 /acc._2override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)
}

实时PV统计

这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。
这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。
package com.ongbo.NetWorkFlow_Analysisimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)object PageVies {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile(resource.getPath).map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map(data => ("pv", 1)).keyBy(_._1).timeWindow(Time.hours(1)).sum(1)dataStream.print("pv count")env.execute("PV")}
}

实时UV统计:布隆过滤器

我们统计UV需要注意,很多重复的user会占用到内存,所以我们采用布隆过滤器优化,减少Flink缓存user从而降低性能。而且将数据count保存在Redis,可以给后端使用的。
package com.ongbo.NetWorkFlow_Analysisimport com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedisobject UvWithBloom {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv").map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map( data => ("dummyKey",data.userId)).keyBy(_._1).timeWindow(Time.hours(1)).trigger(new MyTrigger()).process(new UvCountWithBloom())dataStream.print()env.execute()}
}//自定义窗口触发器
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {//每来一条数据就直接触发窗口操作,并清空所有状态TriggerResult.FIRE_AND_PURGE}override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] {// 定义Redis连接lazy val jedis = new Jedis("114.116.219.97",5000)//29位,也就是64Mlazy val bloom = new Bloom(1 << 29)override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {//位图的存储方式 , key是windowwen,value是位图val storeKey = context.window.getEnd.toStringvar count = 0L//把每个窗口的count值,也存入Redis表里,存放内容位(windowEnd,uccount),所以要先从Redis中读取if(jedis.hget("count",storeKey) != null){
//      System.out.println(v)count = jedis.hget("count",storeKey).toLong}//用布隆过滤器判断当前用户是否已经存在val userId = elements.last._2.toStringval offset = bloom.hash(userId, 61)//定义一个标志位,判断Redis位图中有没有这一位val isExist = jedis.getbit(storeKey, offset)if(!isExist){//如果不存在位图对应位置变成1,count+1jedis.setbit(storeKey,offset,true)jedis.hset("count",storeKey,(count+1).toString)out.collect(UvCount(storeKey.toLong,count+1))}else{out.collect(UvCount(storeKey.toLong,count))}}
}class Bloom(size: Long) extends Serializable{//位图大小private val cap = if(size>0) size else 1 << 27//定义Hash函数def hash(value: String, seed: Int) : Long = {var result:Long = 0Lfor(i <- 0 until value.length){result = result * seed + value.charAt(i)}result & (cap-1)}
}

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于网站日志实时分析之Flink处理实时热门和PVUV统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143059

相关文章

Java对异常的认识与异常的处理小结

《Java对异常的认识与异常的处理小结》Java程序在运行时可能出现的错误或非正常情况称为异常,下面给大家介绍Java对异常的认识与异常的处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参... 目录一、认识异常与异常类型。二、异常的处理三、总结 一、认识异常与异常类型。(1)简单定义-什么是

SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志

《SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志》在SpringBoot项目中,使用logback-spring.xml配置屏蔽特定路径的日志有两种常用方式,文中的... 目录方案一:基础配置(直接关闭目标路径日志)方案二:结合 Spring Profile 按环境屏蔽关

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

Python主动抛出异常的各种用法和场景分析

《Python主动抛出异常的各种用法和场景分析》在Python中,我们不仅可以捕获和处理异常,还可以主动抛出异常,也就是以类的方式自定义错误的类型和提示信息,这在编程中非常有用,下面我将详细解释主动抛... 目录一、为什么要主动抛出异常?二、基本语法:raise关键字基本示例三、raise的多种用法1. 抛

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

Golang 日志处理和正则处理的操作方法

《Golang日志处理和正则处理的操作方法》:本文主要介绍Golang日志处理和正则处理的操作方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录1、logx日志处理1.1、logx简介1.2、日志初始化与配置1.3、常用方法1.4、配合defer

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

springboot加载不到nacos配置中心的配置问题处理

《springboot加载不到nacos配置中心的配置问题处理》:本文主要介绍springboot加载不到nacos配置中心的配置问题处理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录springboot加载不到nacos配置中心的配置两种可能Spring Boot 版本Nacos

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Mysql的主从同步/复制的原理分析

《Mysql的主从同步/复制的原理分析》:本文主要介绍Mysql的主从同步/复制的原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录为什么要主从同步?mysql主从同步架构有哪些?Mysql主从复制的原理/整体流程级联复制架构为什么好?Mysql主从复制注意