flink 状态参数设置

2024-08-26 01:12
文章标签 参数设置 状态 flink

本文主要是介绍flink 状态参数设置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前提

代码示例,通过flink消费kafka,查看list状态中的数据,确定参数的具体含义
kafka的代码:发送两个key值,一秒发送一次

	for(int i = 0; i< 100; i++){JSONObject object = new JSONObject();object.put("id", 1);object.put("value", i);String s = object.toJSONString();kafkaProducer.send(new ProducerRecord("test_topic_partition_one", s.getBytes(StandardCharsets.UTF_8))).get();object = new JSONObject();object.put("id", 2);object.put("value", 100 + i);s = object.toJSONString();kafkaProducer.send(new ProducerRecord("test_topic_partition_one", s.getBytes(StandardCharsets.UTF_8))).get();Thread.sleep(1000);}

flink消费kafka示例:

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10 * 1000);KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("broker:9092").setProperties(properties).setTopics("test_topic_partition_one").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(2);DataStream<Tuple2<String, Integer>> dataStream = kafkaSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {JSONObject object = JSONObject.parseObject(value);return new Tuple2<String, Integer>(object.getString("id"), object.getInteger("value"));}});DataStream<String> resultStream = dataStream.keyBy(value -> value.f0) // 根据第一个字段(键)进行分组.process(new ListValueProcess());// 打印结果resultStream.print();

ListValueProcess状态函数

 	@Overridepublic void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {// 添加元素到 ListStatelistState.add(value.f1);// 获取 ListState 中的所有元素,并输出它们String key = value.f0;List<Integer> list = new ArrayList<>();for (Integer integer : listState.get()) {list.add(integer);}String result = "key:" + key + ", value:" +list;// 输出结果out.collect(result);}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();// 初始化 ListState// 不同的key 具有不用的listState// 用于存储一个key多个值ListStateDescriptor<Integer> integerListStateDescriptor = new ListStateDescriptor<>("my-list-state", Integer.class);integerListStateDescriptor.enableTimeToLive(ttlConfig);listState = getRuntimeContext().getListState(integerListStateDescriptor);}

可以看到StateTtlConfig大部份有三个参数

  • 指定状态保存时间
  • setUpdateType 设置状态更新策略:OnCreateAndWriteOnReadAndWrite
  • setStateVisibility 设置状态可见行 :ReturnExpiredIfNotCleanedUpNeverReturnExpired

这里我们保存状态时间是10s
OnCreateAndWrite: 表示当状态被创建与更新的时候,表示更新了状态
OnReadAndWrite:表示状态被创建与更新和读取的时候,表示更新了状态
ReturnExpiredIfNotCleanedUp:表示状态过期了但没有删除,也可以读取到状态
NeverReturnExpired:表示状态过期就读取不到

结果示例:
当:OnCreateAndWriteReturnExpiredIfNotCleanedUp

1> key:1, value:[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]
1> key:2, value:[113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124]
1> key:1, value:[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25]
1> key:2, value:[113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125]
1> key:1, value:[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26]
1> key:2, value:[113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126]
1> key:1, value:[18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
1> key:2, value:[113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127]
1> key:1, value:[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
1> key:2, value:[113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128]
1> key:1, value:[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
1> key:2, value:[113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129]
1> key:1, value:[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
1> key:2, value:[113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130]

可以看到,状态会定期删除过期的数据,而且数据可见可能大于10s的范围。

OnCreateAndWriteNeverReturnExpired

1> key:2, value:[109, 110, 111, 112, 113, 114, 115, 116, 117]
1> key:1, value:[10, 11, 12, 13, 14, 15, 16, 17, 18]
1> key:2, value:[110, 111, 112, 113, 114, 115, 116, 117, 118]
1> key:1, value:[11, 12, 13, 14, 15, 16, 17, 18, 19]
1> key:2, value:[111, 112, 113, 114, 115, 116, 117, 118, 119]
1> key:1, value:[12, 13, 14, 15, 16, 17, 18, 19, 20]
1> key:2, value:[112, 113, 114, 115, 116, 117, 118, 119, 120]
1> key:1, value:[13, 14, 15, 16, 17, 18, 19, 20, 21]
1> key:2, value:[113, 114, 115, 116, 117, 118, 119, 120, 121]
1> key:1, value:[14, 15, 16, 17, 18, 19, 20, 21, 22]
1> key:2, value:[114, 115, 116, 117, 118, 119, 120, 121, 122]

可以看到,状态的数据只保留最近10s内的值

OnReadAndWriteReturnExpiredIfNotCleanedUp

1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132]

可以看到,状态保留了所有的数据,因为每次都会读取了数据,所以不会过期

OnReadAndWriteNeverReturnExpired

1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130]
1> key:1, value:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
1> key:2, value:[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131]

可以看到,状态保留了所有的数据,因为每次都会读取了数据,所以不会过期

这篇关于flink 状态参数设置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1107124

相关文章

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

IIS 7.0 及更高版本中的 FTP 状态代码

《IIS7.0及更高版本中的FTP状态代码》本文介绍IIS7.0中的FTP状态代码,方便大家在使用iis中发现ftp的问题... 简介尝试使用 FTP 访问运行 Internet Information Services (IIS) 7.0 或更高版本的服务器上的内容时,IIS 将返回指示响应状态的数字代

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

SpringSecurity JWT基于令牌的无状态认证实现

《SpringSecurityJWT基于令牌的无状态认证实现》SpringSecurity中实现基于JWT的无状态认证是一种常见的做法,本文就来介绍一下SpringSecurityJWT基于令牌的无... 目录引言一、JWT基本原理与结构二、Spring Security JWT依赖配置三、JWT令牌生成与

关于WebSocket协议状态码解析

《关于WebSocket协议状态码解析》:本文主要介绍关于WebSocket协议状态码的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录WebSocket协议状态码解析1. 引言2. WebSocket协议状态码概述3. WebSocket协议状态码详解3

Flutter监听当前页面可见与隐藏状态的代码详解

《Flutter监听当前页面可见与隐藏状态的代码详解》文章介绍了如何在Flutter中使用路由观察者来监听应用进入前台或后台状态以及页面的显示和隐藏,并通过代码示例讲解的非常详细,需要的朋友可以参考下... flutter 可以监听 app 进入前台还是后台状态,也可以监听当http://www.cppcn

MySQL 中的服务器配置和状态详解(MySQL Server Configuration and Status)

《MySQL中的服务器配置和状态详解(MySQLServerConfigurationandStatus)》MySQL服务器配置和状态设置包括服务器选项、系统变量和状态变量三个方面,可以通过... 目录mysql 之服务器配置和状态1 MySQL 架构和性能优化1.1 服务器配置和状态1.1.1 服务器选项

linux进程D状态的解决思路分享

《linux进程D状态的解决思路分享》在Linux系统中,进程在内核模式下等待I/O完成时会进入不间断睡眠状态(D状态),这种状态下,进程无法通过普通方式被杀死,本文通过实验模拟了这种状态,并分析了如... 目录1. 问题描述2. 问题分析3. 实验模拟3.1 使用losetup创建一个卷作为pv的磁盘3.

Java实现状态模式的示例代码

《Java实现状态模式的示例代码》状态模式是一种行为型设计模式,允许对象根据其内部状态改变行为,本文主要介绍了Java实现状态模式的示例代码,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来... 目录一、简介1、定义2、状态模式的结构二、Java实现案例1、电灯开关状态案例2、番茄工作法状态案例