Flink MapState的ConcurrentModificationException问题

2023-10-28 14:40

本文主要是介绍Flink MapState的ConcurrentModificationException问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

直接上代码

import com.alibaba.fastjson.JSON;
import com.tc.flink.analysis.label.bean.output.ItemIdWithAction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;import java.util.HashMap;
import java.util.Map;public class CityItemIdClickedState extends RichMapFunction<Tuple2<ItemIdWithAction, Integer>, Tuple2<String, String>> {private transient MapState<ItemIdWithAction, Integer> map;public transient static final String CLICK_PREFIX_KEY = "cityClicked";public transient static final String CREATE_PREFIX_KEY = "cityCreated";@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(60 * 2)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();MapStateDescriptor<ItemIdWithAction, Integer> descriptor = new MapStateDescriptor<ItemIdWithAction, Integer>("paln_click_num", ItemIdWithAction.class, Integer.class);descriptor.enableTimeToLive(ttlConfig);map = getRuntimeContext().getMapState(descriptor);super.open(parameters);}@Overridepublic Tuple2<String, String> map(Tuple2<ItemIdWithAction, Integer> keyValue) throws Exception {Integer num = keyValue.f1;ItemIdWithAction itemIdWithAction = keyValue.f0;if (num.equals(map.get(itemIdWithAction))) {return Tuple2.of(null, null);}map.put(itemIdWithAction, num);String prefixKey = itemIdWithAction.getAction().equals("click") ? CLICK_PREFIX_KEY : CREATE_PREFIX_KEY;String key = String.format("%s@%s@%s", prefixKey, itemIdWithAction.getStartCityId(), itemIdWithAction.getEndCityId());Map<String, Integer> valueMap = new HashMap<String, Integer>();for (ItemIdWithAction tmp : map.keys()) { //报错的异常点valueMap.put(tmp.getItemId(), map.get(tmp));}return Tuple2.of(key, JSON.toJSONString(valueMap));}
}

map是一条条处理,每次取所有MapState数据输出。
本地跑,集群跑都没问题,但当流量大MapState过大时候,就报如下错误,每天报错一两次,重启。

java.util.ConcurrentModificationExceptionat java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)at java.util.HashMap$EntryIterator.next(HashMap.java:1476)at java.util.HashMap$EntryIterator.next(HashMap.java:1474)at org.apache.flink.runtime.state.ttl.TtlMapState$EntriesIterator.hasNext(TtlMapState.java:161)at com.tc.flink.operator.state.CityItemIdClickedState.map(CityItemIdClickedState.java:42)at com.tc.flink.operator.state.CityItemIdClickedState.map(CityItemIdClickedState.java:14)at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)at java.lang.Thread.run(Thread.java:748)

报错原因在于 for (ItemIdWithAction tmp : map.keys()) mapstate被并发修改了。
比较奇怪的是map-function单线程处理,为什么出现ConcurrentModificationException
查看TtlMapState源码
在这里插入图片描述
originalIterator::remove是剔除动作。有点类似redis,当再次访问时候,才会触发剔除(有可能产生内存泄漏)。
但是map-funciton是key-by下单线程操作,为什么会出现并发问题。
再看TtlStateFactory类
在这里插入图片描述

确实异步删除,所以mapstate过大的时候,就会出现这种问题。
修改代码

      Iterator< Map.Entry<ItemAction,Integer>> mapIterator= map.iterator();while(mapIterator.hasNext()){Map.Entry<ItemIdWithAction,Integer>  entry= mapIterator.next();valueMap.put(entry.getKey().getItemId(),entry.getValue());}

犯了低级错误,不过也细读了源码

这篇关于Flink MapState的ConcurrentModificationException问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/293898

相关文章

线上Java OOM问题定位与解决方案超详细解析

《线上JavaOOM问题定位与解决方案超详细解析》OOM是JVM抛出的错误,表示内存分配失败,:本文主要介绍线上JavaOOM问题定位与解决方案的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一、OOM问题核心认知1.1 OOM定义与技术定位1.2 OOM常见类型及技术特征二、OOM问题定位工具

Vue3绑定props默认值问题

《Vue3绑定props默认值问题》使用Vue3的defineProps配合TypeScript的interface定义props类型,并通过withDefaults设置默认值,使组件能安全访问传入的... 目录前言步骤步骤1:使用 defineProps 定义 Props步骤2:设置默认值总结前言使用T

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

解决升级JDK报错:module java.base does not“opens java.lang.reflect“to unnamed module问题

《解决升级JDK报错:modulejava.basedoesnot“opensjava.lang.reflect“tounnamedmodule问题》SpringBoot启动错误源于Jav... 目录问题描述原因分析解决方案总结问题描述启动sprintboot时报以下错误原因分析编程异js常是由Ja

MySQL 表空却 ibd 文件过大的问题及解决方法

《MySQL表空却ibd文件过大的问题及解决方法》本文给大家介绍MySQL表空却ibd文件过大的问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录一、问题背景:表空却 “吃满” 磁盘的怪事二、问题复现:一步步编程还原异常场景1. 准备测试源表与数据

解决Nginx启动报错Job for nginx.service failed because the control process exited with error code问题

《解决Nginx启动报错Jobfornginx.servicefailedbecausethecontrolprocessexitedwitherrorcode问题》Nginx启... 目录一、报错如下二、解决原因三、解决方式总结一、报错如下Job for nginx.service failed bec

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁

MySQ中出现幻读问题的解决过程

《MySQ中出现幻读问题的解决过程》文章解析MySQLInnoDB通过MVCC与间隙锁机制在可重复读隔离级别下解决幻读,确保事务一致性,同时指出性能影响及乐观锁等替代方案,帮助开发者优化数据库应用... 目录一、幻读的准确定义与核心特征幻读 vs 不可重复读二、mysql隔离级别深度解析各隔离级别的实现差异

C++ vector越界问题的完整解决方案

《C++vector越界问题的完整解决方案》在C++开发中,std::vector作为最常用的动态数组容器,其便捷性与性能优势使其成为处理可变长度数据的首选,然而,数组越界访问始终是威胁程序稳定性的... 目录引言一、vector越界的底层原理与危害1.1 越界访问的本质原因1.2 越界访问的实际危害二、基

Python多线程应用中的卡死问题优化方案指南

《Python多线程应用中的卡死问题优化方案指南》在利用Python语言开发某查询软件时,遇到了点击搜索按钮后软件卡死的问题,本文将简单分析一下出现的原因以及对应的优化方案,希望对大家有所帮助... 目录问题描述优化方案1. 网络请求优化2. 多线程架构优化3. 全局异常处理4. 配置管理优化优化效果1.