Druid Task被Overload分配到zk上的流程分析

2024-04-29 12:58

本文主要是介绍Druid Task被Overload分配到zk上的流程分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言


         继前一篇文章关于tasksupervisor创建的过程分析,那么task被创建后是怎样分配给zk的呢?task选择middlemanager的策略又是什么?

上图


       supervisor创建完task后,会将task扔到一个由TaskMaster创建的TaskQueue中。此后的事情就是如果将TaskQueue中的task进行分配运行了。 

      TaskQueue会启动一个线程一直运行,用来读取taskqueue队列中的tasks,对于已经准备好的task通过TaskRunner进行run()。而TaskRunner是 在TaskMaster中创建的。

       TaskRunner中会判断task的状态是否为pending状态,如果是就会对该task进行分配work, 而分配策略默认是根据middlemanager的slot最大空闲数分配的。让后会将work信息、task信息通过jsonMapper进行序列化为byte写到zk的一个能被work识别的路径下面。

 上代码


        首先从TaskMaster入手,因为TaskMaster是被注入的对象,它管理着TaskQueue和TaskRunner两大对象。

       其中TaskRunner是被taskRunnerFactory创建的,而TaskRunner包括:ForkingTaskRunner,RemoteTaskRunner, HttpRemoteTaskRunner 。至于使用哪一个是通过在overload的配置文件中配置的,配置项为:druid.indexer.runner.type=remote/loacl/httpRemote

  • local表示从本地运行任务
  • remote表示分配到分布式系统中
  • httpRemote是在试用期的功能(目前是根据0.16版本分析的),和remote功能一样,只是httpRemote不通过zk而是直接和middlemanager交互

 

      然后此时创建完TaskQueue之后,taskQueue会进行启动并创建一个持续运行的线程。该线程的作用就是不断的轮询判断taskqueue中的task进行处理,调用taskRunner的run()方法做分配task的处理。

