Flink AsyncFunction导致的Kafka数据不消费

2024-09-06 21:18

本文主要是介绍Flink AsyncFunction导致的Kafka数据不消费,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

问题描述

flinksql从kafka读取数据,异步函数加载Mysql数据进行维表关联,最后将数据写入到mysql中。任务在启动时会消费kafka数据,一段时间后不读kafka或者能够持续读kafka数据但是异步函数不发送给下游算子。

  • 不读kafka数据:kafka读取线程像卡住一样,从kafka中读取不到数据,以为是网络原因,但是计算节点和工作节点在同一台机器中,于是排除网络原因。

  • 持续读kafka数据,但是异步函数不下发数据:以为是设置的异步超时间超时,默认是10s,增大超时时间后依然不下发。


Jstack 排查

打印执行线程堆栈信息,虽然BLOCKED状态的线程很多,但大部分是第三方类的执行线程,都比较正常。突然发现和我们程序有关的代码阻塞线程。

原来是调用我们的timeout函数出现了阻塞。

   public void timeout(Row input, ResultFuture<Row> resultFuture) {StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;try {// 阻塞等待if (null == future.get()) {resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));}} catch (Exception e) {resultFuture.completeExceptionally(new Exception(e));}}
阻塞原因

在flink异步函数asyncInvoke中,只处理了正常逻辑。也就是匹配上调用resultFuture.complete(rowList);但是fillData里面进行数据类型转换时很容易发生异常,当发生异常时,resultFuture并没有结果输出,从而导致整个链路阻塞。

 List<Row> rowList = Lists.newArrayList();for (Object jsonArray : (List) val.getContent()) {Row row = fillData(input, jsonArray);rowList.add(row);}resultFuture.complete(rowList);
解决以及注意事项

fillData进行try-catch捕获发生异常时调用resultFuture.completeExceptionally(exception);

在flink异步函数中,resultFuture.complete()只会被调用一次,complete一个集合需要先在填充然后一次性发送,而不是通过遍历调用多次resultFuture.complete()

使用异步Future一定要记得有输出值。
堆栈信息重点关注有没有我们自己的逻辑 。

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Flink AsyncFunction导致的Kafka数据不消费的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143147

相关文章

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

C# LiteDB处理时间序列数据的高性能解决方案

《C#LiteDB处理时间序列数据的高性能解决方案》LiteDB作为.NET生态下的轻量级嵌入式NoSQL数据库,一直是时间序列处理的优选方案,本文将为大家大家简单介绍一下LiteDB处理时间序列数... 目录为什么选择LiteDB处理时间序列数据第一章:LiteDB时间序列数据模型设计1.1 核心设计原则

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十

使用SpringBoot+InfluxDB实现高效数据存储与查询

《使用SpringBoot+InfluxDB实现高效数据存储与查询》InfluxDB是一个开源的时间序列数据库,特别适合处理带有时间戳的监控数据、指标数据等,下面详细介绍如何在SpringBoot项目... 目录1、项目介绍2、 InfluxDB 介绍3、Spring Boot 配置 InfluxDB4、I

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

Java整合Protocol Buffers实现高效数据序列化实践

《Java整合ProtocolBuffers实现高效数据序列化实践》ProtocolBuffers是Google开发的一种语言中立、平台中立、可扩展的结构化数据序列化机制,类似于XML但更小、更快... 目录一、Protocol Buffers简介1.1 什么是Protocol Buffers1.2 Pro