Flink Source Code Series, Part 10: TaskExecutor Registers with the ResourceManager (Flink's Internal RM, Not the YARN RM)



The previous part ended at the main method of YarnTaskExecutorRunner. We continue the analysis from there.

1.YarnTaskExecutorRunner#main->YarnTaskExecutorRunner#runTaskManagerSecurely->TaskManagerRunner#runTaskManagerSecurely

	public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
	    replaceGracefulExitWithHaltIfConfigured(configuration);
	    final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
	    FileSystem.initialize(configuration, pluginManager);
	    SecurityUtils.install(new SecurityConfiguration(configuration));
	    SecurityUtils.getInstalledContext().runSecured(() -> {
	        runTaskManager(configuration, pluginManager);
	        return null;
	    });
	}

2. TaskManagerRunner#runTaskManager

	public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
	    final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
	    taskManagerRunner.start();
	}

① new TaskManagerRunner

Constructs the TaskManagerRunner.

② taskManagerRunner.start()

Starts it.

3.TaskManagerRunner#start->TaskExecutorToServiceAdapter#start->RpcEndpoint#start

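rpcServer.start()

Starts the RPC server, that is, sends a message telling the underlying AkkaRpcActor to switch to the START state. Once the actor has processed that message, the endpoint's onStart callback runs, so we can go straight to TaskExecutor's onStart method.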
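The "start only sends a message, the callback does the work" idea can be sketched in plain Java. This is a minimal illustration, not Flink's RpcEndpoint or AkkaRpcActor: the class names SketchEndpoint and SketchTaskExecutor, the single-threaded mailbox and the recorded service names are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an RPC endpoint; this is not Flink's RpcEndpoint.
abstract class SketchEndpoint {
    // One thread plays the role of the actor's mailbox (the endpoint's main thread).
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private volatile boolean started;

    // start() does no real work itself: it only enqueues a "start" message.
    final void start() {
        mailbox.execute(() -> {
            if (!started) {
                started = true;
                onStart(); // the lifecycle hook runs on the mailbox thread
            }
        });
    }

    final boolean isStarted() {
        return started;
    }

    // Lets already queued messages run, then stops the mailbox thread.
    final void shutdown() {
        mailbox.shutdown();
        try {
            mailbox.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected abstract void onStart();
}

// Hypothetical endpoint whose onStart records the services it would bring up.
final class SketchTaskExecutor extends SketchEndpoint {
    final List<String> startedServices = new ArrayList<>();

    @Override
    protected void onStart() {
        startedServices.add("resourceManagerLeaderRetriever");
        startedServices.add("taskSlotTable");
        startedServices.add("jobLeaderService");
    }
}
```

Calling start() followed by shutdown() on a SketchTaskExecutor leaves three entries in startedServices, which mirrors how onStart, not start itself, is where the TaskExecutor brings up its services.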

4.TaskExecutor#onStart->TaskExecutor#startTaskExecutorServices

	private void startTaskExecutorServices() throws Exception {
	    try {
	        // start by connecting to the ResourceManager
	        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
	        // tell the task slot table who's responsible for the task slot actions
	        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
	        // start the job leader service
	        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
	        fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
	    } catch (Exception e) {
	        handleStartTaskExecutorServicesException(e);
	    }
	}

5. StandaloneLeaderRetrievalService#start -> ResourceManagerLeaderListener#notifyLeaderAddress (an inner class of TaskExecutor) -> TaskExecutor#notifyOfNewResourceManagerLeader -> TaskExecutor#reconnectToResourceManager -> TaskExecutor#tryConnectToResourceManager -> TaskExecutor#connectToResourceManager

	private void connectToResourceManager() {
	    assert(resourceManagerAddress != null);
	    assert(establishedResourceManagerConnection == null);
	    assert(resourceManagerConnection == null);
	    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
	    final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
	        getAddress(),
	        getResourceID(),
	        unresolvedTaskManagerLocation.getDataPort(),
	        JMXService.getPort().orElse(-1),
	        hardwareDescription,
	        memoryConfiguration,
	        taskManagerConfiguration.getDefaultSlotResourceProfile(),
	        taskManagerConfiguration.getTotalResourceProfile());
	    resourceManagerConnection =
	        new TaskExecutorToResourceManagerConnection(
	            log,
	            getRpcService(),
	            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
	            resourceManagerAddress.getAddress(),
	            resourceManagerAddress.getResourceManagerId(),
	            getMainThreadExecutor(),
	            new ResourceManagerRegistrationListener(),
	            taskExecutorRegistration);
	    resourceManagerConnection.start();
	}
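The call chain in step 5 follows a listener pattern: a leader retrieval service tells a listener who the current leader is, and the listener drops any old connection and connects to the new address. The sketch below illustrates that shape only. It assumes a retrieval service whose leader address is fixed up front and which notifies the listener as soon as it is started; the names LeaderListener, FixedLeaderRetrieval and SketchWorker, and the address "rm-host:6123", are made up for the example and are not Flink classes or real endpoints.

```java
import java.util.UUID;

// Hypothetical callback interface, loosely modeled on a leader retrieval listener.
interface LeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Assumption: the leader is known up front, so start() can notify immediately.
final class FixedLeaderRetrieval {
    private final String leaderAddress;
    private final UUID leaderSessionId;

    FixedLeaderRetrieval(String leaderAddress, UUID leaderSessionId) {
        this.leaderAddress = leaderAddress;
        this.leaderSessionId = leaderSessionId;
    }

    void start(LeaderListener listener) {
        listener.notifyLeaderAddress(leaderAddress, leaderSessionId);
    }
}

// Hypothetical worker that reconnects whenever a new leader is announced.
final class SketchWorker implements LeaderListener {
    private String connectedTo; // null while disconnected
    private int connectCount;

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        // "reconnect": close the old connection first ...
        connectedTo = null;
        // ... then connect only if there actually is a leader.
        if (leaderAddress != null) {
            connectedTo = leaderAddress;
            connectCount++;
        }
    }

    String connectedTo() {
        return connectedTo;
    }

    int connectCount() {
        return connectCount;
    }
}
```

Starting a FixedLeaderRetrieval with a SketchWorker as its listener connects the worker exactly once; announcing a null address afterwards leaves it disconnected.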

6.RegisteredRpcConnection#start

	public void start() {
	    checkState(!closed, "The RPC connection is already closed");
	    checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
	    final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
	    if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
	        newRegistration.startRegistration();
	    } else {
	        // concurrent start operation
	        newRegistration.cancel();
	    }
	}

① createNewRegistration

Creates the registration that the TaskExecutor will send to the ResourceManager.

② startRegistration

Starts that registration.
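The compareAndSet in start() makes sure that only one registration becomes the pending one, even if start is called concurrently; the loser is cancelled. A minimal sketch of that guard, using the JDK's AtomicReferenceFieldUpdater, is shown here. SketchConnection and SketchAttempt are hypothetical names, and the sketch deliberately leaves out the checkState calls of the original so that the losing branch can be exercised with two plain sequential calls.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical registration attempt that only records what happened to it.
final class SketchAttempt {
    boolean started;
    boolean canceled;

    void startRegistration() {
        started = true;
    }

    void cancel() {
        canceled = true;
    }
}

// Hypothetical connection that allows at most one pending attempt.
final class SketchConnection {
    private static final AtomicReferenceFieldUpdater<SketchConnection, SketchAttempt> UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SketchConnection.class, SketchAttempt.class, "pending");

    // Must be volatile for the field updater to work.
    private volatile SketchAttempt pending;

    // Returns the attempt it created so that callers can inspect its fate.
    SketchAttempt start() {
        SketchAttempt attempt = new SketchAttempt();
        if (UPDATER.compareAndSet(this, null, attempt)) {
            // We won the race: our attempt is now the pending one.
            attempt.startRegistration();
        } else {
            // Someone else already installed an attempt: discard ours.
            attempt.cancel();
        }
        return attempt;
    }

    SketchAttempt pending() {
        return pending;
    }
}
```

With this sketch, the first call to start() returns an attempt that was started, and a second call returns an attempt that was cancelled while the first one stays pending.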

7.RetryingRegistration#startRegistration

	public void startRegistration() {
	    if (canceled) {
	        // we already got canceled
	        return;
	    }
	    try {
	        // trigger resolution of the target address to a callable gateway
	        final CompletableFuture<G> rpcGatewayFuture;
	        if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
	            rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
	                targetAddress,
	                fencingToken,
	                targetType.asSubclass(FencedRpcGateway.class));
	        } else {
	            rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
	        }
	        // upon success, start the registration attempts
	        CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync(
	            (G rpcGateway) -> {
	                log.info("Resolved {} address, beginning registration", targetName);
	                register(rpcGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
	            },
	            rpcService.getExecutor());
	        // upon failure, retry, unless this is cancelled
	        rpcGatewayAcceptFuture.whenCompleteAsync(
	            (Void v, Throwable failure) -> {
	                if (failure != null && !canceled) {
	                    final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);
	                    if (log.isDebugEnabled()) {
	                        log.debug(
	                            "Could not resolve {} address {}, retrying in {} ms.",
	                            targetName,
	                            targetAddress,
	                            retryingRegistrationConfiguration.getErrorDelayMillis(),
	                            strippedFailure);
	                    } else {
	                        log.info(
	                            "Could not resolve {} address {}, retrying in {} ms: {}",
	                            targetName,
	                            targetAddress,
	                            retryingRegistrationConfiguration.getErrorDelayMillis(),
	                            strippedFailure.getMessage());
	                    }
	                    startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
	                }
	            },
	            rpcService.getExecutor());
	    }
	    catch (Throwable t) {
	        completionFuture.completeExceptionally(t);
	        cancel();
	    }
	}

① rpcService.connect

Resolves the target address into a callable gateway.

② register

Once the gateway has been resolved, the registration attempts begin.
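The control flow of startRegistration (resolve the gateway, register on success, retry after a delay on failure) can be reproduced with nothing but CompletableFuture and a ScheduledExecutorService. The following is a simplified sketch under stated assumptions, not Flink's RetryingRegistration: the gateway is just a String, "registering" only completes a result future, a synchronous exception from the connect call is not handled, and the class name, the demo helper and the "rm-gateway" value are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of "resolve the gateway, then register, retry on failure".
final class SketchRetryingRegistration {
    private final Supplier<CompletableFuture<String>> connect; // resolves a "gateway"
    private final ScheduledExecutorService executor;
    private final long errorDelayMillis;
    private final CompletableFuture<String> completion = new CompletableFuture<>();
    private final AtomicInteger connectAttempts = new AtomicInteger();
    private volatile boolean canceled;

    SketchRetryingRegistration(
            Supplier<CompletableFuture<String>> connect,
            ScheduledExecutorService executor,
            long errorDelayMillis) {
        this.connect = connect;
        this.executor = executor;
        this.errorDelayMillis = errorDelayMillis;
    }

    void cancel() {
        canceled = true;
    }

    void startRegistration() {
        if (canceled) {
            return;
        }
        connectAttempts.incrementAndGet();
        // Step 1: resolve the target address into a gateway.
        CompletableFuture<String> gatewayFuture = connect.get();
        // Step 2: on success, "register" (here: just complete the result future).
        CompletableFuture<Void> accepted = gatewayFuture.thenAcceptAsync(
                gateway -> completion.complete(
                        "registered via " + gateway + " after "
                                + connectAttempts.get() + " connect attempts"),
                executor);
        // Step 3: on failure, try again after a delay, unless canceled.
        Runnable retry = this::startRegistration;
        accepted.whenCompleteAsync(
                (ignored, failure) -> {
                    if (failure != null && !canceled) {
                        executor.schedule(retry, errorDelayMillis, TimeUnit.MILLISECONDS);
                    }
                },
                executor);
    }

    // Runs the sketch against a connect call that fails `failures` times first.
    static String demo(int failures) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        Supplier<CompletableFuture<String>> connect = () -> {
            CompletableFuture<String> result = new CompletableFuture<>();
            if (calls.incrementAndGet() <= failures) {
                result.completeExceptionally(
                        new IllegalStateException("address not resolvable yet"));
            } else {
                result.complete("rm-gateway");
            }
            return result;
        };
        SketchRetryingRegistration registration =
                new SketchRetryingRegistration(connect, executor, 10L);
        try {
            registration.startRegistration();
            return registration.completion.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

For example, SketchRetryingRegistration.demo(2) simulates two failed resolutions followed by a successful one, so the third connect attempt is the one that leads to registration.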

8. RetryingRegistration#register -> ResourceManagerRegistration#invokeRegistration (an inner class of TaskExecutorToResourceManagerConnection) -> ResourceManager#registerTaskExecutorInternal

	private RegistrationResponse registerTaskExecutorInternal(
	        TaskExecutorGateway taskExecutorGateway,
	        TaskExecutorRegistration taskExecutorRegistration) {
	    ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
	    WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
	    if (oldRegistration != null) {
	        // TODO :: suggest old taskExecutor to stop itself
	        log.debug("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId.getStringWithMetadata());
	        // remove old task manager registration from slot manager
	        slotManager.unregisterTaskManager(
	            oldRegistration.getInstanceID(),
	            new ResourceManagerException(String.format("TaskExecutor %s re-connected to the ResourceManager.", taskExecutorResourceId.getStringWithMetadata())));
	    }
	    final WorkerType newWorker = workerStarted(taskExecutorResourceId);
	    String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
	    if (newWorker == null) {
	        log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did " +
	            "not recognize it", taskExecutorResourceId.getStringWithMetadata(), taskExecutorAddress);
	        return new RegistrationResponse.Decline("unrecognized TaskExecutor");
	    } else {
	        WorkerRegistration<WorkerType> registration = new WorkerRegistration<>(
	            taskExecutorGateway,
	            newWorker,
	            taskExecutorRegistration.getDataPort(),
	            taskExecutorRegistration.getJmxPort(),
	            taskExecutorRegistration.getHardwareDescription(),
	            taskExecutorRegistration.getMemoryConfiguration());
	        log.info("Registering TaskManager with ResourceID {} ({}) at ResourceManager", taskExecutorResourceId.getStringWithMetadata(), taskExecutorAddress);
	        taskExecutors.put(taskExecutorResourceId, registration);
	        taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
	            @Override
	            public void receiveHeartbeat(ResourceID resourceID, Void payload) {
	                // the ResourceManager will always send heartbeat requests to the
	                // TaskManager
	            }
	            @Override
	            public void requestHeartbeat(ResourceID resourceID, Void payload) {
	                taskExecutorGateway.heartbeatFromResourceManager(resourceID);
	            }
	        });
	        return new TaskExecutorRegistrationSuccess(
	            registration.getInstanceID(),
	            resourceId,
	            clusterInformation);
	    }
	}

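① getResourceId

Gets the resource ID of the TaskExecutor.

② taskExecutors.remove

Removes any previously cached registration for this TaskExecutor.

③ slotManager.unregisterTaskManager

If an old registration existed, removes the old TaskManager registration from the slotManager.

④ workerStarted

Callback invoked when a worker has started. It returns the WorkerType, or null if the framework does not recognize the worker, in which case the registration is declined.

⑤ getTaskExecutorAddress

Gets the address of the TaskExecutor.

⑥ new WorkerRegistration<>

Constructs the WorkerRegistration.

⑦ log.info and taskExecutors.put

Logs that the TaskManager is being registered at the ResourceManager, then puts the new registration into the cache.

⑧ taskManagerHeartbeatManager.monitorTarget

Starts monitoring the TaskManager as a heartbeat target.

⑨ new TaskExecutorRegistrationSuccess

Creates and returns the registration-success response.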
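The nine steps boil down to bookkeeping on a map keyed by resource ID: drop any old entry, decline unknown workers, otherwise store a fresh registration, start heartbeat monitoring and answer with success. The sketch below shows that bookkeeping under simplifying assumptions: it is not Flink's ResourceManager, resource IDs are plain strings, instance IDs are a running counter, heartbeat monitoring is reduced to a set of monitored IDs, and the response is a string. All names in it are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified registration bookkeeping.
final class SketchResourceManager {
    // Workers the framework has started and therefore recognizes.
    private final Set<String> startedWorkers = new HashSet<>();
    // Cache of current registrations: resource ID -> instance ID.
    private final Map<String, Integer> taskExecutors = new HashMap<>();
    // Resource IDs currently monitored as heartbeat targets.
    private final Set<String> heartbeatTargets = new HashSet<>();
    private int nextInstanceId = 1;

    void allowWorker(String resourceId) {
        startedWorkers.add(resourceId);
    }

    String register(String resourceId) {
        // Steps 1-3: remove any old registration for this resource ID.
        Integer oldInstanceId = taskExecutors.remove(resourceId);
        if (oldInstanceId != null) {
            // A real implementation would also unregister the old instance
            // from the slot manager here.
            heartbeatTargets.remove(resourceId);
        }
        // Step 4: decline workers the framework does not recognize.
        if (!startedWorkers.contains(resourceId)) {
            return "DECLINE: unrecognized TaskExecutor";
        }
        // Steps 6-7: build the new registration and cache it.
        int instanceId = nextInstanceId++;
        taskExecutors.put(resourceId, instanceId);
        // Step 8: start heartbeat monitoring for the new registration.
        heartbeatTargets.add(resourceId);
        // Step 9: report success together with the new instance ID.
        return "SUCCESS: instance " + instanceId;
    }

    int registeredCount() {
        return taskExecutors.size();
    }

    boolean isMonitored(String resourceId) {
        return heartbeatTargets.contains(resourceId);
    }
}
```

Registering the same recognized worker twice yields a new instance ID each time while the cache still holds a single entry for it, which is the effect of removing the old registration before adding the new one.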

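Because this is an RPC call, the registration-success response travels back to the TaskExecutor side, where the onRegistrationSuccess method in TaskExecutor is called back. We will analyze the rest in the next part.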

Overview

The flowchart of the source code covered in this part is as follows:

See you in the next part!
