Flink MapState的ConcurrentModificationException问题

2023-10-28 14:40

本文主要是介绍Flink MapState的ConcurrentModificationException问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

直接上代码

import com.alibaba.fastjson.JSON;
import com.tc.flink.analysis.label.bean.output.ItemIdWithAction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;import java.util.HashMap;
import java.util.Map;public class CityItemIdClickedState extends RichMapFunction<Tuple2<ItemIdWithAction, Integer>, Tuple2<String, String>> {private transient MapState<ItemIdWithAction, Integer> map;public transient static final String CLICK_PREFIX_KEY = "cityClicked";public transient static final String CREATE_PREFIX_KEY = "cityCreated";@Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(60 * 2)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();MapStateDescriptor<ItemIdWithAction, Integer> descriptor = new MapStateDescriptor<ItemIdWithAction, Integer>("paln_click_num", ItemIdWithAction.class, Integer.class);descriptor.enableTimeToLive(ttlConfig);map = getRuntimeContext().getMapState(descriptor);super.open(parameters);}@Overridepublic Tuple2<String, String> map(Tuple2<ItemIdWithAction, Integer> keyValue) throws Exception {Integer num = keyValue.f1;ItemIdWithAction itemIdWithAction = keyValue.f0;if (num.equals(map.get(itemIdWithAction))) {return Tuple2.of(null, null);}map.put(itemIdWithAction, num);String prefixKey = itemIdWithAction.getAction().equals("click") ? CLICK_PREFIX_KEY : CREATE_PREFIX_KEY;String key = String.format("%s@%s@%s", prefixKey, itemIdWithAction.getStartCityId(), itemIdWithAction.getEndCityId());Map<String, Integer> valueMap = new HashMap<String, Integer>();for (ItemIdWithAction tmp : map.keys()) { //报错的异常点valueMap.put(tmp.getItemId(), map.get(tmp));}return Tuple2.of(key, JSON.toJSONString(valueMap));}
}

map是一条条处理,每次取所有MapState数据输出。
本地跑,集群跑都没问题,但当流量大MapState过大时候,就报如下错误,每天报错一两次,重启。

java.util.ConcurrentModificationExceptionat java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)at java.util.HashMap$EntryIterator.next(HashMap.java:1476)at java.util.HashMap$EntryIterator.next(HashMap.java:1474)at org.apache.flink.runtime.state.ttl.TtlMapState$EntriesIterator.hasNext(TtlMapState.java:161)at com.tc.flink.operator.state.CityItemIdClickedState.map(CityItemIdClickedState.java:42)at com.tc.flink.operator.state.CityItemIdClickedState.map(CityItemIdClickedState.java:14)at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)at java.lang.Thread.run(Thread.java:748)

报错原因在于 for (ItemIdWithAction tmp : map.keys()) mapstate被并发修改了。
比较奇怪的是map-function单线程处理,为什么出现ConcurrentModificationException
查看TtlMapState源码
在这里插入图片描述
originalIterator::remove是剔除动作。有点类似redis,当再次访问时候,才会触发剔除(有可能产生内存泄漏)。
但是map-funciton是key-by下单线程操作,为什么会出现并发问题。
再看TtlStateFactory类
在这里插入图片描述

确实异步删除,所以mapstate过大的时候,就会出现这种问题。
修改代码

      Iterator< Map.Entry<ItemAction,Integer>> mapIterator= map.iterator();while(mapIterator.hasNext()){Map.Entry<ItemIdWithAction,Integer>  entry= mapIterator.next();valueMap.put(entry.getKey().getItemId(),entry.getValue());}

犯了低级错误,不过也细读了源码

这篇关于Flink MapState的ConcurrentModificationException问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/293898

相关文章

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

kkFileView启动报错:报错2003端口占用的问题及解决

《kkFileView启动报错:报错2003端口占用的问题及解决》kkFileView启动报错因office组件2003端口未关闭,解决:查杀占用端口的进程,终止Java进程,使用shutdown.s... 目录原因解决总结kkFileViewjavascript启动报错启动office组件失败,请检查of

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束

Python错误AttributeError: 'NoneType' object has no attribute问题的彻底解决方法

《Python错误AttributeError:NoneTypeobjecthasnoattribute问题的彻底解决方法》在Python项目开发和调试过程中,经常会碰到这样一个异常信息... 目录问题背景与概述错误解读:AttributeError: 'NoneType' object has no at

Spring的RedisTemplate的json反序列泛型丢失问题解决

《Spring的RedisTemplate的json反序列泛型丢失问题解决》本文主要介绍了SpringRedisTemplate中使用JSON序列化时泛型信息丢失的问题及其提出三种解决方案,可以根据性... 目录背景解决方案方案一方案二方案三总结背景在使用RedisTemplate操作redis时我们针对

Kotlin Map映射转换问题小结

《KotlinMap映射转换问题小结》文章介绍了Kotlin集合转换的多种方法,包括map(一对一转换)、mapIndexed(带索引)、mapNotNull(过滤null)、mapKeys/map... 目录Kotlin 集合转换:map、mapIndexed、mapNotNull、mapKeys、map

nginx中端口无权限的问题解决

《nginx中端口无权限的问题解决》当Nginx日志报错bind()to80failed(13:Permissiondenied)时,这通常是由于权限不足导致Nginx无法绑定到80端口,下面就来... 目录一、问题原因分析二、解决方案1. 以 root 权限运行 Nginx(不推荐)2. 为 Nginx

解决1093 - You can‘t specify target table报错问题及原因分析

《解决1093-Youcan‘tspecifytargettable报错问题及原因分析》MySQL1093错误因UPDATE/DELETE语句的FROM子句直接引用目标表或嵌套子查询导致,... 目录报js错原因分析具体原因解决办法方法一:使用临时表方法二:使用JOIN方法三:使用EXISTS示例总结报错原