【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId

2024-08-29 10:32

本文主要是介绍【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • issue详情
    • reader和writer的通道数不一致
    • 获取JobId
  • 代码分析
    • #issue145
      • 配置说明
      • 源码分析:
    • #issue148

最近准备再花点时间优化一下之前的FlinkX版本,特地去看了一下项目的issues区域,发现两个自己比较关注的issue。

issue详情

reader和writer的通道数不一致

  • 异构数据源reader和writer设置不同的Parallelism数#145

这个issue是我之前提的。当时我在测试拉取mysql数据写入我司一个自研MQ时发现,当channel过小时写MQ会比较慢(写入过程是同步的),当channel跳大后没有提速,读取成为瓶颈,甚至比单channel更慢。

详见我更早之前的一个issue#143。

后来虽然通过其他方法将写入能力提升勉强达到可用状态,但一直想框架本身能支持设置不同的Parallelism数。

获取JobId

  • flink on yarn获取jobid #148

这个就不用说了,现网程序大规模上线后肯定需要能获取获取job id做更精细的告警。

代码分析

#issue145

配置说明

目前版本已经支持,配置demo:

"speed": {"bytes": 1048576,"channel": 2,"rebalance": false,"readerChannel": 1,"writerChannel": 1
}
  • channel:任务并发数
  • readerChannel:reader的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为reader的并发数。
  • writerChannel:writer的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为writer的并发数。
  • rebalance:此参数配置为true时将强制对reader的数据做Rebalance,不配置此参数或者配置为false时,程序会根据reader和writer的通道数选择是否Rebalance,reader和writer的通道数一致时不使用Reblance,通道数不一致时使用Reblance。

源码分析:

// com.dtstack.flinkx.Main 
StreamExecutionEnvironment env = ……
……
// 设置全局并发
env.setParallelism(speedConfig.getChannel());
……
// 设置读并发
dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());
// 强制Rebalance有助于数据均匀
if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}
……
// 设置写并发
dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());

读写默认并发时是 -1。在flink中setParallelism(-1) 时就说使用系统当前的默认的并发

// com.dtstack.flinkx.config.SpeedConfig
public static final int DEFAULT_NUM_READER_WRITER_CHANNEL = -1;
	public static final int PARALLELISM_DEFAULT = -1;/*** The flag value indicating an unknown or unset parallelism. This value is* not a valid parallelism and indicates that the parallelism should remain* unchanged.*/public static final int PARALLELISM_UNKNOWN = -2;

#issue148

官方回答:RichInputFormat.initJobInfo()里面可以拿到
实际上RichOutputForma中也可以获得,这里我写一个demo

// com.dtstack.flinkx.stream.writer.StreamOutputFormat
public class StreamOutputFormat extends BaseRichOutputFormat {……@Overrideprotected void initJobInfo() {Map<String, String> vars = context.getMetricGroup().getAllVariables();System.out.println("Metrics.JOB_ID:" + vars.get(Metrics.JOB_ID));
//        super.initJobInfo();}
}

与RichFunction类似,Rich类可以拿到运行时的上下文,包括Job ID,Metric等

这篇关于【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1117609

相关文章

JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法

《JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法》:本文主要介绍JavaScript中比较两个数组是否有相同元素(交集)的三种常用方法,每种方法结合实例代码给大家介绍的非常... 目录引言:为什么"相等"判断如此重要?方法1:使用some()+includes()(适合小数组)方法2

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

Android 缓存日志Logcat导出与分析最佳实践

《Android缓存日志Logcat导出与分析最佳实践》本文全面介绍AndroidLogcat缓存日志的导出与分析方法,涵盖按进程、缓冲区类型及日志级别过滤,自动化工具使用,常见问题解决方案和最佳实... 目录android 缓存日志(Logcat)导出与分析全攻略为什么要导出缓存日志?按需过滤导出1. 按

Linux中的HTTPS协议原理分析

《Linux中的HTTPS协议原理分析》文章解释了HTTPS的必要性:HTTP明文传输易被篡改和劫持,HTTPS通过非对称加密协商对称密钥、CA证书认证和混合加密机制,有效防范中间人攻击,保障通信安全... 目录一、什么是加密和解密?二、为什么需要加密?三、常见的加密方式3.1 对称加密3.2非对称加密四、

MySQL中读写分离方案对比分析与选型建议

《MySQL中读写分离方案对比分析与选型建议》MySQL读写分离是提升数据库可用性和性能的常见手段,本文将围绕现实生产环境中常见的几种读写分离模式进行系统对比,希望对大家有所帮助... 目录一、问题背景介绍二、多种解决方案对比2.1 原生mysql主从复制2.2 Proxy层中间件:ProxySQL2.3

python使用Akshare与Streamlit实现股票估值分析教程(图文代码)

《python使用Akshare与Streamlit实现股票估值分析教程(图文代码)》入职测试中的一道题,要求:从Akshare下载某一个股票近十年的财务报表包括,资产负债表,利润表,现金流量表,保存... 目录一、前言二、核心知识点梳理1、Akshare数据获取2、Pandas数据处理3、Matplotl

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499