Flink MapState的ConcurrentModificationException问题

2023-10-28 14:40

本文主要是介绍Flink MapState的ConcurrentModificationException问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

直接上代码

import com.alibaba.fastjson.JSON;
import com.tc.flink.analysis.label.bean.output.ItemIdWithAction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;import java.util.HashMap;
import java.util.Map;public class CityItemIdClickedState extends RichMapFunction<Tuple2<ItemIdWithAction, Integer>, Tuple2<String, String>> {private transient MapState<ItemIdWithAction, Integer> map;public transient static final String CLICK_PREFIX_KEY = "cityClicked";public transient static final String CREATE_PREFIX_KEY = "cityCreated";@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(60 * 2)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();MapStateDescriptor<ItemIdWithAction, Integer> descriptor = new MapStateDescriptor<ItemIdWithAction, Integer>("paln_click_num", ItemIdWithAction.class, Integer.class);descriptor.enableTimeToLive(ttlConfig);map = getRuntimeContext().getMapState(descriptor);super.open(parameters);}@Overridepublic Tuple2<String, String> map(Tuple2<ItemIdWithAction, Integer> keyValue) throws Exception {Integer num = keyValue.f1;ItemIdWithAction itemIdWithAction = keyValue.f0;if (num.equals(map.get(itemIdWithAction))) {return Tuple2.of(null, null);}map.put(itemIdWithAction, num);String prefixKey = itemIdWithAction.getAction().equals("click") ? CLICK_PREFIX_KEY : CREATE_PREFIX_KEY;String key = String.format("%s@%s@%s", prefixKey, itemIdWithAction.getStartCityId(), itemIdWithAction.getEndCityId());Map<String, Integer> valueMap = new HashMap<String, Integer>();for (ItemIdWithAction tmp : map.keys()) { //报错的异常点valueMap.put(tmp.getItemId(), map.get(tmp));}return Tuple2.of(key, JSON.toJSONString(valueMap));}
}

map是一条条处理,每次取所有MapState数据输出。
本地跑,集群跑都没问题,但当流量大MapState过大时候,就报如下错误,每天报错一两次,重启。

java.util.ConcurrentModificationExceptionat java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)at java.util.HashMap$EntryIterator.next(HashMap.java:1476)at java.util.HashMap$EntryIterator.next(HashMap.java:1474)at org.apache.flink.runtime.state.ttl.TtlMapState$EntriesIterator.hasNext(TtlMapState.java:161)at com.tc.flink.operator.state.CityItemIdClickedState.map(CityItemIdClickedState.java:42)at com.tc.flink.operator.state.CityItemIdClickedState.map(CityItemIdClickedState.java:14)at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)at java.lang.Thread.run(Thread.java:748)

报错原因在于 for (ItemIdWithAction tmp : map.keys()) mapstate被并发修改了。
比较奇怪的是map-function单线程处理,为什么出现ConcurrentModificationException
查看TtlMapState源码
在这里插入图片描述
originalIterator::remove是剔除动作。有点类似redis,当再次访问时候,才会触发剔除(有可能产生内存泄漏)。
但是map-funciton是key-by下单线程操作,为什么会出现并发问题。
再看TtlStateFactory类
在这里插入图片描述

确实异步删除,所以mapstate过大的时候,就会出现这种问题。
修改代码

      Iterator< Map.Entry<ItemAction,Integer>> mapIterator= map.iterator();while(mapIterator.hasNext()){Map.Entry<ItemIdWithAction,Integer>  entry= mapIterator.next();valueMap.put(entry.getKey().getItemId(),entry.getValue());}

犯了低级错误,不过也细读了源码

这篇关于Flink MapState的ConcurrentModificationException问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/293898

相关文章

解决IDEA报错:编码GBK的不可映射字符问题

《解决IDEA报错:编码GBK的不可映射字符问题》:本文主要介绍解决IDEA报错:编码GBK的不可映射字符问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录IDEA报错:编码GBK的不可映射字符终端软件问题描述原因分析解决方案方法1:将命令改为方法2:右下jav

MyBatis模糊查询报错:ParserException: not supported.pos 问题解决

《MyBatis模糊查询报错:ParserException:notsupported.pos问题解决》本文主要介绍了MyBatis模糊查询报错:ParserException:notsuppo... 目录问题描述问题根源错误SQL解析逻辑深层原因分析三种解决方案方案一:使用CONCAT函数(推荐)方案二:

Redis 热 key 和大 key 问题小结

《Redis热key和大key问题小结》:本文主要介绍Redis热key和大key问题小结,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、什么是 Redis 热 key?热 key(Hot Key)定义: 热 key 常见表现:热 key 的风险:二、

IntelliJ IDEA 中配置 Spring MVC 环境的详细步骤及问题解决

《IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决》:本文主要介绍IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决,本文分步骤结合实例给大... 目录步骤 1:创建 Maven Web 项目步骤 2:添加 Spring MVC 依赖1、保存后执行2、将新的依赖

Spring 中的循环引用问题解决方法

《Spring中的循环引用问题解决方法》:本文主要介绍Spring中的循环引用问题解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录什么是循环引用?循环依赖三级缓存解决循环依赖二级缓存三级缓存本章来聊聊Spring 中的循环引用问题该如何解决。这里聊

Spring Boot中JSON数值溢出问题从报错到优雅解决办法

《SpringBoot中JSON数值溢出问题从报错到优雅解决办法》:本文主要介绍SpringBoot中JSON数值溢出问题从报错到优雅的解决办法,通过修改字段类型为Long、添加全局异常处理和... 目录一、问题背景:为什么我的接口突然报错了?二、为什么会发生这个错误?1. Java 数据类型的“容量”限制

关于MongoDB图片URL存储异常问题以及解决

《关于MongoDB图片URL存储异常问题以及解决》:本文主要介绍关于MongoDB图片URL存储异常问题以及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录MongoDB图片URL存储异常问题项目场景问题描述原因分析解决方案预防措施js总结MongoDB图

SpringBoot项目中报错The field screenShot exceeds its maximum permitted size of 1048576 bytes.的问题及解决

《SpringBoot项目中报错ThefieldscreenShotexceedsitsmaximumpermittedsizeof1048576bytes.的问题及解决》这篇文章... 目录项目场景问题描述原因分析解决方案总结项目场景javascript提示:项目相关背景:项目场景:基于Spring

解决Maven项目idea找不到本地仓库jar包问题以及使用mvn install:install-file

《解决Maven项目idea找不到本地仓库jar包问题以及使用mvninstall:install-file》:本文主要介绍解决Maven项目idea找不到本地仓库jar包问题以及使用mvnin... 目录Maven项目idea找不到本地仓库jar包以及使用mvn install:install-file基

usb接口驱动异常问题常用解决方案

《usb接口驱动异常问题常用解决方案》当遇到USB接口驱动异常时,可以通过多种方法来解决,其中主要就包括重装USB控制器、禁用USB选择性暂停设置、更新或安装新的主板驱动等... usb接口驱动异常怎么办,USB接口驱动异常是常见问题,通常由驱动损坏、系统更新冲突、硬件故障或电源管理设置导致。以下是常用解决