[尚硅谷flink学习笔记] 实战案例TopN 问题

2024-04-10 07:28

本文主要是介绍[尚硅谷flink学习笔记] 实战案例TopN 问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  • 实时统计一段时间内的出现次数最多的水位。* 例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。* 我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。

文章目录

        • 全窗口
        • 优化

全窗口
package org.example.process;...
public class TopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 实时统计一段时间内的出现次数最多的水位。* 例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。* 我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。*/SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor waterSensor, long l) {return waterSensor.getTs() * 1000;}}));/*** 最近10s=窗口长度 每5s输出=滑动步长* 思路一:使用hashmap存储数据*/sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>() {@Overridepublic void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> collector) throws Exception {HashMap<Integer, Integer> vcCounter = new HashMap<>();elements.forEach(r->{Integer vc = r.getVc();vcCounter.put(vc, vcCounter.getOrDefault(vc, 0) +1);});// 使用 list 进行排序ArrayList<Tuple2<Integer, Integer>> data = new ArrayList<>();vcCounter.forEach((k,v)->data.add(Tuple2.of(k,v)));// 降序data.sort(((o1, o2) -> o2.f1 - o1.f1));// 输出StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i < Math.min(2, data.size()); i++) {Tuple2<Integer, Integer> tuple2 = data.get(i);stringBuilder.append("TOP").append(i+1).append(",").append("vc=").append(tuple2.f0).append(",count=").append(tuple2.f1).append("\r\n");}stringBuilder.append("窗口结束时间=").append(DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd hh:mm:ss.SSS")).append("================").append("\r\n");collector.collect(stringBuilder.toString());}}).print();env.execute();}}
优化
在上一小节的实现过程中,我们没有进行按键分区,直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为1,在实际应用中是要尽量避免的,所以Flink官方也并不推荐使用AllWindowedStream进行处理。另外,我们在全窗口函数中定义了HashMap来统计vc的出现次数,计算过程是要先收集齐所有数据、然后再逐一遍历更新HashMap,这显然不够高效。

基于这样的想法,我们可以从两个方面去做优化:一是对数据进行按键分区,分别统计vc的出现次数;二是进行增量聚合,得到结果最后再做排序输出。所以,我们可以使用增量聚合函数AggregateFunction进行浏览量的统计,然后结合ProcessWindowFunction排序输出来实现Top N的需求。
具体实现可以分成两步:先对每个vc统计出现次数,然后再将统计结果收集起来,排序输出最终结果。由于最后的排序还是基于每个时间窗口的,输出的统计结果中要包含窗口信息,我们可以输出包含了vc、出现次数(count)以及窗口结束时间的Tuple3。之后先按窗口结束时间分区,然后用KeyedProcessFunction来实现。
用KeyedProcessFunction来收集数据做排序,这时面对的是窗口聚合之后的数据流,而窗口已经不存在了;我们需要确保能够收集齐所有数据,所以应该在窗口结束时间基础上再“多等一会儿”。具体实现上,可以采用一个延迟触发的事件时间定时器。基于窗口的结束时间来设定延迟,其实并不需要等太久——因为我们是靠水位线的推进来触发定时器,而水位线的含义就是“之前的数据都到齐了”。所以我们只需要设置1毫秒的延迟,就一定可以保证这一点。
而在等待过程中,之前已经到达的数据应该缓存起来,我们这里用一个自定义的HashMap来进行存储,key为窗口的标记,value为List。之后每来一条数据,就把它添加到当前的HashMap中,并注册一个触发时间为窗口结束时间加1毫秒(windowEnd + 1)的定时器。待到水位线到达这个时间,定时器触发,我们可以保证当前窗口所有vc的统计结果Tuple3都到齐了;于是从HashMap中取出进行排序输出。

package org.example.process;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.example.bean.WaterSensor;
import org.example.utils.WaterSensorMapFunction;import java.time.Duration;
import java.util.*;public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("localhost", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));// 最近10秒= 窗口长度, 每5秒输出 = 滑动步长/*** TODO 思路二: 使用 KeyedProcessFunction实现* 1、按照vc做keyby,开窗,分别count*    ==》 增量聚合,计算 count*    ==》 全窗口,对计算结果 count值封装 ,  带上 窗口结束时间的 标签*          ==》 为了让同一个窗口时间范围的计算结果到一起去** 2、对同一个窗口范围的count值进行处理: 排序、取前N个*    =》 按照 windowEnd做keyby*    =》 使用process, 来一条调用一次,需要先存,分开存,用HashMap,key=windowEnd,value=List*      =》 使用定时器,对 存起来的结果 进行 排序、取前N个*/// 1. 按照 vc 分组、开窗、聚合(增量计算+全量打标签)//  开窗聚合后,就是普通的流,没有了窗口信息,需要自己打上窗口的标记 windowEndSingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS.keyBy(WaterSensor::getVc).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new VcCountAgg(),new WindowResult());// 2. 按照窗口标签(窗口结束时间)keyby,保证同一个窗口时间范围的结果,到一起去。排序、取TopNwindowAgg.keyBy(r -> r.f2).process(new TopN(2)).print();env.execute();}public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator + 1;}@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return null;}}/*** 泛型如下:* 第一个:输入类型 = 增量函数的输出  count值,Integer* 第二个:输出类型 = Tuple3(vc,count,windowEnd) ,带上 窗口结束时间 的标签* 第三个:key类型 , vc,Integer* 第四个:窗口类型*/public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {@Overridepublic void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {// 迭代器里面只有一条数据,next一次即可Integer count = elements.iterator().next();long windowEnd = context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}}public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {// 存不同窗口的 统计结果,key=windowEnd,value=list数据private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;// 要取的Top数量private int threshold;public TopN(int threshold) {this.threshold = threshold;dataListMap = new HashMap<>();}@Overridepublic void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,只是一条数据,要排序,得到齐才行 ===》 存起来,不同窗口分开存// 1. 存到HashMap中Long windowEnd = value.f2;if (dataListMap.containsKey(windowEnd)) {// 1.1 包含vc,不是该vc的第一条,直接添加到List中List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);dataList.add(value);} else {// 1.1 不包含vc,是该vc的第一条,需要初始化listList<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();dataList.add(value);dataListMap.put(windowEnd, dataList);}// 2. 注册一个定时器, windowEnd+1ms即可(// 同一个窗口范围,应该同时输出,只不过是一条一条调用processElement方法,只需要延迟1ms即可ctx.timerService().registerEventTimeTimer(windowEnd + 1);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);// 定时器触发,同一个窗口范围的计算结果攒齐了,开始 排序、取TopNLong windowEnd = ctx.getCurrentKey();// 1. 排序List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);dataList.sort(new Comparator<Tuple3<Integer, Integer, Long>>() {@Overridepublic int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) {// 降序, 后 减 前return o2.f1 - o1.f1;}});// 2. 取TopNStringBuilder outStr = new StringBuilder();outStr.append("================================\n");// 遍历 排序后的 List,取出前 threshold 个, 考虑可能List不够2个的情况  ==》 List中元素的个数 和 2 取最小值for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);outStr.append("Top" + (i + 1) + "\n");outStr.append("vc=" + vcCount.f0 + "\n");outStr.append("count=" + vcCount.f1 + "\n");outStr.append("窗口结束时间=" + vcCount.f2 + "\n");outStr.append("================================\n");}// 用完的List,及时清理,节省资源dataList.clear();out.collect(outStr.toString());}}
}

