Flink通过滚动窗口达到滑动窗口目的 节省内存和CPU资源(背压)

2023-11-02 08:59

本文主要是介绍Flink通过滚动窗口达到滑动窗口目的 节省内存和CPU资源(背压),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink在实时处理滑动窗口数据时, 由于窗口时间长, 滑动较为频繁, 导致算子计算压力过大, 下游算子计算速度抵不上上游数据产生速度, 会出现背压现象.

需求: 统计6小时用户设备共同用户数, 每10min统计一次

公共类

@Data
@AllArgsConstructor
// flatMap转换对象
private static class UserDevice {private final String userId;private final String deviceId;
}@Data
// 用户设备统计结果
// 第一个map存放用户最新设备, 直接put覆盖, 取最新设备
// 第二个map存放设备对应用户, 因为要去重, 所以使用set存放
private static class UserDeviceSummary {private final Map<String, String> userDevices = new HashMap<>(60000); // (uid, did)private final Map<String, Set<String>> deviceUsers = new HashMap<>(60000); // (did, Set<uid>)
}

原算子 滑动窗口

dataStreamSource.flatMap((FlatMapFunction<JSONArray, UserDevice>) (array, collector) -> {try {array.forEach(e -> {JSONObject one = (JSONObject) e;// 只处理opay_show事件  app_name in ('opay', '1')if (one.containsKey("uid") && one.containsKey("did")) {collector.collect(new UserDevice(one.getString("uid"), one.getString("did")));}});} catch (Exception ignored) {}}).returns(TypeInformation.of(new TypeHint<UserDevice>() {})).name("Stream flat map").timeWindowAll(Time.hours(6), Time.minutes(10)) // 滑动窗口.allowedLateness(Time.minutes(1)).process(new ProcessAllWindowFunction<UserDevice, UserDeviceSummary, TimeWindow>() {@Overridepublic void process(ProcessAllWindowFunction<UserDevice, UserDeviceSummary, TimeWindow>.Context context, Iterable<UserDevice> elements, Collector<UserDeviceSummary> out) throws Exception {UserDeviceSummary uds = new UserDeviceSummary();for (UserDevice ud : elements) {try {// 不用线程安全集合, 提升效率 由于并行度为1, 应该不会有并发uds.getUserDevices().put(ud.getUserId(), ud.getDeviceId());if (!uds.getDeviceUsers().containsKey(ud.getDeviceId())) {uds.getDeviceUsers().put(ud.getDeviceId(), new HashSet<>());}uds.getDeviceUsers().get(ud.getDeviceId()).add(ud.getUserId());} catch (Exception ignore) {}}out.collect(uds);}}).name("Process to Map").process(new ProcessFunction<UserDeviceSummary, Map<String, Integer>>() {@Overridepublic void processElement(UserDeviceSummary uds, ProcessFunction<UserDeviceSummary, Map<String, Integer>>.Context ctx, Collector<Map<String, Integer>> out) throws Exception {Map<String, Integer> result = new HashMap<>();for (String uid : uds.getUserDevices().keySet()) {try {int count = uds.getDeviceUsers().get(uds.getUserDevices().get(uid)).size();result.put(uid, count);} catch (Exception e) {System.out.println("Process for sink error: " + e.getMessage());}}out.collect(result);// 清空数据 协助gcuds.getUserDevices().clear();uds.getDeviceUsers().clear();result.clear();}}).name("User device calc").print();

开始运行正常, 随着时间的推移, 数据堆积越来越大, 滑动过程中, 最大会有6h / 10min = 36次并行计算, cpu压力比较大, 并行度只能为1
在这里插入图片描述

优化

使用滚动窗口替换滑动窗口, 既节省了内存, 也减少了cpu计算. 每10min滚动一次, 外部使用queue存储, 最大保存36个元素


private static final int SUMMARY_LIST_CAPACITY = 36;
// merge list中36个元素 生成一个新的元素, 输出到下游
private static UserDeviceSummary merge(List<UserDeviceSummary> list) {UserDeviceSummary result = list.get(0);// 此处最好应该添加summary时间, 避免长时间没数据流入导致数据错误int length = Math.min(list.size(), SUMMARY_LIST_CAPACITY);System.out.println("Merge tumbling summary: " + length);for (int i = 1; i < length; i++) {UserDeviceSummary current = list.get(i);result.getUserDevices().putAll(current.getUserDevices());current.getDeviceUsers().forEach((key, value) -> result.getDeviceUsers().merge(key, value, (s1, s2) -> {s1.addAll(s2);return s1;}));}return result;
}
List<UserDeviceSummary> list = new LinkedList<>();dataStreamSource.flatMap((FlatMapFunction<JSONArray, UserDevice>) (array, collector) -> {try {array.forEach(e -> {JSONObject one = (JSONObject) e;// 只处理opay_show事件  app_name in ('opay', '1')if (one.containsKey("uid") && one.containsKey("did")) {collector.collect(new UserDevice(one.getString("uid"), one.getString("did")));}});} catch (Exception ignored) {}}).returns(TypeInformation.of(new TypeHint<UserDevice>() {})).name("Stream flat map").timeWindowAll(Time.minutes(10)) // 使用滚动窗口代替滑动窗口, 节省资源.process(new ProcessAllWindowFunction<UserDevice, UserDeviceSummary, TimeWindow>() {@Overridepublic void process(ProcessAllWindowFunction<UserDevice, UserDeviceSummary, TimeWindow>.Context context, Iterable<UserDevice> elements, Collector<UserDeviceSummary> out) throws Exception {UserDeviceSummary uds = new UserDeviceSummary();for (UserDevice ud : elements) {try {// 不用线程安全集合, 提升效率uds.getUserDevices().put(ud.getUserId(), ud.getDeviceId());if (!uds.getDeviceUsers().containsKey(ud.getDeviceId())) {uds.getDeviceUsers().put(ud.getDeviceId(), new HashSet<>());}uds.getDeviceUsers().get(ud.getDeviceId()).add(ud.getUserId());} catch (Exception ignore) {}}list.add(uds);if (list.size() > SUMMARY_LIST_CAPACITY) {list.remove(0);}out.collect(merge(list));}}).name("Process to Map").process(new ProcessFunction<UserDeviceSummary, Map<String, Integer>>() {@Overridepublic void processElement(UserDeviceSummary uds, ProcessFunction<UserDeviceSummary, Map<String, Integer>>.Context ctx, Collector<Map<String, Integer>> out) throws Exception {Map<String, Integer> result = new HashMap<>();for (String uid : uds.getUserDevices().keySet()) {try {int count = uds.getDeviceUsers().get(uds.getUserDevices().get(uid)).size();result.put(uid, count);} catch (Exception e) {System.out.println("Process for sink error: " + e.getMessage());}}out.collect(result);uds.getUserDevices().clear();uds.getDeviceUsers().clear();result.clear();}}).name("User device calc").print();

再次部署, 服务运行正常!

这篇关于Flink通过滚动窗口达到滑动窗口目的 节省内存和CPU资源(背压)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/329526

相关文章

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

最新Spring Security的基于内存用户认证方式

《最新SpringSecurity的基于内存用户认证方式》本文讲解SpringSecurity内存认证配置,适用于开发、测试等场景,通过代码创建用户及权限管理,支持密码加密,虽简单但不持久化,生产环... 目录1. 前言2. 因何选择内存认证?3. 基础配置实战❶ 创建Spring Security配置文件

java内存泄漏排查过程及解决

《java内存泄漏排查过程及解决》公司某服务内存持续增长,疑似内存泄漏,未触发OOM,排查方法包括检查JVM配置、分析GC执行状态、导出堆内存快照并用IDEAProfiler工具定位大对象及代码... 目录内存泄漏内存问题排查1.查看JVM内存配置2.分析gc是否正常执行3.导出 dump 各种工具分析4.

Linux进程CPU绑定优化与实践过程

《Linux进程CPU绑定优化与实践过程》Linux支持进程绑定至特定CPU核心,通过sched_setaffinity系统调用和taskset工具实现,优化缓存效率与上下文切换,提升多核计算性能,适... 目录1. 多核处理器及并行计算概念1.1 多核处理器架构概述1.2 并行计算的含义及重要性1.3 并

PostgreSQL中rank()窗口函数实用指南与示例

《PostgreSQL中rank()窗口函数实用指南与示例》在数据分析和数据库管理中,经常需要对数据进行排名操作,PostgreSQL提供了强大的窗口函数rank(),可以方便地对结果集中的行进行排名... 目录一、rank()函数简介二、基础示例:部门内员工薪资排名示例数据排名查询三、高级应用示例1. 每

Linux下进程的CPU配置与线程绑定过程

《Linux下进程的CPU配置与线程绑定过程》本文介绍Linux系统中基于进程和线程的CPU配置方法,通过taskset命令和pthread库调整亲和力,将进程/线程绑定到特定CPU核心以优化资源分配... 目录1 基于进程的CPU配置1.1 对CPU亲和力的配置1.2 绑定进程到指定CPU核上运行2 基于

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空

C++中RAII资源获取即初始化

《C++中RAII资源获取即初始化》RAII通过构造/析构自动管理资源生命周期,确保安全释放,本文就来介绍一下C++中的RAII技术及其应用,具有一定的参考价值,感兴趣的可以了解一下... 目录一、核心原理与机制二、标准库中的RAII实现三、自定义RAII类设计原则四、常见应用场景1. 内存管理2. 文件操

C++高效内存池实现减少动态分配开销的解决方案

《C++高效内存池实现减少动态分配开销的解决方案》C++动态内存分配存在系统调用开销、碎片化和锁竞争等性能问题,内存池通过预分配、分块管理和缓存复用解决这些问题,下面就来了解一下... 目录一、C++内存分配的性能挑战二、内存池技术的核心原理三、主流内存池实现:TCMalloc与Jemalloc1. TCM