FlinkSQL时间更新问题

2023-12-20 10:38
文章标签 问题 更新 时间 flinksql

本文主要是介绍FlinkSQL时间更新问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

最近项目中使用FlinkSQL来做数据统计,遇到一些问题,小结一下。

第一个问题:聚合好的正确数据写入数据库后不正确。

场景:因为是做数据聚合,会upsert(更新或写入)数据,为了保证效率,批量每10s中在数据库中写一次数据,异步写入,每次最多更新500条。

结果:日志打印出最终的统计结果正确,但写入数据库的值不正确。

原因:异步写入,无法保证写入顺序,如果一批数据中有 对同一条记录进行更新的 一条以上的数据,无法保证两条记录的先后执行顺序,导致数据写入数据库不正确。而且会重复更新,对数据库造成压力。

解决办法:每条统计结果增加时间戳,每次把一批数据通过关键字(看自己业务逻辑确定关键字奥)去重,保留时间最大的那条记录。过滤后,在进行更新操作。但因为业务量大,只增加时间戳有时区分不了哪条记录是最新的,因为时间戳相同,所以采用【时间戳+递增字段值】做标记。解决了问题。

 

第二个问题:项目上线正常运行,但第二天发现数据没有更新

场景:因为没有使用窗口,所以统计当天数据的时间范围是自己限制的。至于为什么没有用窗口,是因为外层查询和子查询中都有group by操作,Flink的回撤流不能很好的处理,使用会报错。所以用Java代码生成了一个datekey(像这样20210603)用来框定时间。

原因:FlinkSQL在项目启动时加载一次,时间是作为变量写进去的,是一个固定的值,固定的值。

解决办法:使用FlinkSQL时间函数来动态获取当前时间,sql改为下面这样,问题解决。

whereCAST(dateKey as VARCHAR) = DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMdd')

这里需要注意,要使用本地TIMESTAMP (LOCALTIMESTAMP),不然时间会差八小时,会导致框选的时间不对哦。(时差问题在Flink 1.13版本中有了解决方案,提供了一种新的时间类型,返回long类型值,格式化后以当前服务器的时区为准)

其他问题,这个是因为算子进行计算是有异常产生,log日志加上就能看到哪里问题了。

java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id container_e26_1597048467840_3275_01_000003  timed out.at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1202)at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at akka.actor.Actor$class.aroundReceive(Actor.scala:517)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at akka.actor.ActorCell.invoke(ActorCell.scala:561)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)at akka.dispatch.Mailbox.run(Mailbox.scala:225)at akka.dispatch.Mailbox.exec(Mailbox.scala:235)at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2021-05-20 17:58:38.226 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy  - Calculating tasks to restart to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1.
2021-05-20 17:58:38.229 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.f.flip1.RestartPipelinedRegionFailoverStrategy  - 58 tasks should be restarted to recover the failed task fc0da2485aaba36f11ecd88823335bb4_1. 

最后,愿所有的bug都能被解决

 

这篇关于FlinkSQL时间更新问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/515730

相关文章

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

MySQL 设置AUTO_INCREMENT 无效的问题解决

《MySQL设置AUTO_INCREMENT无效的问题解决》本文主要介绍了MySQL设置AUTO_INCREMENT无效的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参... 目录快速设置mysql的auto_increment参数一、修改 AUTO_INCREMENT 的值。

关于跨域无效的问题及解决(java后端方案)

《关于跨域无效的问题及解决(java后端方案)》:本文主要介绍关于跨域无效的问题及解决(java后端方案),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录通用后端跨域方法1、@CrossOrigin 注解2、springboot2.0 实现WebMvcConfig

Go语言中泄漏缓冲区的问题解决

《Go语言中泄漏缓冲区的问题解决》缓冲区是一种常见的数据结构,常被用于在不同的并发单元之间传递数据,然而,若缓冲区使用不当,就可能引发泄漏缓冲区问题,本文就来介绍一下问题的解决,感兴趣的可以了解一下... 目录引言泄漏缓冲区的基本概念代码示例:泄漏缓冲区的产生项目场景:Web 服务器中的请求缓冲场景描述代码

Java死锁问题解决方案及示例详解

《Java死锁问题解决方案及示例详解》死锁是指两个或多个线程因争夺资源而相互等待,导致所有线程都无法继续执行的一种状态,本文给大家详细介绍了Java死锁问题解决方案详解及实践样例,需要的朋友可以参考下... 目录1、简述死锁的四个必要条件:2、死锁示例代码3、如何检测死锁?3.1 使用 jstack3.2

解决JSONField、JsonProperty不生效的问题

《解决JSONField、JsonProperty不生效的问题》:本文主要介绍解决JSONField、JsonProperty不生效的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录jsONField、JsonProperty不生效javascript问题排查总结JSONField

github打不开的问题分析及解决

《github打不开的问题分析及解决》:本文主要介绍github打不开的问题分析及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、找到github.com域名解析的ip地址二、找到github.global.ssl.fastly.net网址解析的ip地址三

MySQL版本问题导致项目无法启动问题的解决方案

《MySQL版本问题导致项目无法启动问题的解决方案》本文记录了一次因MySQL版本不一致导致项目启动失败的经历,详细解析了连接错误的原因,并提供了两种解决方案:调整连接字符串禁用SSL或统一MySQL... 目录本地项目启动报错报错原因:解决方案第一个:第二种:容器启动mysql的坑两种修改时区的方法:本地

springboot加载不到nacos配置中心的配置问题处理

《springboot加载不到nacos配置中心的配置问题处理》:本文主要介绍springboot加载不到nacos配置中心的配置问题处理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录springboot加载不到nacos配置中心的配置两种可能Spring Boot 版本Nacos