Druid Supervisor启动task流程分析

2024-04-29 12:58

本文主要是介绍Druid Supervisor启动task流程分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言


        继前一篇文章关于supervisor启动流程分析的启动过程,然后来分析一下supervisor启动后是如何启动并管理task的运行的。又是如何将KafkaTask的对象创建的。

上图


        创建完持续执行的supervisor线程后,紧接着通过一个定时的单线程池来创建RunNotice()对象并放入notice队列中供supervisor进行poll并运行handle()方法。定时的时间则是配置的task的运行周期,默认是1秒

supervisor获取到RunNotice的时候,开始执行RunNotice的hadle(), 然后开始执行创建task并将创建的kafkaTask添加到TaskMaster管理的TaskQueue中供taskRunner执行。整个task的创建和被执行的过程是消费者模式启动的。

        taskRunner调用task的start方法后开始具体的数据传输。此处的taskRunner包含:ForkingTaskRunner、HttpRemoteTaskRunner、RemoteTaskRunner、SingleTaskBackgroundRunner 四种实现。具体每中taskRunner的创建过程以及操作原理在后面的文章中做描述,本篇文章不做描述。

 上代码


        在类SeekableStreamSupervisor中执行tryInit() 来启动supervisor, 然后紧接着创建定时创建task的线程池。

