Analysis of how the Druid Overlord assigns a Task to ZooKeeper

2024-04-29 12:58

This article walks through how a Druid task, once created, is handed to the Overlord's task runner and published to ZooKeeper, and is intended as a reference for developers digging into this part of the code.

Preface


Following the previous article on how a supervisor creates tasks, this one looks at what happens next: after a task is created, how does it end up in ZooKeeper, and what strategy is used to pick a MiddleManager for it?

Overview

[The original flow diagram is omitted here; the steps it showed are summarized below.]


After the supervisor creates a task, it drops the task into a TaskQueue created by the TaskMaster. Everything that follows is about how tasks in the TaskQueue get assigned and run.

The TaskQueue starts a long-running thread that keeps reading tasks from the queue and calls run() on the TaskRunner for each task that is ready. The TaskRunner itself is also created by the TaskMaster.

The TaskRunner checks whether a task is in the pending state; if so, it assigns the task to a worker. By default, the selection strategy picks the MiddleManager with the most free task slots. The worker and task information is then serialized to bytes with jsonMapper and written to a ZooKeeper path that the chosen worker watches.
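As a rough illustration of this "most free slots" idea, here is a minimal, self-contained sketch. It is not Druid's actual WorkerSelectStrategy implementation; the WorkerInfo class, its fields, and the hosts used below are hypothetical stand-ins.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class MostFreeSlotsExample
{
  // Hypothetical stand-in for the per-MiddleManager info the Overlord tracks.
  static class WorkerInfo
  {
    final String host;
    final int capacity;          // total task slots on this MiddleManager
    final int currCapacityUsed;  // slots currently occupied by running tasks

    WorkerInfo(String host, int capacity, int currCapacityUsed)
    {
      this.host = host;
      this.capacity = capacity;
      this.currCapacityUsed = currCapacityUsed;
    }

    int freeSlots()
    {
      return capacity - currCapacityUsed;
    }
  }

  // Pick the worker with the most free slots, skipping workers that are already full.
  static Optional<WorkerInfo> findWorkerForTask(List<WorkerInfo> workers)
  {
    return workers.stream()
                  .filter(w -> w.freeSlots() > 0)
                  .max(Comparator.comparingInt(WorkerInfo::freeSlots));
  }

  public static void main(String[] args)
  {
    List<WorkerInfo> workers = List.of(
        new WorkerInfo("mm1:8091", 4, 4),
        new WorkerInfo("mm2:8091", 4, 1),
        new WorkerInfo("mm3:8091", 8, 6)
    );
    // mm2 wins: 3 free slots vs. 2 on mm3 and 0 on mm1.
    System.out.println(findWorkerForTask(workers).map(w -> w.host).orElse("none"));
  }
}

Druid's real strategies additionally filter out lazy, blacklisted, and not-yet-acknowledged workers, as shown in the tryAssignTask() code later in this article.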

The code


Start from the TaskMaster. It is an injected object, and it manages the two key objects: the TaskQueue and the TaskRunner.

The TaskRunner is created by a taskRunnerFactory, and its implementations include ForkingTaskRunner, RemoteTaskRunner, and HttpRemoteTaskRunner. Which one is used is controlled in the Overlord's configuration file via druid.indexer.runner.type=local/remote/httpRemote:

  • local means the task runs locally, inside the Overlord process itself.
  • remote means the task is handed out to the distributed cluster of MiddleManagers.
  • httpRemote is still an experimental feature (this analysis is based on version 0.16). It behaves like remote, except that it talks to the MiddleManagers directly over HTTP instead of going through ZooKeeper. A sample configuration is shown below.
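For reference, selecting the remote runner would look roughly like this in the Overlord's runtime.properties. The druid.indexer.runner.type property is the documented setting; the service name line is just context and may differ in your deployment.

# Overlord runtime.properties (illustrative)
druid.service=druid/overlord
druid.indexer.runner.type=remote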

 

Once the TaskQueue has been created, it is started and spawns a continuously running thread. That thread keeps polling the tasks in the queue and calls taskRunner.run() to get each ready task assigned:

/**
 * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
 */
private void manage() throws InterruptedException
{
  log.info("Beginning management in %s.", config.getStartDelay());
  Thread.sleep(config.getStartDelay().getMillis());

  // Ignore return value - we'll get the IDs and futures from getKnownTasks later.
  taskRunner.restore();

  while (active) {
    giant.lock();

    try {
      // Task futures available from the taskRunner
      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();
      for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
        runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
      }

      // Attain futures for all active tasks (assuming they are ready to run).
      // Copy tasks list, as notifyStatus may modify it.
      for (final Task task : ImmutableList.copyOf(tasks)) {
        if (!taskFutures.containsKey(task.getId())) {
          final ListenableFuture<TaskStatus> runnerTaskFuture;
          if (runnerTaskFutures.containsKey(task.getId())) {
            runnerTaskFuture = runnerTaskFutures.get(task.getId());
          } else {
            // Task should be running, so run it.
            final boolean taskIsReady;
            try {
              taskIsReady = task.isReady(taskActionClientFactory.create(task));
            }
            catch (Exception e) {
              log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
              notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());
              continue;
            }
            if (taskIsReady) {
              log.info("Asking taskRunner to run: %s", task.getId());
              // The task is managed by the queue but not yet known to the taskRunner, so hand it over here.
              runnerTaskFuture = taskRunner.run(task);
            } else {
              continue;
            }
          }
          taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
        } else if (isTaskPending(task)) {
          // if the taskFutures contain this task and this task is pending, also let the taskRunner
          // to run it to guarantee it will be assigned to run
          // see https://github.com/apache/incubator-druid/pull/6991
          // i.e. if the task is still pending, ask the runner again so it is guaranteed to be assigned.
          taskRunner.run(task);
        }
      }

      // Kill tasks that shouldn't be running
      final Set<String> tasksToKill = Sets.difference(
          runnerTaskFutures.keySet(),
          ImmutableSet.copyOf(
              Lists.transform(
                  tasks,
                  new Function<Task, Object>()
                  {
                    @Override
                    public String apply(Task task)
                    {
                      return task.getId();
                    }
                  }
              )
          )
      );
      if (!tasksToKill.isEmpty()) {
        log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
        for (final String taskId : tasksToKill) {
          try {
            taskRunner.shutdown(
                taskId,
                "task is not in runnerTaskFutures[%s]",
                runnerTaskFutures.keySet()
            );
          }
          catch (Exception e) {
            log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
          }
        }
      }

      // awaitNanos because management may become necessary without this condition signalling,
      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
      managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
    }
    finally {
      giant.unlock();
    }
  }
}

When a task is to be run, the TaskRunner looks for a worker according to the selection strategy. Tasks found to be in the pending state are picked up for assignment:

/**
 * This method uses a multi-threaded executor to extract all pending tasks and attempt to run them. Any tasks that
 * are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.
 * This method should be run each time there is new worker capacity or if new tasks are assigned.
 */
private void runPendingTasks()
{
  runPendingTasksExec.submit(
      new Callable<Void>()
      {
        @Override
        public Void call()
        {
          try {
            // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
            // into running status
            List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
            sortByInsertionTime(copy);

            for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
              String taskId = taskRunnerWorkItem.getTaskId();
              if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
                try {
                  //this can still be null due to race from explicit task shutdown request
                  //or if another thread steals and completes this task right after this thread makes copy
                  //of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .
                  Task task = pendingTaskPayloads.get(taskId);
                  // Attempt to assign the task to a worker.
                  if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
                    pendingTaskPayloads.remove(taskId);
                  }
                }
                catch (Exception e) {
                  log.makeAlert(e, "Exception while trying to assign task")
                     .addData("taskId", taskRunnerWorkItem.getTaskId())
                     .emit();
                  RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
                  if (workItem != null) {
                    taskComplete(workItem, null, TaskStatus.failure(taskId));
                  }
                }
                finally {
                  tryAssignTasks.remove(taskId);
                }
              }
            }
          }
          catch (Exception e) {
            log.makeAlert(e, "Exception in running pending tasks").emit();
          }

          return null;
        }
      }
  );
}

The strategy, combined with filter conditions (lazy, blacklisted, and not-yet-acknowledged workers are excluded), determines which worker the task should be assigned to:

/**
 * Ensures no workers are already running a task before assigning the task to a worker.
 * It is possible that a worker is running a task that the RTR has no knowledge of. This occurs when the RTR
 * needs to bootstrap after a restart.
 *
 * @param taskRunnerWorkItem - the task to assign
 *
 * @return true iff the task is now assigned
 */
private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
{
  Preconditions.checkNotNull(task, "task");
  Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
  Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");

  if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {
    log.info("Task[%s] already running.", task.getId());
    return true;
  } else {
    // Nothing running this task, announce it in ZK for a worker to run it
    WorkerBehaviorConfig workerConfig = workerConfigRef.get();
    // Determine the worker selection strategy.
    WorkerSelectStrategy strategy;
    if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
      strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
      log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());
    } else {
      strategy = workerConfig.getSelectStrategy();
    }

    ZkWorker assignedWorker = null;
    final ImmutableWorkerInfo immutableZkWorker;
    try {
      synchronized (workersWithUnacknowledgedTask) {
        immutableZkWorker = strategy.findWorkerForTask(
            config,
            ImmutableMap.copyOf(
                Maps.transformEntries(
                    Maps.filterEntries(
                        zkWorkers,
                        new Predicate<Map.Entry<String, ZkWorker>>()
                        {
                          @Override
                          public boolean apply(Map.Entry<String, ZkWorker> input)
                          {
                            return !lazyWorkers.containsKey(input.getKey()) &&
                                   !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
                                   !blackListedWorkers.contains(input.getValue());
                          }
                        }
                    ),
                    (String key, ZkWorker value) -> value.toImmutable()
                )
            ),
            task
        );

        if (immutableZkWorker != null &&
            workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())
            == null) {
          assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());
        }
      }

      if (assignedWorker != null) {
        // A worker was chosen; announce the task on that worker's ZK path.
        return announceTask(task, assignedWorker, taskRunnerWorkItem);
      } else {
        log.debug(
            "Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",
            task.getId(),
            zkWorkers.values(),
            workersWithUnacknowledgedTask
        );
      }

      return false;
    }
    finally {
      if (assignedWorker != null) {
        workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
        //if this attempt won the race to run the task then other task might be able to use this worker now after task ack.
        runPendingTasks();
      }
    }
  }
}

Finally, the serialized task is written to the ZooKeeper path that belongs to the chosen worker:

/**
 * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
 * removing the task ZK entry and creating a task status ZK entry.
 *
 * @param theZkWorker        The worker the task is assigned to
 * @param taskRunnerWorkItem The task to be assigned
 *
 * @return boolean indicating whether the task was successfully assigned or not
 */
private boolean announceTask(
    final Task task,
    final ZkWorker theZkWorker,
    final RemoteTaskRunnerWorkItem taskRunnerWorkItem
) throws Exception
{
  Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
  final String worker = theZkWorker.getWorker().getHost();

  synchronized (statusLock) {
    if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
      // the worker might have been killed or marked as lazy
      log.info("Not assigning task to already removed worker[%s]", worker);
      return false;
    }
    log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());

    // Serialize the task and write it to the worker's task path in ZK.
    CuratorUtils.createIfNotExists(
        cf,
        JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),
        CreateMode.EPHEMERAL,
        jsonMapper.writeValueAsBytes(task),
        config.getMaxZnodeBytes()
    );

    // ....
  }
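On the worker side, the MiddleManager watches its own task path and picks up the new znode, runs the task, and writes a status znode back. As a rough, self-contained sketch of what that watching looks like with Curator (this is not Druid's actual WorkerTaskMonitor code; the ZK connect string and the path below are illustrative, while the real base path comes from indexerZkConfig.getTasksPath()):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class WorkerTaskPathWatcher
{
  public static void main(String[] args) throws Exception
  {
    // Illustrative values: the path segment after "tasks" is this worker's host:port.
    String tasksPathForThisWorker = "/druid/indexer/tasks/mm1.example.com:8091";

    CuratorFramework curator = CuratorFrameworkFactory.newClient(
        "zk1.example.com:2181",
        new ExponentialBackoffRetry(1000, 3)
    );
    curator.start();

    // Watch the worker's task path; each child znode holds one JSON-serialized Task.
    PathChildrenCache cache = new PathChildrenCache(curator, tasksPathForThisWorker, true);
    cache.getListenable().addListener((client, event) -> {
      if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
        byte[] taskJson = event.getData().getData();
        System.out.println("New task assigned at " + event.getData().getPath()
                           + ", payload " + taskJson.length + " bytes");
        // A real worker would deserialize the Task with jsonMapper, run it,
        // then delete this znode and publish a status znode instead.
      }
    });
    cache.start();

    Thread.sleep(Long.MAX_VALUE);  // keep watching
  }
}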

END


That covers the core code and flow by which a runnable task is assigned a worker and written to the ZooKeeper path. Beyond this, there is a lot of detailed logic around state checks and state persistence that ensures a task actually runs correctly, which is outside the scope of this article.

