Flink MapState的ConcurrentModificationException问题

2023-10-28 14:40

本文主要是介绍Flink MapState的ConcurrentModificationException问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

直接上代码

import com.alibaba.fastjson.JSON;
import com.tc.flink.analysis.label.bean.output.ItemIdWithAction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;import java.util.HashMap;
import java.util.Map;public class CityItemIdClickedState extends RichMapFunction<Tuple2<ItemIdWithAction, Integer>, Tuple2<String, String>> {private transient MapState<ItemIdWithAction, Integer> map;public transient static final String CLICK_PREFIX_KEY = "cityClicked";public transient static final String CREATE_PREFIX_KEY = "cityCreated";@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(60 * 2)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();MapStateDescriptor<ItemIdWithAction, Integer> descriptor = new MapStateDescriptor<ItemIdWithAction, Integer>("paln_click_num", ItemIdWithAction.class, Integer.class);descriptor.enableTimeToLive(ttlConfig);map = getRuntimeContext().getMapState(descriptor);super.open(parameters);}@Overridepublic Tuple2<String, String> map(Tuple2<ItemIdWithAction, Integer> keyValue) throws Exception {Integer num = keyValue.f1;ItemIdWithAction itemIdWithAction = keyValue.f0;if (num.equals(map.get(itemIdWithAction))) {return Tuple2.of(null, null);}map.put(itemIdWithAction, num);String prefixKey = itemIdWithAction.getAction().equals("click") ? CLICK_PREFIX_KEY : CREATE_PREFIX_KEY;String key = String.format("%s@%s@%s", prefixKey, itemIdWithAction.getStartCityId(), itemIdWithAction.getEndCityId());Map<String, Integer> valueMap = new HashMap<String, Integer>();for (ItemIdWithAction tmp : map.keys()) { //报错的异常点valueMap.put(tmp.getItemId(), map.get(tmp));}return Tuple2.of(key, JSON.toJSONString(valueMap));}
}

map是一条条处理,每次取所有MapState数据输出。
本地跑,集群跑都没问题,但当流量大MapState过大时候,就报如下错误,每天报错一两次,重启。

java.util.ConcurrentModificationExceptionat java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)at java.util.HashMap$EntryIterator.next(HashMap.java:1476)at java.util.HashMap$EntryIterator.next(HashMap.java:1474)at org.apache.flink.runtime.state.ttl.TtlMapState$EntriesIterator.hasNext(TtlMapState.java:161)at com.tc.flink.operator.state.CityItemIdClickedState.map(CityItemIdClickedState.java:42)at com.tc.flink.operator.state.CityItemIdClickedState.map(CityItemIdClickedState.java:14)at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)at java.lang.Thread.run(Thread.java:748)

报错原因在于 for (ItemIdWithAction tmp : map.keys()) mapstate被并发修改了。
比较奇怪的是map-function单线程处理,为什么出现ConcurrentModificationException
查看TtlMapState源码
在这里插入图片描述
originalIterator::remove是剔除动作。有点类似redis,当再次访问时候,才会触发剔除(有可能产生内存泄漏)。
但是map-funciton是key-by下单线程操作,为什么会出现并发问题。
再看TtlStateFactory类
在这里插入图片描述

确实异步删除,所以mapstate过大的时候,就会出现这种问题。
修改代码

      Iterator< Map.Entry<ItemAction,Integer>> mapIterator= map.iterator();while(mapIterator.hasNext()){Map.Entry<ItemIdWithAction,Integer>  entry= mapIterator.next();valueMap.put(entry.getKey().getItemId(),entry.getValue());}

犯了低级错误,不过也细读了源码

这篇关于Flink MapState的ConcurrentModificationException问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/293898

相关文章

MySQL 设置AUTO_INCREMENT 无效的问题解决

《MySQL设置AUTO_INCREMENT无效的问题解决》本文主要介绍了MySQL设置AUTO_INCREMENT无效的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参... 目录快速设置mysql的auto_increment参数一、修改 AUTO_INCREMENT 的值。

关于跨域无效的问题及解决(java后端方案)

《关于跨域无效的问题及解决(java后端方案)》:本文主要介绍关于跨域无效的问题及解决(java后端方案),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录通用后端跨域方法1、@CrossOrigin 注解2、springboot2.0 实现WebMvcConfig

Go语言中泄漏缓冲区的问题解决

《Go语言中泄漏缓冲区的问题解决》缓冲区是一种常见的数据结构,常被用于在不同的并发单元之间传递数据,然而,若缓冲区使用不当,就可能引发泄漏缓冲区问题,本文就来介绍一下问题的解决,感兴趣的可以了解一下... 目录引言泄漏缓冲区的基本概念代码示例:泄漏缓冲区的产生项目场景:Web 服务器中的请求缓冲场景描述代码

Java死锁问题解决方案及示例详解

《Java死锁问题解决方案及示例详解》死锁是指两个或多个线程因争夺资源而相互等待,导致所有线程都无法继续执行的一种状态,本文给大家详细介绍了Java死锁问题解决方案详解及实践样例,需要的朋友可以参考下... 目录1、简述死锁的四个必要条件:2、死锁示例代码3、如何检测死锁?3.1 使用 jstack3.2

解决JSONField、JsonProperty不生效的问题

《解决JSONField、JsonProperty不生效的问题》:本文主要介绍解决JSONField、JsonProperty不生效的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录jsONField、JsonProperty不生效javascript问题排查总结JSONField

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

MySQL版本问题导致项目无法启动问题的解决方案

《MySQL版本问题导致项目无法启动问题的解决方案》本文记录了一次因MySQL版本不一致导致项目启动失败的经历,详细解析了连接错误的原因,并提供了两种解决方案:调整连接字符串禁用SSL或统一MySQL... 目录本地项目启动报错报错原因:解决方案第一个:第二种:容器启动mysql的坑两种修改时区的方法:本地

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

springboot加载不到nacos配置中心的配置问题处理

《springboot加载不到nacos配置中心的配置问题处理》:本文主要介绍springboot加载不到nacos配置中心的配置问题处理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录springboot加载不到nacos配置中心的配置两种可能Spring Boot 版本Nacos

Java中JSON格式反序列化为Map且保证存取顺序一致的问题

《Java中JSON格式反序列化为Map且保证存取顺序一致的问题》:本文主要介绍Java中JSON格式反序列化为Map且保证存取顺序一致的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未... 目录背景问题解决方法总结背景做项目涉及两个微服务之间传数据时,需要提供方将Map类型的数据序列化为co