Flinkx启动流程-整体理解

2023-11-11 21:58

本文主要是介绍Flinkx启动流程-整体理解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 先看启动脚本

在bin/flinkx的内容

set -e
export FLINKX_HOME="$(cd "`dirname "$0"`"/..; pwd)"
# Find the java binary
if [ -n "${JAVA_HOME}" ]; thenJAVA_RUN="${JAVA_HOME}/bin/java"
elseif [ `command -v java` ]; thenJAVA_RUN="java"elseecho "JAVA_HOME is not set" >&2exit 1fi
fi
JAR_DIR=$FLINKX_HOME/lib/*
CLASS_NAME=com.dtstack.flinkx.launcher.Launcher
# 参数1.java的命令
# -cp就是classpath :cp解释https://zhuanlan.zhihu.com/p/214093661
# -cp $JAR_DIR $CLASS_NAME 就是指定运行哪个jar下的哪个主类
# $@为传递的参数
# &为后端运行
echo "flinkx starting ..."
nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &
  1. 先导入flinkx的环境
  2. 然后是java的二进制文件
  3. 指定flinkx相关jar包和启动类
  4. nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &执行

2. 在看Launcher启动类

定位到flinkx-1.8.5\flinkx-launcher\src\main\java\com\dtstack\flinkx\launcher\Launcher.java的main方法,第95行,查看本地模式

if(mode.equals(ClusterMode.local.name())) {String[] localArgs = argList.toArray(new String[argList.size()]);com.dtstack.flinkx.Main.main(localArgs);
}

flinkx本地模式启动实际调用的是 com.dtstack.flinkx.Main.main方法,进去看看

public static void main(String[] args) throws Exception {//  获取传递的参数com.dtstack.flinkx.options.Options options = new OptionParser(args).getOptions();String job = options.getJob();  // -jobString jobIdString = options.getJobid(); // Flink JobString monitor = options.getMonitor();String pluginRoot = options.getPluginRoot(); // -pluginRoot String savepointPath = options.getS();Properties confProperties = parseConf(options.getConfProp()); // -flinkconf// 解析jobPath指定的任务配置json文件DataTransferConfig config = DataTransferConfig.parse(job);speedTest(config);if(StringUtils.isNotEmpty(monitor)) {config.setMonitorUrls(monitor);}if(StringUtils.isNotEmpty(pluginRoot)) {config.setPluginRoot(pluginRoot);}StreamExecutionEnvironment env = (StringUtils.isNotBlank(monitor)) ?StreamExecutionEnvironment.getExecutionEnvironment() :new MyLocalStreamEnvironment();env = openCheckpointConf(env, confProperties);configRestartStrategy(env, config);SpeedConfig speedConfig = config.getJob().getSetting().getSpeed();env.setParallelism(speedConfig.getChannel());env.setRestartStrategy(RestartStrategies.noRestart());// 得到具体的reader类名BaseDataReader dataReader = DataReaderFactory.getDataReader(config, env);DataStream<Row> dataStream = dataReader.readData();dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}// 得到具体的writer类名BaseDataWriter dataWriter = DataWriterFactory.getDataWriter(config);dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());if(env instanceof MyLocalStreamEnvironment) {if(StringUtils.isNotEmpty(savepointPath)){((MyLocalStreamEnvironment) env).setSettings(SavepointRestoreSettings.forPath(savepointPath));}}addEnvClassPath(env, ClassLoaderManager.getClassPath());// 得到执行的结果JobExecutionResult result = env.execute(jobIdString);if(env instanceof MyLocalStreamEnvironment){ResultPrintUtil.printResult(result);}
}

3. 在结合start.sh脚本

结合start.sh脚本,就清楚了

D:/Projects/flinkx-1.8.5/bin/flinkx \
-mode "local"  \
-job D:/Projects/flinkx-1.8.5/job/ftp2stream.json \
-pluginRoot "D:/Projects/flinkx-1.8.5/plugins" \
-flinkconf "D:/Projects/flinkx-1.8.5/flinkconf"  \
-confProp "{\"flink.checkpoint.interval\":60000}"

flinkx之后执行执行的语句是这样的

nohup java -cp \
D:\Projects\flinkx-1.8.5\lib\flinkx-launcher-1.6.jar \
com.dtstack.flinkx.launcher.Launcher \
-mode "local"  \
-job D:/Projects/flinkx-1.8.5/job/ftp2stream.json \
-pluginRoot "D:/Projects/flinkx-1.8.5/plugins" \
-flinkconf "D:/Projects/flinkx-1.8.5/flinkconf"  \
-confProp "{\"flink.checkpoint.interval\":60000}" &

Launcher类中通过获取mode,pluginRoot ,flinkconf加载环境,通过json文件得知reader和writer的类型和同步策略

这篇关于Flinkx启动流程-整体理解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/393066

相关文章

解决hive启动时java.net.ConnectException:拒绝连接的问题

《解决hive启动时java.net.ConnectException:拒绝连接的问题》Hadoop集群连接被拒,需检查集群是否启动、关闭防火墙/SELinux、确认安全模式退出,若问题仍存,查看日志... 目录错误发生原因解决方式1.关闭防火墙2.关闭selinux3.启动集群4.检查集群是否正常启动5.

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

kkFileView启动报错:报错2003端口占用的问题及解决

《kkFileView启动报错:报错2003端口占用的问题及解决》kkFileView启动报错因office组件2003端口未关闭,解决:查杀占用端口的进程,终止Java进程,使用shutdown.s... 目录原因解决总结kkFileViewjavascript启动报错启动office组件失败,请检查of

Linux下在线安装启动VNC教程

《Linux下在线安装启动VNC教程》本文指导在CentOS7上在线安装VNC,包含安装、配置密码、启动/停止、清理重启步骤及注意事项,强调需安装VNC桌面以避免黑屏,并解决端口冲突和目录权限问题... 目录描述安装VNC安装 VNC 桌面可能遇到的问题总结描js述linux中的VNC就类似于Window

linux下shell脚本启动jar包实现过程

《linux下shell脚本启动jar包实现过程》确保APP_NAME和LOG_FILE位于目录内,首次启动前需手动创建log文件夹,否则报错,此为个人经验,供参考,欢迎支持脚本之家... 目录linux下shell脚本启动jar包样例1样例2总结linux下shell脚本启动jar包样例1#!/bin

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

Java Spring的依赖注入理解及@Autowired用法示例详解

《JavaSpring的依赖注入理解及@Autowired用法示例详解》文章介绍了Spring依赖注入(DI)的概念、三种实现方式(构造器、Setter、字段注入),区分了@Autowired(注入... 目录一、什么是依赖注入(DI)?1. 定义2. 举个例子二、依赖注入的几种方式1. 构造器注入(Con

Spring Boot从main方法到内嵌Tomcat的全过程(自动化流程)

《SpringBoot从main方法到内嵌Tomcat的全过程(自动化流程)》SpringBoot启动始于main方法,创建SpringApplication实例,初始化上下文,准备环境,刷新容器并... 目录1. 入口:main方法2. SpringApplication初始化2.1 构造阶段3. 运行阶

使用Go实现文件复制的完整流程

《使用Go实现文件复制的完整流程》本案例将实现一个实用的文件操作工具:将一个文件的内容完整复制到另一个文件中,这是文件处理中的常见任务,比如配置文件备份、日志迁移、用户上传文件转存等,文中通过代码示例... 目录案例说明涉及China编程知识点示例代码代码解析示例运行练习扩展小结案例说明我们将通过标准库 os

Ubuntu 24.04启用root图形登录的操作流程

《Ubuntu24.04启用root图形登录的操作流程》Ubuntu默认禁用root账户的图形与SSH登录,这是为了安全,但在某些场景你可能需要直接用root登录GNOME桌面,本文以Ubuntu2... 目录一、前言二、准备工作三、设置 root 密码四、启用图形界面 root 登录1. 修改 GDM 配