【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId

2024-08-29 10:32

本文主要是介绍【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • issue详情
    • reader和writer的通道数不一致
    • 获取JobId
  • 代码分析
    • #issue145
      • 配置说明
      • 源码分析:
    • #issue148

最近准备再花点时间优化一下之前的FlinkX版本,特地去看了一下项目的issues区域,发现两个自己比较关注的issue。

issue详情

reader和writer的通道数不一致

  • 异构数据源reader和writer设置不同的Parallelism数#145

这个issue是我之前提的。当时我在测试拉取mysql数据写入我司一个自研MQ时发现,当channel过小时写MQ会比较慢(写入过程是同步的),当channel跳大后没有提速,读取成为瓶颈,甚至比单channel更慢。

详见我更早之前的一个issue#143。

后来虽然通过其他方法将写入能力提升勉强达到可用状态,但一直想框架本身能支持设置不同的Parallelism数。

获取JobId

  • flink on yarn获取jobid #148

这个就不用说了,现网程序大规模上线后肯定需要能获取获取job id做更精细的告警。

代码分析

#issue145

配置说明

目前版本已经支持,配置demo:

"speed": {"bytes": 1048576,"channel": 2,"rebalance": false,"readerChannel": 1,"writerChannel": 1
}
  • channel:任务并发数
  • readerChannel:reader的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为reader的并发数。
  • writerChannel:writer的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为writer的并发数。
  • rebalance:此参数配置为true时将强制对reader的数据做Rebalance,不配置此参数或者配置为false时,程序会根据reader和writer的通道数选择是否Rebalance,reader和writer的通道数一致时不使用Reblance,通道数不一致时使用Reblance。

源码分析:

// com.dtstack.flinkx.Main 
StreamExecutionEnvironment env = ……
……
// 设置全局并发
env.setParallelism(speedConfig.getChannel());
……
// 设置读并发
dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());
// 强制Rebalance有助于数据均匀
if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}
……
// 设置写并发
dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());

读写默认并发时是 -1。在flink中setParallelism(-1) 时就说使用系统当前的默认的并发

// com.dtstack.flinkx.config.SpeedConfig
public static final int DEFAULT_NUM_READER_WRITER_CHANNEL = -1;
	public static final int PARALLELISM_DEFAULT = -1;/*** The flag value indicating an unknown or unset parallelism. This value is* not a valid parallelism and indicates that the parallelism should remain* unchanged.*/public static final int PARALLELISM_UNKNOWN = -2;

#issue148

官方回答:RichInputFormat.initJobInfo()里面可以拿到
实际上RichOutputForma中也可以获得,这里我写一个demo

// com.dtstack.flinkx.stream.writer.StreamOutputFormat
public class StreamOutputFormat extends BaseRichOutputFormat {……@Overrideprotected void initJobInfo() {Map<String, String> vars = context.getMetricGroup().getAllVariables();System.out.println("Metrics.JOB_ID:" + vars.get(Metrics.JOB_ID));
//        super.initJobInfo();}
}

与RichFunction类似,Rich类可以拿到运行时的上下文,包括Job ID,Metric等

这篇关于【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1117609

相关文章

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

Java发送SNMP至交换机获取交换机状态实现方式

《Java发送SNMP至交换机获取交换机状态实现方式》文章介绍使用SNMP4J库(2.7.0)通过RCF1213-MIB协议获取交换机单/多路状态,需开启SNMP支持,重点对比SNMPv1、v2c、v... 目录交换机协议SNMP库获取交换机单路状态获取交换机多路状态总结交换机协议这里使用的交换机协议为常

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

Spring Boot中获取IOC容器的多种方式

《SpringBoot中获取IOC容器的多种方式》本文主要介绍了SpringBoot中获取IOC容器的多种方式,包括直接注入、实现ApplicationContextAware接口、通过Spring... 目录1. 直接注入ApplicationContext2. 实现ApplicationContextA