scheduledExec.scheduleAtFixedRate(buildRunTask(), // 创建RunNotice()对象ioConfig.getStartDelay().getMillis(),Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS), // 配置的task执行周期TimeUnit.MILLISECONDS);

        具体创建task入口则是RunNotice的handle()方法:

private class RunNotice implements Notice{@Overridepublic void handle(){long nowTime = System.currentTimeMillis();// // MAX_RUN_FREQUENCY_MILLIS 是任务的运行周期,默认是一秒, 如果配置的是2个小时,即2个小时会运行一次runInternal()if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {return;}lastRunTime = nowTime;// 即2个小时会运行一次runInternal()runInternal();}}

        如果符合执行的时间要求则执行runInternal()方法:

public void runInternal(){try {/*** 此处省略了很多状态检查以及状态判断变成的操作* 主要是针对现有的task进行状态变更检查,为启动新的task做准备* 这里关于task状态变更的代码可作为细节详解来仔细研究下,具体就是描述了task在切换的时候需要做哪些事情*/if (!spec.isSuspended()) {log.info("[%s] supervisor is running.", dataSource);stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);// 前置操作判断完后开始创建新的taskcreateNewTasks();} else {log.info("[%s] supervisor is suspended.", dataSource);gracefulShutdownInternal();}if (log.isDebugEnabled()) {log.debug(generateReport(true).toString());} else {log.info(generateReport(false).toString());}}catch (Exception e) {stateManager.recordThrowableEvent(e);log.warn(e, "Exception in supervisor run loop for dataSource [%s]", dataSource);}finally {stateManager.markRunFinished();}}

        调用创建task的方法后,会经过一些列的判断,最终组合封装出task的基本信息,io配置信息,封装分配TaskGroup,  创建出具体的task对象。

        createNewTasks() -> createTasksForGroup(groupId, ioConfig.getReplicas() - taskGroup.tasks.size()) -> createIndexTasks() -> new KafkaIndexTask() 创建完task对象后会调用taskMaster得到TaskQueue, 将创建的task对象添加到TaskQueue中供taskQueue进行处理。至此supervisor创建task的工作就做完了。

private void createTasksForGroup(int groupId, int replicas)throws JsonProcessingException{TaskGroup group = activelyReadingTaskGroups.get(groupId);Map<PartitionIdType, SequenceOffsetType> startPartitions = group.startingSequences;Map<PartitionIdType, SequenceOffsetType> endPartitions = new HashMap<>();for (PartitionIdType partition : startPartitions.keySet()) {endPartitions.put(partition, getEndOfPartitionMarker());}Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups.get(groupId).exclusiveStartSequenceNumberPartitions;DateTime minimumMessageTime = group.minimumMessageTime.orNull();DateTime maximumMessageTime = group.maximumMessageTime.orNull();// 根据taskGroupId信息 创建task的IOConfigSeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(groupId,startPartitions,endPartitions,group.baseSequenceName,minimumMessageTime,maximumMessageTime,exclusiveStartSequenceNumberPartitions,ioConfig);// 根据task的基本信息,创建kafkaTask, 因为可能有副本所以使用ListList<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> taskList = createIndexTasks(replicas,group.baseSequenceName,sortingMapper,group.checkpointSequences,newIoConfig,taskTuningConfig,rowIngestionMetersFactory);// 创建完task后 将task放到taskMaster的队列中,等待被启for (SeekableStreamIndexTask indexTask : taskList) {Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();if (taskQueue.isPresent()) {try {taskQueue.get().add(indexTask);}catch (EntryExistsException e) {stateManager.recordThrowableEvent(e);log.error("Tried to add task [%s] but it already exists", indexTask.getId());}} else {log.error("Failed to get task queue because I'm not the leader!");}}
}

        接着来简单看看把task扔给TaskQueue后,TaskQueue是做的什么操作:首先TaskQueue这个对象也是注入的,且是有生命周期的。注入时会调用TaskQueue的start方法。然后启动一个线程来循环处理任务。

@LifecycleStartpublic void start(){giant.lock();try {Preconditions.checkState(!active, "queue must be stopped");active = true;syncFromStorage();managerExec.submit(new Runnable(){@Overridepublic void run(){while (true) {try {manage(); // 开启线程来不断的调用该方案break;}catch (InterruptedException e) {log.info("Interrupted, exiting!");break;}catch (Exception e) {final long restartDelay = config.getRestartDelay().getMillis();log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();try {Thread.sleep(restartDelay);}catch (InterruptedException e2) {log.info("Interrupted, exiting!");break;}}}}});
}

        TaskQueue启动一个线程在调用manage()方法,那我们来看看manage()方法在干什么?从代码可以看出manage()方法执行结束后该线程也会结束了。之所以加一个循环是为了重试机制。manage()方法中才是真正循环执行task的地方。 对于每个task会被TaskRunner调用并将其run()起来。此时task才是真正的被启动了。关于TaskRunner的内容后续文章在做详细描述。

/*** Main task runner management loop. Meant to run forever, or, at least until we're stopped.*/private void manage() throws InterruptedException{// ....while (active) {giant.lock();try {// ....for (final Task task : ImmutableList.copyOf(tasks)) {if (!taskFutures.containsKey(task.getId())) {final ListenableFuture<TaskStatus> runnerTaskFuture;if (runnerTaskFutures.containsKey(task.getId())) {runnerTaskFuture = runnerTaskFutures.get(task.getId());} else {// Task should be running, so run it.final boolean taskIsReady;try {taskIsReady = task.isReady(taskActionClientFactory.create(task));}catch (Exception e) {log.warn(e, "Exception thrown during isReady for task: %s", task.getId());notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());continue;}if (taskIsReady) {log.info("Asking taskRunner to run: %s", task.getId());runnerTaskFuture = taskRunner.run(task);} else {continue;}}taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));} else if (isTaskPending(task)) {// if the taskFutures contain this task and this task is pending, also let the taskRunner// to run it to guarantee it will be assigned to run// see https://github.com/apache/incubator-druid/pull/6991taskRunner.run(task);}}// ....}finally {giant.unlock();}}}

END


        本篇文章涉及的很多细节性问题都没有展开详细描述,比如task的状态变更taskGroup的实现与应用taskRunner注册监听器问题task的chackpoint的问题task的消费序列记录问题task的副本问题KafkaIndexTask的执行流程(下一篇描述)、TaskQueue的实现TaskRunner的实现(重点)等等,每一个细节都非常值得研究~~ 。不过本篇重在task创建的大体流程,对task的存在形式整体有个认识。

这篇关于Druid Supervisor启动task流程分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/946148

相关文章

Mysql服务无法启动 1067

今天在启动Mysql的时候无法启动,报错1067错误,去事件管理器查看日志显示的是mysql.user表不存在,才想起是昨天使用Navicat时,不小心把mysql库给删除了。在Mysql安装的时候会有三个特殊的数据库,mysql、meta、information_schema,在删除了mysql数据库之后,找不到当前mysql的用户信息,自然就无法启动了。要解决的事情就是把mysql数据库找回来

Spring Boot启动报错:Failed to instantiate [ClassName]: No default constructor found;

今天在Spring Boot工程启动时报错了,提示不能初始化该类,原因是找不到默认的构造方法,我看了一下,我也有写构造方法: public class H2DbOperation extends AbstractDbOperation{public H2DbOperation(String url, String name, String password, Connection conne

AI模型部署实战:利用OpenCV的CUDA模块加速视觉模型部署流程

本文首发于公众号【DeepDriving】,欢迎关注。 一. 前言 我在之前的文章《AI模型部署实战:利用CV-CUDA加速视觉模型部署流程》中介绍了如何使用CV-CUDA库来加速视觉模型部署的流程,但是CV-CUDA对系统版本和CUDA版本的要求比较高,在一些低版本的系统中可能无法使用。对于像我这种不会写CUDA代码又想用CUDA来加速模型部署流程的人来说要怎么办呢,其实还有一种方式,

300万元奖励!2024年成都市全球灯塔工厂申报条件奖补、认定流程指南

什么是灯塔工厂? 灯塔工厂是通过数字化、网络化和智能化手段,运用先进的工业4.0技术和理念,实现生产过程的全面自动化、精确化和优化。它不仅实现了数字化与物理世界的深度融合,而且提高了生产效率和质量,降低了制造成本,实现了高度的灵活和个性化生产。 成都市灯塔工厂申报奖补 对入选全球“灯塔工厂”的企业,给予300万元奖励。 成都市灯塔工厂申报条件 全球灯塔工厂评选标准是由美国麻省理工学院(M

常见加解密算法02 - RC4算法分析

RC4是一种广泛使用的流密码,它以其简洁和速度而闻名。区别于块密码,流密码特点在于按位或按字节来进行加密。 RC4由Ron Rivest在1987年设计,尽管它的命名看起来是第四版,实际上它是第一个对外发布的版本。 RC4算法的实施过程简洁明了,主要包括初始化和生成密钥流这两个阶段。 下面我们就一边解析算法,一边分析其代码实现。 初始化 该阶段的核心任务是利用一个可变长度的密钥来初始化一

sqlserver安装失败,提示“找不到数据库引擎启动句柄”的解决办法。

安装sqlserver2012和sqlserver2016时,一直提示“找不到数据库引擎启动句柄”。 执行“setup.exe”时,使用管理员权限启动,就不再报这个错误。

python对排列三的分析

对排列三(一种常见的彩票游戏)进行分析,我们通常关注其号码组合的可能性、中奖概率以及可能的号码趋势或模式。然而,由于排列三是基于随机抽取的,因此没有一种方法可以预测下一个中奖号码,但我们可以通过Python来分析历史数据和统计信息。 以下是一个简单的Python脚本示例,用于分析排列三的一些基本统计信息: python复制代码 from collections import Counte

安卓AccessibilityService概述与应用分析

摘要         随着信息技术的迅猛发展,智能手机已成为人们日常生活的重要组成部分。然而,对于有视觉、听力或运动障碍的用户来说,传统的交互方式存在较大的局限性。Android平台提供的AccessibilityService框架为这些用户群体带来了希望,通过增强的辅助功能服务,极大地提升了设备的无障碍使用体验。本文将全面介绍AccessibilityService的功能、实现步骤以及优势和潜

【C#】学习获取程序执行路径,Gemini 帮助分析

一、前言:         在Delphi中,如果想要获取当前执行程序的目录,程序代码如下: ExtractFilePath(ParamStr(0));         今天在分析一个别人做的C#程序时看到了一段C#代码,意思是获取执行程序所在的文件目录: public static string GetAssemblyDirectory(){var codeBaseUrl = As

【多电压流程 Multivoltage Flow】- 6.术语汇编

- **always-on**:永远不会关闭的单元、电路或电源域的特性。例如,用于隔离或保持的逻辑单元是一个始终开启的单元。 - **bubble register**:保留寄存器内部部分,即使在断电期间也始终供电并保持数据,由高阈值晶体管构成以最小化泄漏电流。也称为阴影寄存器。 - **clock gating**:通过关闭未使用电路的时钟来减少功耗的方法。 - **coars