Apache IoTDB 查询引擎源码阅读——DataNode 上 DriverTask 调度与执行

本文主要是介绍Apache IoTDB 查询引擎源码阅读——DataNode 上 DriverTask 调度与执行,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

Apache IoTDB 查询引擎目前采用 MPP 架构,一条查询 SQL 大致会经历下图几个阶段:

FragmentInstance 是分布式计划被拆分后实际分发到各个节点进行执行的实例。由于每个节点会同时接收来自于多个并发 Query 的多个 FragmentInstance,这些 FragmentInstance 在执行时可能由于等待上游数据而处于阻塞状态、或者数据就绪可以执行、或者超时需要被取消。因此,需要一个较为合理的调度策略,保证在分配给 FragmentInstance 的有限资源内,能够满足高并发的查询需求,同时尽可能避免出现饿死或者死锁情况。

在具体实现中,查询引擎里真正执行查询计算的算子树 Operator Tree 是类 Driver 的一个成员变量,Driver 负责控制Operator 的运行。DriverTask 是 Driver 的一层封装,也是调度模块真正调度的对象。一个 FragmentInstance 可能对应多个 Driver,而 Driver 与 DriverTask 是一一对应的。

本文主要介绍 Apache IoTDB 查询引擎在 DataNode 上如何调度和执行 DriverTask。相关代码位于包 org.apache.iotdb.db.mpp.execution.schedule

DriverTask 调度与执行

调度模块维护了两个队列:

  • ReadyQueue:处于 Ready 状态的 DriverTask 队列

  • TimeoutQueue:所有当前节点未结束的 DriverTask 按照超时时间排序的队列

同时处于 Blocked 状态的 DriverTask 会被放入集合 blockedTasks 进行记录。

总体而言,DriverTask 的调度执行参考了协程思想和操作系统任务调度机制。分配给查询引擎调度模块的线程数是固定的,可以通过配置项更改。来自于不同节点的 FragmentInstance 的 DriverTask 在 init 时会被加入 ReadyQueue。执行线程会不断拉取 ReadyQueue 队头的任务进行执行,每次只执行一个时间片,然后根据 DriverTask 的状态决定是否要将 DriverTask 重新放回 ReadyQueue。可以结合下图帮助理解:

![截屏2023-02-15 19.32.21](/Users/lly/Desktop/截屏2023-02-15 19.32.21.png)

DriverTask 完整的生命周期与状态

如上图所示,目前 DriverTask 的状态包括:

  • Ready:就绪状态。在以下场景,DriverTask 会处于 Ready 状态:

  • 新建 DriverTask 时,状态会被设置为 Ready,然后加入到 ReadyQueue 中。

  • 当 DriverTask 依赖的上游数据就绪时,DataBlockManager 会调用回调接口,将其状态从 Blocked 改为 Ready,并从 blockedTasks 中移除。

  • 分配的时间片用完,会进入 ReadyQueue,并且状态从 Running 转换成 Ready。

  • Blocked:阻塞状态。在以下场景,DriverTask 会处于阻塞状态:

  • 依赖的上游数据为空,需要等待上游数据时,会从 Running 状态改为 Blocked,并放入 blockedTasks。

  • 向下游输出数据的 buffer 已满暂时无法发送数据,会从 Running 状态改为 Blocked。

  • Running:执行中状态。当处于 Ready 的 DriverTask 被线程调度时,从 ReadyQueue 中移除,状态变为 Running。

  • Finished:完成状态。DriverTask 变为完成状态后,调度模块会清理此 DriverTask 的信息。

  • Aborted:终止状态。在任何情况下,出现以下情况,DriverTask 会立即进入终止状态,并释放所有申请的资源。

  • 执行超时

  • 用户中断了查询

  • 不可恢复的 Exception

调度模块重要组件

上图包含了调度模块的一些重要组件,下面对调度模块重要组件进行介绍,理解这些组件的作用可以帮助您更好地阅读源码。

Worker Thread

真正负责执行 DriverTask 的物理线程,具体实现类为 DriverSchedulerThread。数量可通过配置参数进行配置,实例启动后不可改变。

