网站日志实时分析之Flink处理实时热门和PVUV统计

2024-09-06 20:38

本文主要是介绍网站日志实时分析之Flink处理实时热门和PVUV统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

实时热门统计

操作步骤:

  • 先从Kafka读取消费数据

  • 使用map算子对数据进行预处理

  • 过滤数据,只留住pv数据

  • 使用timewindow,每隔10秒创建一个20秒的window

  • 然后将窗口自定义预聚合,并且兹定于窗口函数,按指定输入输出case操作数据

  • 上面操作时候返回的是DataStream,那么就根据timestampEnd进行keyby

  • 使用底层API操作,对每个时间窗口内的数据进行排序,取top

package com.ongbo.hotAnalysisimport java.sql.Timestamp
import java.util.Propertiesimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBuffer/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
//定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)object HotItems {def main(args: Array[String]): Unit = {//1:创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//设置为事件事件env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//2:读取数据/*kafka源*/val properties = new Properties()properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")properties.setProperty("group.id","web-consumer-group")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset","latest")val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))
//    val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv").map(data =>{System.out.println("data:"+data)val dataArray = data.split(",")
//        if(dataArray(0).equals("ij"))UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L)//3:transform处理数据val processStream = dataStream//筛选出埋点pv数据.filter(_.behavior.equals("pv"))//先对itemID进行分组.keyBy(_.itemId)//然后设置timeWindow,size为1小时,步长为5分钟的滑动窗口.timeWindow(Time.seconds(20), Time.seconds(10))//窗口聚合,按道理说应该不用窗口聚合,但是因为达到的数据可能时间顺序会扰乱,所以聚合后要keyby.aggregate(new CountAgg(), new WindowResult()).keyBy(_.windowEnd)      //按照窗口分组.process(new TopNHotItems(10))//sink:输出数据processStream.print("processStream::")
//    dataStream.print()//执行env.execute("hot Items Job")}
}/*自定义预聚合函数*/
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{//累加器初始值override def createAccumulator(): Long = 0//每来一次就加一override def add(in: UserBehavior, acc: Long): Long = acc+1//override def getResult(acc: Long): Long = accoverride def merge(acc: Long, acc1: Long): Long = acc + acc1
}//自定义窗口函数,输出ItemViewCount
class WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =  {out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))}
}//自定义处理函数
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {private var itemState: ListState[ItemViewCount] = _override def open(parameters: Configuration): Unit = {itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))}override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {//把每条数据存入状态列表itemState.add(value)//注册一个定时器ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)}//定时器触发时,对所有的数据排序,并输出结果override def onTimer(timestamp: Long, ctx: _root_.org.apache.flink.streaming.api.functions.KeyedProcessFunction[Long, _root_.com.ongbo.hotAnalysis.ItemViewCount, _root_.scala.Predef.String]#OnTimerContext, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {//将所有state中的数据取出,放到一个list Buffer中val allItems: ListBuffer[ItemViewCount] = new ListBuffer()import scala.collection.JavaConversions._for(item <- itemState.get()){allItems += item}//按照点计量count大小排序,sortBy默认是升序,并且取前三个val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)//清空状态itemState.clear()//格式化输出排名结果val result : StringBuilder = new StringBuilderresult.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")//输出每一个商品信息for(i<- sortedItems.indices){val currentItem = sortedItems(i)result.append("No").append(i+1).append(":").append("  商品ID:").append(currentItem.itemId).append("  浏览量:").append(currentItem.count).append("\n")}result.append("============================\n")//控制输出频率Thread.sleep(1000)out.collect(result.toString())}
}
/*自定义预聚合函数计算平均数*/
class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{override def createAccumulator(): (Long, Int) = (0L,0)override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)override def getResult(acc: (Long, Int)): Double = acc._1 /acc._2override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)
}

实时PV统计

这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。
这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。
package com.ongbo.NetWorkFlow_Analysisimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time/*
*定义输入数据的样例类*/
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)object PageVies {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile(resource.getPath).map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map(data => ("pv", 1)).keyBy(_._1).timeWindow(Time.hours(1)).sum(1)dataStream.print("pv count")env.execute("PV")}
}

实时UV统计:布隆过滤器

