【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId

2024-08-29 10:32

本文主要是介绍【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • issue详情
    • reader和writer的通道数不一致
    • 获取JobId
  • 代码分析
    • #issue145
      • 配置说明
      • 源码分析:
    • #issue148

最近准备再花点时间优化一下之前的FlinkX版本,特地去看了一下项目的issues区域,发现两个自己比较关注的issue。

issue详情

reader和writer的通道数不一致

  • 异构数据源reader和writer设置不同的Parallelism数#145

这个issue是我之前提的。当时我在测试拉取mysql数据写入我司一个自研MQ时发现,当channel过小时写MQ会比较慢(写入过程是同步的),当channel跳大后没有提速,读取成为瓶颈,甚至比单channel更慢。

详见我更早之前的一个issue#143。

后来虽然通过其他方法将写入能力提升勉强达到可用状态,但一直想框架本身能支持设置不同的Parallelism数。

获取JobId

  • flink on yarn获取jobid #148

这个就不用说了,现网程序大规模上线后肯定需要能获取获取job id做更精细的告警。

代码分析

#issue145

配置说明

目前版本已经支持,配置demo:

"speed": {"bytes": 1048576,"channel": 2,"rebalance": false,"readerChannel": 1,"writerChannel": 1
}
  • channel:任务并发数
  • readerChannel:reader的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为reader的并发数。
  • writerChannel:writer的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为writer的并发数。
  • rebalance:此参数配置为true时将强制对reader的数据做Rebalance,不配置此参数或者配置为false时,程序会根据reader和writer的通道数选择是否Rebalance,reader和writer的通道数一致时不使用Reblance,通道数不一致时使用Reblance。

源码分析:

// com.dtstack.flinkx.Main 
StreamExecutionEnvironment env = ……
……
// 设置全局并发
env.setParallelism(speedConfig.getChannel());
……
// 设置读并发
dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());
// 强制Rebalance有助于数据均匀
if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}
……
// 设置写并发
dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());

读写默认并发时是 -1。在flink中setParallelism(-1) 时就说使用系统当前的默认的并发

// com.dtstack.flinkx.config.SpeedConfig
public static final int DEFAULT_NUM_READER_WRITER_CHANNEL = -1;
	public static final int PARALLELISM_DEFAULT = -1;/*** The flag value indicating an unknown or unset parallelism. This value is* not a valid parallelism and indicates that the parallelism should remain* unchanged.*/public static final int PARALLELISM_UNKNOWN = -2;

#issue148

官方回答:RichInputFormat.initJobInfo()里面可以拿到
实际上RichOutputForma中也可以获得,这里我写一个demo

// com.dtstack.flinkx.stream.writer.StreamOutputFormat
public class StreamOutputFormat extends BaseRichOutputFormat {……@Overrideprotected void initJobInfo() {Map<String, String> vars = context.getMetricGroup().getAllVariables();System.out.println("Metrics.JOB_ID:" + vars.get(Metrics.JOB_ID));
//        super.initJobInfo();}
}

与RichFunction类似,Rich类可以拿到运行时的上下文,包括Job ID,Metric等

这篇关于【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1117609

相关文章

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

解决1093 - You can‘t specify target table报错问题及原因分析

《解决1093-Youcan‘tspecifytargettable报错问题及原因分析》MySQL1093错误因UPDATE/DELETE语句的FROM子句直接引用目标表或嵌套子查询导致,... 目录报js错原因分析具体原因解决办法方法一:使用临时表方法二:使用JOIN方法三:使用EXISTS示例总结报错原

一文详解如何使用Java获取PDF页面信息

《一文详解如何使用Java获取PDF页面信息》了解PDF页面属性是我们在处理文档、内容提取、打印设置或页面重组等任务时不可或缺的一环,下面我们就来看看如何使用Java语言获取这些信息吧... 目录引言一、安装和引入PDF处理库引入依赖二、获取 PDF 页数三、获取页面尺寸(宽高)四、获取页面旋转角度五、判断

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

Python使用OpenCV实现获取视频时长的小工具

《Python使用OpenCV实现获取视频时长的小工具》在处理视频数据时,获取视频的时长是一项常见且基础的需求,本文将详细介绍如何使用Python和OpenCV获取视频时长,并对每一行代码进行深入解析... 目录一、代码实现二、代码解析1. 导入 OpenCV 库2. 定义获取视频时长的函数3. 打开视频文

C# 比较两个list 之间元素差异的常用方法

《C#比较两个list之间元素差异的常用方法》:本文主要介绍C#比较两个list之间元素差异,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. 使用Except方法2. 使用Except的逆操作3. 使用LINQ的Join,GroupJoin

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

MySQL 获取字符串长度及注意事项

《MySQL获取字符串长度及注意事项》本文通过实例代码给大家介绍MySQL获取字符串长度及注意事项,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录mysql 获取字符串长度详解 核心长度函数对比⚠️ 六大关键注意事项1. 字符编码决定字节长度2