Flinkx启动流程-整体理解

2023-11-11 21:58

本文主要是介绍Flinkx启动流程-整体理解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 先看启动脚本

在bin/flinkx的内容

set -e
export FLINKX_HOME="$(cd "`dirname "$0"`"/..; pwd)"
# Find the java binary
if [ -n "${JAVA_HOME}" ]; thenJAVA_RUN="${JAVA_HOME}/bin/java"
elseif [ `command -v java` ]; thenJAVA_RUN="java"elseecho "JAVA_HOME is not set" >&2exit 1fi
fi
JAR_DIR=$FLINKX_HOME/lib/*
CLASS_NAME=com.dtstack.flinkx.launcher.Launcher
# 参数1.java的命令
# -cp就是classpath :cp解释https://zhuanlan.zhihu.com/p/214093661
# -cp $JAR_DIR $CLASS_NAME 就是指定运行哪个jar下的哪个主类
# $@为传递的参数
# &为后端运行
echo "flinkx starting ..."
nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &
  1. 先导入flinkx的环境
  2. 然后是java的二进制文件
  3. 指定flinkx相关jar包和启动类
  4. nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &执行

2. 在看Launcher启动类

定位到flinkx-1.8.5\flinkx-launcher\src\main\java\com\dtstack\flinkx\launcher\Launcher.java的main方法,第95行,查看本地模式

if(mode.equals(ClusterMode.local.name())) {String[] localArgs = argList.toArray(new String[argList.size()]);com.dtstack.flinkx.Main.main(localArgs);
}

flinkx本地模式启动实际调用的是 com.dtstack.flinkx.Main.main方法,进去看看

public static void main(String[] args) throws Exception {//  获取传递的参数com.dtstack.flinkx.options.Options options = new OptionParser(args).getOptions();String job = options.getJob();  // -jobString jobIdString = options.getJobid(); // Flink JobString monitor = options.getMonitor();String pluginRoot = options.getPluginRoot(); // -pluginRoot String savepointPath = options.getS();Properties confProperties = parseConf(options.getConfProp()); // -flinkconf// 解析jobPath指定的任务配置json文件DataTransferConfig config = DataTransferConfig.parse(job);speedTest(config);if(StringUtils.isNotEmpty(monitor)) {config.setMonitorUrls(monitor);}if(StringUtils.isNotEmpty(pluginRoot)) {config.setPluginRoot(pluginRoot);}StreamExecutionEnvironment env = (StringUtils.isNotBlank(monitor)) ?StreamExecutionEnvironment.getExecutionEnvironment() :new MyLocalStreamEnvironment();env = openCheckpointConf(env, confProperties);configRestartStrategy(env, config);SpeedConfig speedConfig = config.getJob().getSetting().getSpeed();env.setParallelism(speedConfig.getChannel());env.setRestartStrategy(RestartStrategies.noRestart());// 得到具体的reader类名BaseDataReader dataReader = DataReaderFactory.getDataReader(config, env);DataStream<Row> dataStream = dataReader.readData();dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}// 得到具体的writer类名BaseDataWriter dataWriter = DataWriterFactory.getDataWriter(config);dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());if(env instanceof MyLocalStreamEnvironment) {if(StringUtils.isNotEmpty(savepointPath)){((MyLocalStreamEnvironment) env).setSettings(SavepointRestoreSettings.forPath(savepointPath));}}addEnvClassPath(env, ClassLoaderManager.getClassPath());// 得到执行的结果JobExecutionResult result = env.execute(jobIdString);if(env instanceof MyLocalStreamEnvironment){ResultPrintUtil.printResult(result);}
}

3. 在结合start.sh脚本

结合start.sh脚本,就清楚了

D:/Projects/flinkx-1.8.5/bin/flinkx \
-mode "local"  \
-job D:/Projects/flinkx-1.8.5/job/ftp2stream.json \
-pluginRoot "D:/Projects/flinkx-1.8.5/plugins" \
-flinkconf "D:/Projects/flinkx-1.8.5/flinkconf"  \
-confProp "{\"flink.checkpoint.interval\":60000}"

flinkx之后执行执行的语句是这样的

nohup java -cp \
D:\Projects\flinkx-1.8.5\lib\flinkx-launcher-1.6.jar \
com.dtstack.flinkx.launcher.Launcher \
-mode "local"  \
-job D:/Projects/flinkx-1.8.5/job/ftp2stream.json \
-pluginRoot "D:/Projects/flinkx-1.8.5/plugins" \
-flinkconf "D:/Projects/flinkx-1.8.5/flinkconf"  \
-confProp "{\"flink.checkpoint.interval\":60000}" &

Launcher类中通过获取mode,pluginRoot ,flinkconf加载环境,通过json文件得知reader和writer的类型和同步策略

这篇关于Flinkx启动流程-整体理解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/393066

相关文章

nginx启动命令和默认配置文件的使用

《nginx启动命令和默认配置文件的使用》:本文主要介绍nginx启动命令和默认配置文件的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录常见命令nginx.conf配置文件location匹配规则图片服务器总结常见命令# 默认配置文件启动./nginx

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

Nexus安装和启动的实现教程

《Nexus安装和启动的实现教程》:本文主要介绍Nexus安装和启动的实现教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、Nexus下载二、Nexus安装和启动三、关闭Nexus总结一、Nexus下载官方下载链接:DownloadWindows系统根

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c

Java中实现线程的创建和启动的方法

《Java中实现线程的创建和启动的方法》在Java中,实现线程的创建和启动是两个不同但紧密相关的概念,理解为什么要启动线程(调用start()方法)而非直接调用run()方法,是掌握多线程编程的关键,... 目录1. 线程的生命周期2. start() vs run() 的本质区别3. 为什么必须通过 st

Oracle修改端口号之后无法启动的解决方案

《Oracle修改端口号之后无法启动的解决方案》Oracle数据库更改端口后出现监听器无法启动的问题确实较为常见,但并非必然发生,这一问题通常源于​​配置错误或环境冲突​​,而非端口修改本身,以下是系... 目录一、问题根源分析​​​二、保姆级解决方案​​​​步骤1:修正监听器配置文件 (listener.

MySQL版本问题导致项目无法启动问题的解决方案

《MySQL版本问题导致项目无法启动问题的解决方案》本文记录了一次因MySQL版本不一致导致项目启动失败的经历,详细解析了连接错误的原因,并提供了两种解决方案:调整连接字符串禁用SSL或统一MySQL... 目录本地项目启动报错报错原因:解决方案第一个:第二种:容器启动mysql的坑两种修改时区的方法:本地

MySQL启动报错:InnoDB表空间丢失问题及解决方法

《MySQL启动报错:InnoDB表空间丢失问题及解决方法》在启动MySQL时,遇到了InnoDB:Tablespace5975wasnotfound,该错误表明MySQL在启动过程中无法找到指定的s... 目录mysql 启动报错:InnoDB 表空间丢失问题及解决方法错误分析解决方案1. 启用 inno

spring-gateway filters添加自定义过滤器实现流程分析(可插拔)

《spring-gatewayfilters添加自定义过滤器实现流程分析(可插拔)》:本文主要介绍spring-gatewayfilters添加自定义过滤器实现流程分析(可插拔),本文通过实例图... 目录需求背景需求拆解设计流程及作用域逻辑处理代码逻辑需求背景公司要求,通过公司网络代理访问的请求需要做请

使用JavaConfig配置Spring的流程步骤

《使用JavaConfig配置Spring的流程步骤》JavaConfig是Spring框架提供的一种基于Java的配置方式,它通过使用@Configuration注解标记的类来替代传统的XML配置文... 目录一、什么是 JavaConfig?1. 核心注解2. 与 XML 配置的对比二、JavaConf