我们统计UV需要注意,很多重复的user会占用到内存,所以我们采用布隆过滤器优化,减少Flink缓存user从而降低性能。而且将数据count保存在Redis,可以给后端使用的。
package com.ongbo.NetWorkFlow_Analysisimport com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedisobject UvWithBloom {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)//用相对路径定义数据集val resource = getClass.getResource("/UserBehavior.csv")val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv").map(data =>{val dataArray = data.split(",")UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)}).assignAscendingTimestamps(_.timestamp * 1000L).filter(_.behavior.equals("pv")).map( data => ("dummyKey",data.userId)).keyBy(_._1).timeWindow(Time.hours(1)).trigger(new MyTrigger()).process(new UvCountWithBloom())dataStream.print()env.execute()}
}//自定义窗口触发器
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {//每来一条数据就直接触发窗口操作,并清空所有状态TriggerResult.FIRE_AND_PURGE}override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] {// 定义Redis连接lazy val jedis = new Jedis("114.116.219.97",5000)//29位,也就是64Mlazy val bloom = new Bloom(1 << 29)override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {//位图的存储方式 , key是windowwen,value是位图val storeKey = context.window.getEnd.toStringvar count = 0L//把每个窗口的count值,也存入Redis表里,存放内容位(windowEnd,uccount),所以要先从Redis中读取if(jedis.hget("count",storeKey) != null){
//      System.out.println(v)count = jedis.hget("count",storeKey).toLong}//用布隆过滤器判断当前用户是否已经存在val userId = elements.last._2.toStringval offset = bloom.hash(userId, 61)//定义一个标志位,判断Redis位图中有没有这一位val isExist = jedis.getbit(storeKey, offset)if(!isExist){//如果不存在位图对应位置变成1,count+1jedis.setbit(storeKey,offset,true)jedis.hset("count",storeKey,(count+1).toString)out.collect(UvCount(storeKey.toLong,count+1))}else{out.collect(UvCount(storeKey.toLong,count))}}
}class Bloom(size: Long) extends Serializable{//位图大小private val cap = if(size>0) size else 1 << 27//定义Hash函数def hash(value: String, seed: Int) : Long = {var result:Long = 0Lfor(i <- 0 until value.length){result = result * seed + value.charAt(i)}result & (cap-1)}
}

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于网站日志实时分析之Flink处理实时热门和PVUV统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143059

相关文章

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

解读GC日志中的各项指标用法

《解读GC日志中的各项指标用法》:本文主要介绍GC日志中的各项指标用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、基础 GC 日志格式(以 G1 为例)1. Minor GC 日志2. Full GC 日志二、关键指标解析1. GC 类型与触发原因2. 堆

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

在Linux终端中统计非二进制文件行数的实现方法

《在Linux终端中统计非二进制文件行数的实现方法》在Linux系统中,有时需要统计非二进制文件(如CSV、TXT文件)的行数,而不希望手动打开文件进行查看,例如,在处理大型日志文件、数据文件时,了解... 目录在linux终端中统计非二进制文件的行数技术背景实现步骤1. 使用wc命令2. 使用grep命令

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,

Java Stream的distinct去重原理分析

《JavaStream的distinct去重原理分析》Javastream中的distinct方法用于去除流中的重复元素,它返回一个包含过滤后唯一元素的新流,该方法会根据元素的hashcode和eq... 目录一、distinct 的基础用法与核心特性二、distinct 的底层实现原理1. 顺序流中的去重

关于MyISAM和InnoDB对比分析

《关于MyISAM和InnoDB对比分析》:本文主要介绍关于MyISAM和InnoDB对比分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录开篇:从交通规则看存储引擎选择理解存储引擎的基本概念技术原理对比1. 事务支持:ACID的守护者2. 锁机制:并发控制的艺

MySQL 打开binlog日志的方法及注意事项

《MySQL打开binlog日志的方法及注意事项》本文给大家介绍MySQL打开binlog日志的方法及注意事项,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、默认状态二、如何检查 binlog 状态三、如何开启 binlog3.1 临时开启(重启后失效)

使用Python和OpenCV库实现实时颜色识别系统

《使用Python和OpenCV库实现实时颜色识别系统》:本文主要介绍使用Python和OpenCV库实现的实时颜色识别系统,这个系统能够通过摄像头捕捉视频流,并在视频中指定区域内识别主要颜色(红... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间详解