DriverSchedulerThread 的实现:

  • 生命周期和查询引擎一致。

  • 内部主体为无限循环,只通过 InterruptedException 中断(当服务停止时会发送 InterruptedException)。

  • 循环会尝试去 ReadyQueue 拉取队头的 DriverTask。若队列为空,则 Worker Thread 进入阻塞状态。

  • Worker Thread 在执行 DriverTask 时,会调用 DriverTask.processFor(),然后返回 ListenableFuture。为了保证 Worker Thread 不会因为某个执行时间较长的 DriverTask 导致其他 DriverTask 饿死,引入了时间片机制。当 Driver#processFor 方法会接收一个时间片长度作为参数,processFor 会运行时间片长度的时间,执行时间超过时间片长度时,processFor() 方法会结束运行,然后返回一个 Future。(目前的时间片为代码内置的常量: 100ms。后续可能会考虑变成用户可配置的项。但是需要有范围值保护。过大的时间片会使得此机制失效,过小的则会频繁触发 DriverTask 的状态切换,影响执行效率。)

  • 根据返回的 Future,会有如下操作:

  • 若 Future 被 cancel,则终止当前 DriverTask 的执行,将其设置为 Aborted 状态。

  • 若执行完成,则将 DriverTask 置为 Finished 状态。

  • 若时间片用完,则将 DriverTask 置为 Ready 状态,计算并更新调度权重,将 DriverTask 加入到 ReadyQueue。

  • 若是因阻塞导致执行权让出,则将 DriverTask 置为 Blocked 状态,并注册 Blocked → Ready 的回调逻辑。

具体流程可以结合下图进行理解:

代码实现为:

public void execute(DriverTask task) throws InterruptedException {long startNanos = ticker.read();// try to switch it to RUNNINGif (!scheduler.readyToRunning(task)) {return;}IDriver driver = task.getDriver();CpuTimer timer = new CpuTimer();ListenableFuture<?> future = driver.processFor(EXECUTION_TIME_SLICE);CpuTimer.CpuDuration duration = timer.elapsedTime();// If the future is cancelled, the task is in an error and should be thrown.if (future.isCancelled()) {task.setAbortCause(DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED);scheduler.toAborted(task);return;}long quantaScheduledNanos = ticker.read() - startNanos;ExecutionContext context = new ExecutionContext();context.setCpuDuration(duration);context.setScheduledTimeInNanos(quantaScheduledNanos);context.setTimeSlice(EXECUTION_TIME_SLICE);if (driver.isFinished()) {scheduler.runningToFinished(task, context);return;}if (future.isDone()) {scheduler.runningToReady(task, context);} else {scheduler.runningToBlocked(task, context);future.addListener(() -> {try (SetThreadName driverTaskName2 =new SetThreadName(task.getDriver().getDriverTaskId().getFullId())) {scheduler.blockedToReady(task);}},listeningExecutor);}}
}

Sentinel Thread

负责监控 DriverTask 超时的物理线程,全局唯一,具体实现类为 DriverTaskTimeoutSentinelThread。

DriverTaskTimeoutSentinelThread 的实现:

  • 内部主体为无限循环,只通过 InterruptedException 中断(当服务停止时会发送 InterruptedException)。

  • 尝试去 timeoutQueue 拉取队头的 DriverTask。若队列为空,则 Sentinel Thread 进入睡眠状态。

  • Sentinel 在拉取 DriverTask 时,会判断当前系统时间是否超过了超时时间:

  • 若超时,则将状态置为 Aborted 状态,走超时处理逻辑。

  • 若未超时,则睡眠至超时时间,将状态置为 Aborted 状态,走超时处理逻辑。

可以结合下图进行理解:

优先调度队列 ReadyQueue

目前实现参考了 Trino 的 MultilevelSplitQueue,在 IoTDB 里的实现类为 MultilevelPriorityQueue,设计思路可以参考博客 Trino 源码阅读 —— MultiLevelSplitQueue 调度机制。

该队列特点:

  • 线程安全。

  • 是一个阻塞队列,有最大长度限制。

  • 存在任务降级机制,设计初衷是避免任务出现饿死,提升 CPU 利用率。

超时队列 TimeoutQueue

根据 DriverTask 的超时 deadline 排序的最大堆,超时时间越早的 DriverTask 就会被先做超时检查。

该队列长度应该有最大限制。