/*** Main task runner management loop. Meant to run forever, or, at least until we're stopped.*/private void manage() throws InterruptedException{log.info("Beginning management in %s.", config.getStartDelay());Thread.sleep(config.getStartDelay().getMillis());// Ignore return value- we'll get the IDs and futures from getKnownTasks later.taskRunner.restore();while (active) {giant.lock();try {// Task futures available from the taskRunnerfinal Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new HashMap<>();for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());}// Attain futures for all active tasks (assuming they are ready to run).// Copy tasks list, as notifyStatus may modify it.for (final Task task : ImmutableList.copyOf(tasks)) {if (!taskFutures.containsKey(task.getId())) {final ListenableFuture<TaskStatus> runnerTaskFuture;if (runnerTaskFutures.containsKey(task.getId())) {runnerTaskFuture = runnerTaskFutures.get(task.getId());} else {// Task should be running, so run it.final boolean taskIsReady;try {taskIsReady = task.isReady(taskActionClientFactory.create(task));}catch (Exception e) {log.warn(e, "Exception thrown during isReady for task: %s", task.getId());notifyStatus(task, TaskStatus.failure(task.getId()), "failed because of exception[%s]", e.getClass());continue;}if (taskIsReady) {log.info("Asking taskRunner to run: %s", task.getId());// 该部分是已分配给taskrunner管理且没有被运行的runnerTaskFuture = taskRunner.run(task);} else {continue;}}taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));} else if (isTaskPending(task)) {// if the taskFutures contain this task and this task is pending, also let the taskRunner// to run it to guarantee it will be assigned to run// see https://github.com/apache/incubator-druid/pull/6991// 判断task是否处于pending状态,如果是就进行分配运行taskRunner.run(task);}}// Kill tasks that shouldn't be runningfinal Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(),ImmutableSet.copyOf(Lists.transform(tasks,new Function<Task, Object>(){@Overridepublic String apply(Task task){return task.getId();}})));if (!tasksToKill.isEmpty()) {log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());for (final String taskId : tasksToKill) {try {taskRunner.shutdown(taskId,"task is not in runnerTaskFutures[%s]",runnerTaskFutures.keySet());}catch (Exception e) {log.warn(e, "TaskRunner failed to clean up task: %s", taskId);}}}// awaitNanos because management may become necessary without this condition signalling,// due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);}finally {giant.unlock();}}}

     此时决定运行的task时候,让taskRunnner根据策略去寻找work进行分配。判断出处于pending状态的task进行分配。

/*** This method uses a multi-threaded executor to extract all pending tasks and attempt to run them. Any tasks that* are successfully assigned to a worker will be moved from pendingTasks to runningTasks. This method is thread-safe.* This method should be run each time there is new worker capacity or if new tasks are assigned.*/private void runPendingTasks(){runPendingTasksExec.submit(new Callable<Void>(){@Overridepublic Void call(){try {// make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them// into running statusList<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());sortByInsertionTime(copy);for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {String taskId = taskRunnerWorkItem.getTaskId();if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {try {//this can still be null due to race from explicit task shutdown request//or if another thread steals and completes this task right after this thread makes copy//of pending tasks. See https://github.com/apache/incubator-druid/issues/2842 .Task task = pendingTaskPayloads.get(taskId);// 试图去分配taskif (task != null && tryAssignTask(task, taskRunnerWorkItem)) {pendingTaskPayloads.remove(taskId);}}catch (Exception e) {log.makeAlert(e, "Exception while trying to assign task").addData("taskId", taskRunnerWorkItem.getTaskId()).emit();RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);if (workItem != null) {taskComplete(workItem, null, TaskStatus.failure(taskId));}}finally {tryAssignTasks.remove(taskId);}}}}catch (Exception e) {log.makeAlert(e, "Exception in running pending tasks").emit();}return null;}});}

     根据策略以及过滤条件得到task要分配的work的信息:

/*** Ensures no workers are already running a task before assigning the task to a worker.* It is possible that a worker is running a task that the RTR has no knowledge of. This occurs when the RTR* needs to bootstrap after a restart.** @param taskRunnerWorkItem - the task to assign** @return true iff the task is now assigned*/private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception{Preconditions.checkNotNull(task, "task");Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {log.info("Task[%s] already running.", task.getId());return true;} else {// Nothing running this task, announce it in ZK for a worker to run itWorkerBehaviorConfig workerConfig = workerConfigRef.get();//  确定分配的策略WorkerSelectStrategy strategy;if (workerConfig == null || workerConfig.getSelectStrategy() == null) {strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;log.debug("No worker selection strategy set. Using default of [%s]", strategy.getClass().getSimpleName());} else {strategy = workerConfig.getSelectStrategy();}ZkWorker assignedWorker = null;final ImmutableWorkerInfo immutableZkWorker;try {synchronized (workersWithUnacknowledgedTask) {immutableZkWorker = strategy.findWorkerForTask(config,ImmutableMap.copyOf(Maps.transformEntries(Maps.filterEntries(zkWorkers,new Predicate<Map.Entry<String, ZkWorker>>(){@Overridepublic boolean apply(Map.Entry<String, ZkWorker> input){return !lazyWorkers.containsKey(input.getKey()) &&!workersWithUnacknowledgedTask.containsKey(input.getKey()) &&!blackListedWorkers.contains(input.getValue());}}),(String key, ZkWorker value) -> value.toImmutable())),task);if (immutableZkWorker != null &&workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.getWorker().getHost(), task.getId())== null) {assignedWorker = zkWorkers.get(immutableZkWorker.getWorker().getHost());}}if (assignedWorker != null) {// 得到work信息后进行发布到zk上return announceTask(task, assignedWorker, taskRunnerWorkItem);} else {log.debug("Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",task.getId(),zkWorkers.values(),workersWithUnacknowledgedTask);}return false;}finally {if (assignedWorker != null) {workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());//if this attempt won the race to run the task then other task might be able to use this worker now after task ack.runPendingTasks();}}}}

      根据task信息和work信息写入zk的相应路径下面:

/*** Creates a ZK entry under a specific path associated with a worker. The worker is responsible for* removing the task ZK entry and creating a task status ZK entry.** @param theZkWorker        The worker the task is assigned to* @param taskRunnerWorkItem The task to be assigned** @return boolean indicating whether the task was successfully assigned or not*/private boolean announceTask(final Task task,final ZkWorker theZkWorker,final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception{Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");final String worker = theZkWorker.getWorker().getHost();synchronized (statusLock) {if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {// the worker might have been killed or marked as lazylog.info("Not assigning task to already removed worker[%s]", worker);return false;}log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());// 将task的信息写到相应的的zk路径下面CuratorUtils.createIfNotExists(cf,JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),CreateMode.EPHEMERAL,jsonMapper.writeValueAsBytes(task),config.getMaxZnodeBytes());// ....}

END


    至此,要运行的task得到分配的work写到zk路径的核心代码以及过程就如上文所述。为了保证task能够准确无误的运行起来会很多细节性的逻辑判断,状态存储

这篇关于Druid Task被Overload分配到zk上的流程分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/946149

相关文章

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空

Spring Security中用户名和密码的验证完整流程

《SpringSecurity中用户名和密码的验证完整流程》本文给大家介绍SpringSecurity中用户名和密码的验证完整流程,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定... 首先创建了一个UsernamePasswordAuthenticationTChina编程oken对象,这是S

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,

Java Stream的distinct去重原理分析

《JavaStream的distinct去重原理分析》Javastream中的distinct方法用于去除流中的重复元素,它返回一个包含过滤后唯一元素的新流,该方法会根据元素的hashcode和eq... 目录一、distinct 的基础用法与核心特性二、distinct 的底层实现原理1. 顺序流中的去重

Android ViewBinding使用流程

《AndroidViewBinding使用流程》AndroidViewBinding是Jetpack组件,替代findViewById,提供类型安全、空安全和编译时检查,代码简洁且性能优化,相比Da... 目录一、核心概念二、ViewBinding优点三、使用流程1. 启用 ViewBinding (模块级

关于MyISAM和InnoDB对比分析

《关于MyISAM和InnoDB对比分析》:本文主要介绍关于MyISAM和InnoDB对比分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录开篇:从交通规则看存储引擎选择理解存储引擎的基本概念技术原理对比1. 事务支持:ACID的守护者2. 锁机制:并发控制的艺

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c