这篇关于[尚硅谷flink学习笔记] 实战案例TopN 问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/890419

相关文章

精选20个好玩又实用的的Python实战项目(有图文代码)

《精选20个好玩又实用的的Python实战项目(有图文代码)》文章介绍了20个实用Python项目,涵盖游戏开发、工具应用、图像处理、机器学习等,使用Tkinter、PIL、OpenCV、Kivy等库... 目录① 猜字游戏② 闹钟③ 骰子模拟器④ 二维码⑤ 语言检测⑥ 加密和解密⑦ URL缩短⑧ 音乐播放

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)

《java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)》:本文主要介绍java中pdf模版填充表单踩坑的相关资料,OpenPDF、iText、PDFBox是三... 目录准备Pdf模版方法1:itextpdf7填充表单(1)加入依赖(2)代码(3)遇到的问题方法2:pd

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

kkFileView启动报错:报错2003端口占用的问题及解决

《kkFileView启动报错:报错2003端口占用的问题及解决》kkFileView启动报错因office组件2003端口未关闭,解决:查杀占用端口的进程,终止Java进程,使用shutdown.s... 目录原因解决总结kkFileViewjavascript启动报错启动office组件失败,请检查of

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例

《PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例》词嵌入解决NLP维度灾难,捕捉语义关系,PyTorch的nn.Embedding模块提供灵活实现,支持参数配置、预训练及变长... 目录一、词嵌入(Word Embedding)简介为什么需要词嵌入?二、PyTorch中的nn.Em

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束