该队列特点:

  • 线程安全。

  • 按照 DriverTask 的调度权重排序,在 O(lgn) 的时间复杂度内完成队列元素的 pull 和 push。

  • 有根据 DriverTask 的 id 做索引查询的能力,能够在 O(lgn) 的时间复杂度内完成随机元素的删除。

阻塞任务集合 BlockedTasks

处于 Blocked 状态的 DriverTask 的集合,线程安全,在 O(1) 的时间复杂度内完成元素的读取。

调度器 DriverScheduler

调度模块的核心,持有线程资源,即之前提到的 WorkerThread 和 SentinelThread。维护了 ReadyQueue 和 TimeoutQueue,FragmentInstance 可以通过 DriverScheduler 提交 Driver,DriverScheduler 负责将 Driver 封装成 DriverTask 并进一步执行。

DriverScheduler 负责切换 DriverTask 的状态,主要通过内部类 Scheduler 完成。ITaskScheduler 定义了切换 DriverTask 状态的接口,Scheduler 实现了这些接口。接口定义如下:

/** the scheduler interface of {@link DriverTask} */
public interface ITaskScheduler {/*** Switch a task from {@link DriverTaskStatus#BLOCKED} to {@link DriverTaskStatus#READY}.** @param task the task to be switched.*/void blockedToReady(DriverTask task);/*** Switch a task from {@link DriverTaskStatus#READY} to {@link DriverTaskStatus#RUNNING}.** @param task the task to be switched.* @return true if it's switched to the target status successfully, otherwise false.*/boolean readyToRunning(DriverTask task);/*** Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#READY}.** @param task the task to be switched.* @param context the execution context of last running.*/void runningToReady(DriverTask task, ExecutionContext context);/*** Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#BLOCKED}.** @param task the task to be switched.* @param context the execution context of last running.*/void runningToBlocked(DriverTask task, ExecutionContext context);/*** Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#FINISHED}.** @param task the task to be switched.* @param context the execution context of last running.*/void runningToFinished(DriverTask task, ExecutionContext context);/*** Switch a task to {@link DriverTaskStatus#ABORTED}.** @param task the task to be switched.*/void toAborted(DriverTask task);
Blocked→ Ready

总体流程可以参考下图:

红色三角处表示,当获取到锁之后,还需要再次确认 DriverTask 状态是否符合预期(在排队等锁时可能被 SentinelThread 改为 Aborted 状态)。若为 Aborted 状态,则后续流程全部跳过。

代码实现为:

@Override
public void blockedToReady(DriverTask task) {task.lock();try {if (task.getStatus() != DriverTaskStatus.BLOCKED) {return;}task.setStatus(DriverTaskStatus.READY);QUERY_METRICS.recordTaskQueueTime(BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());task.setLastEnterReadyQueueTime(System.nanoTime());task.resetLevelScheduledTime();readyQueue.push(task);blockedTasks.remove(task);} finally {task.unlock();}
}
Running -> Ready

计算并更新调度权重,将 DriverTask 加入到 ReadyQueue。

@Override
public void runningToReady(DriverTask task, ExecutionContext context) {task.lock();try {if (task.getStatus() != DriverTaskStatus.RUNNING) {return;}task.updateSchedulePriority(context);task.setStatus(DriverTaskStatus.READY);task.setLastEnterReadyQueueTime(System.nanoTime());readyQueue.push(task);} finally {task.unlock();}
}
Running -> Blocked

更新调度权重,然后将 DriverTask 加入 blockedTasks。

@Override
public void runningToBlocked(DriverTask task, ExecutionContext context) {task.lock();try {if (task.getStatus() != DriverTaskStatus.RUNNING) {return;}task.updateSchedulePriority(context);task.setStatus(DriverTaskStatus.BLOCKED);task.setLastEnterBlockQueueTime(System.nanoTime());blockedTasks.add(task);} finally {task.unlock();}
}
Running -> Finished

更新调度权重,清理 DriverTask 相关信息。

@Override
public void runningToFinished(DriverTask task, ExecutionContext context) {task.lock();try {if (task.getStatus() != DriverTaskStatus.RUNNING) {return;}task.updateSchedulePriority(context);task.setStatus(DriverTaskStatus.FINISHED);clearDriverTask(task);} finally {task.unlock();}
}
toAborted

由于同一个 FragmentInstance 的 DriverTask 之间有依赖性,一个 DriverTask 被置为 Aborted,其余相关的 DriverTask 也应该被置为 Aborted。

@Override
public void toAborted(DriverTask task) {try (SetThreadName driverTaskName =new SetThreadName(task.getDriver().getDriverTaskId().getFullId())) {task.lock();try {// If a task is already in an end state, it indicates that the task is finalized in other// threads.if (task.isEndState()) {return;}logger.warn("The task {} is aborted. All other tasks in the same query will be cancelled",task.getDriverTaskId());clearDriverTask(task);} finally {task.unlock();}QueryId queryId = task.getDriverTaskId().getQueryId();Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId);if (queryRelatedTasks != null) {for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {if (fragmentRelatedTasks != null) {for (DriverTask otherTask : fragmentRelatedTasks) {if (task.equals(otherTask)) {continue;}otherTask.lock();try {otherTask.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);clearDriverTask(otherTask);} finally {otherTask.unlock();}}}}}}
}

这篇关于Apache IoTDB 查询引擎源码阅读——DataNode 上 DriverTask 调度与执行的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/760948

相关文章

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

java中ssh2执行多条命令的四种方法

《java中ssh2执行多条命令的四种方法》本文主要介绍了java中ssh2执行多条命令的四种方法,包括分号分隔、管道分隔、EOF块、脚本调用,可确保环境配置生效,提升操作效率,具有一定的参考价值,感... 目录1 使用分号隔开2 使用管道符号隔开3 使用写EOF的方式4 使用脚本的方式大家平时有没有遇到自

mybatis直接执行完整sql及踩坑解决

《mybatis直接执行完整sql及踩坑解决》MyBatis可通过select标签执行动态SQL,DQL用ListLinkedHashMap接收结果,DML用int处理,注意防御SQL注入,优先使用#... 目录myBATiFBNZQs直接执行完整sql及踩坑select语句采用count、insert、u

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

一个Java的main方法在JVM中的执行流程示例详解

《一个Java的main方法在JVM中的执行流程示例详解》main方法是Java程序的入口点,程序从这里开始执行,:本文主要介绍一个Java的main方法在JVM中执行流程的相关资料,文中通过代码... 目录第一阶段:加载 (Loading)第二阶段:链接 (Linking)第三阶段:初始化 (Initia

基于Go语言开发一个 IP 归属地查询接口工具

《基于Go语言开发一个IP归属地查询接口工具》在日常开发中,IP地址归属地查询是一个常见需求,本文将带大家使用Go语言快速开发一个IP归属地查询接口服务,有需要的小伙伴可以了解下... 目录功能目标技术栈项目结构核心代码(main.go)使用方法扩展功能总结在日常开发中,IP 地址归属地查询是一个常见需求:

MySQL之复合查询使用及说明

《MySQL之复合查询使用及说明》文章讲解了SQL复合查询中emp、dept、salgrade三张表的使用,涵盖多表连接、自连接、子查询(单行/多行/多列)及合并查询(UNION/UNIONALL)等... 目录复合查询基本查询回顾多表查询笛卡尔积自连接子查询单行子查询多行子查询多列子查询在from子句中使

java 恺撒加密/解密实现原理(附带源码)

《java恺撒加密/解密实现原理(附带源码)》本文介绍Java实现恺撒加密与解密,通过固定位移量对字母进行循环替换,保留大小写及非字母字符,由于其实现简单、易于理解,恺撒加密常被用作学习加密算法的入... 目录Java 恺撒加密/解密实现1. 项目背景与介绍2. 相关知识2.1 恺撒加密算法原理2.2 Ja

Nginx屏蔽服务器名称与版本信息方式(源码级修改)

《Nginx屏蔽服务器名称与版本信息方式(源码级修改)》本文详解如何通过源码修改Nginx1.25.4,移除Server响应头中的服务类型和版本信息,以增强安全性,需重新配置、编译、安装,升级时需重复... 目录一、背景与目的二、适用版本三、操作步骤修改源码文件四、后续操作提示五、注意事项六、总结一、背景与

Android实现图片浏览功能的示例详解(附带源码)

《Android实现图片浏览功能的示例详解(附带源码)》在许多应用中,都需要展示图片并支持用户进行浏览,本文主要为大家介绍了如何通过Android实现图片浏览功能,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码