7.实现任务的rebalance

2023-12-18 07:12
文章标签 实现 任务 rebalance

本文主要是介绍7.实现任务的rebalance,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.设计

1.1 背景

系统启动后,所有任务都在被执行,如果这时某个节点宕机,那它负责的任务就不能执行了,这对有稳定性要求的任务是不能接受的,所以系统要实现rebalance的功能。

1.2 设计

下面是Job分配与执行的业务点,重分配就是在 follower下线、controller下线、节点新上线进行重分配。理清楚接下来实现就是水到渠成了

2. 实现

2.1 RebalanceJobType

定义了重平衡job的类型

public enum RebalanceJobType {FOLLOWER_OFFLINE(0), CONTROLLER_OFFLINE(1), NODE_ONLINE(2);private int code;RebalanceJobType(int code) {this.code = code;}public boolean isFollowerOffline() {return this.code == FOLLOWER_OFFLINE.code;}public boolean isControllerOffline() {return this.code == CONTROLLER_OFFLINE.code;}public boolean isNodeOnline() {return this.code == NODE_ONLINE.code;}}

2.2 AverageJobAllotStrategy

添加了 rebalanceJob的方法,只有Controller才能调用,对不同的重平衡情况进行分别处理

private Map<Long, List<DttaskJob>> getDttaskJobMap() {List<DttaskJob> allDttaskJob = getAllDttaskJob();return average(allDttaskJob);
}@Override
public void rebalanceJob(RebalanceJobContext rebalanceJobContext) {if (rebalanceJobContext.getType().isFollowerOffline()|| rebalanceJobContext.getType().isControllerOffline()) {long offlineServerId = rebalanceJobContext.getServerId();log.info("{}节点={}下线->重平衡job={}",rebalanceJobContext.getType().isFollowerOffline() ? "follower" : "controller",offlineServerId,rebalanceJobContext);List<DttaskJob> dttaskJobs = getByDttaskId(offlineServerId);List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();Map<Long, List<DttaskJob>> allotMap = new HashMap<>();int i = 0;int nodeCount = nodeInfoList.size();while (i < dttaskJobs.size()) {DttaskJob dttaskJob = dttaskJobs.get(i);NodeInfo nodeInfo = nodeInfoList.get(i % nodeCount);i++;List<DttaskJob> dttaskJobList = allotMap.getOrDefault(nodeInfo.getServerId(), new ArrayList<>());dttaskJobList.add(dttaskJob);allotMap.put(nodeInfo.getServerId(), dttaskJobList);}executeDttaskJob(new ExecuteDttaskJobContext(allotMap, true));} else if (rebalanceJobContext.getType().isNodeOnline()) {log.info("节点上线->重平衡job={}", rebalanceJobContext);long onlineServerId = rebalanceJobContext.getServerId();Map<Long, List<DttaskJob>> dttaskJobMap = BeanUseHelper.entityHelpService().queryDttaskJob();Map<Long, List<DttaskJob>> allotDttaskJobMap = getDttaskJobMap();Map<Long, List<DttaskJob>> stopDttaskJobMapOfOldNodes = new HashMap<>();Map<Long, List<DttaskJob>> startDttaskJobMapOfNewNodes = new HashMap<>();List<DttaskJob> startDttaskJobs = new ArrayList<>();dttaskJobMap.forEach((serverId, dttaskJobList) -> {int size = dttaskJobList.size();int newSize = allotDttaskJobMap.get(serverId).size();if (size > newSize) {List<DttaskJob> dttaskJobs = dttaskJobList.subList(0, size - newSize);stopDttaskJobMapOfOldNodes.put(serverId, dttaskJobs);startDttaskJobs.addAll(dttaskJobs);}});startDttaskJobMapOfNewNodes.put(onlineServerId, startDttaskJobs);executeDttaskJob(new ExecuteDttaskJobContext(stopDttaskJobMapOfOldNodes, false));executeDttaskJob(new ExecuteDttaskJobContext(startDttaskJobMapOfNewNodes, true));}
}

2.3 ServerClientChannelHandler

对节点下线进行重平衡处理

2.4 NodeOnlineMessageService

3. 测试

启动三个节点,节点完成选举,每个节点执行2个任务

  • 3号节点下线

1 2 节点各分配了一个任务继续执行

  • 3号节点上线

新上线的3号节点,重新得到2个任务,1 2节点各停止一个任务

至此,节点上下线的任务重平衡完成

这篇关于7.实现任务的rebalance的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/507519

相关文章

Spring定时任务之fixedRateString的实现示例

《Spring定时任务之fixedRateString的实现示例》本文主要介绍了Spring定时任务之fixedRateString的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录从毫秒到 Duration:为何要改变?核心:Java.time.Duration.parse

Python进行word模板内容替换的实现示例

《Python进行word模板内容替换的实现示例》本文介绍了使用Python自动化处理Word模板文档的常用方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友... 目录技术背景与需求场景核心工具库介绍1.获取你的word模板内容2.正常文本内容的替换3.表格内容的

Java中实现对象的拷贝案例讲解

《Java中实现对象的拷贝案例讲解》Java对象拷贝分为浅拷贝(复制值及引用地址)和深拷贝(递归复制所有引用对象),常用方法包括Object.clone()、序列化及JSON转换,需处理循环引用问题,... 目录对象的拷贝简介浅拷贝和深拷贝浅拷贝深拷贝深拷贝和循环引用总结对象的拷贝简介对象的拷贝,把一个

Oracle Scheduler任务故障诊断方法实战指南

《OracleScheduler任务故障诊断方法实战指南》Oracle数据库作为企业级应用中最常用的关系型数据库管理系统之一,偶尔会遇到各种故障和问题,:本文主要介绍OracleSchedul... 目录前言一、故障场景:当定时任务突然“消失”二、基础环境诊断:搭建“全局视角”1. 数据库实例与PDB状态2

linux部署NFS和autofs自动挂载实现过程

《linux部署NFS和autofs自动挂载实现过程》文章介绍了NFS(网络文件系统)和Autofs的原理与配置,NFS通过RPC实现跨系统文件共享,需配置/etc/exports和nfs.conf,... 目录(一)NFS1. 什么是NFS2.NFS守护进程3.RPC服务4. 原理5. 部署5.1安装NF

Python实现自动化删除Word文档超链接的实用技巧

《Python实现自动化删除Word文档超链接的实用技巧》在日常工作中,我们经常需要处理各种Word文档,本文将深入探讨如何利用Python,特别是借助一个功能强大的库,高效移除Word文档中的超链接... 目录为什么需要移除Word文档超链接准备工作:环境搭建与库安装核心实现:使用python移除超链接的

Python Excel 通用筛选函数的实现

《PythonExcel通用筛选函数的实现》本文主要介绍了PythonExcel通用筛选函数的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录案例目的示例数据假定数据来源是字典优化:通用CSV数据处理函数使用说明使用示例注意事项案例目的第一

C#使用SendMessage实现进程间通信的示例代码

《C#使用SendMessage实现进程间通信的示例代码》在软件开发中,进程间通信(IPC)是关键技术之一,C#通过调用WindowsAPI的SendMessage函数实现这一功能,本文将通过实例介绍... 目录第一章:SendMessage的底层原理揭秘第二章:构建跨进程通信桥梁2.1 定义通信协议2.2

JAVA实现亿级千万级数据顺序导出的示例代码

《JAVA实现亿级千万级数据顺序导出的示例代码》本文主要介绍了JAVA实现亿级千万级数据顺序导出的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 前提:主要考虑控制内存占用空间,避免出现同时导出,导致主程序OOM问题。实现思路:A.启用线程池

Python实现中文大写金额转阿拉伯数字

《Python实现中文大写金额转阿拉伯数字》在财务票据中,中文大写金额被广泛使用以防止篡改,但在数据处理时,我们需要将其转换为阿拉伯数字形式,下面我们就来看看如何使用Python实现这一转换吧... 目录一、核心思路拆解二、中文数字解析实现三、大单位分割策略四、元角分综合处理五、测试验证六、全部代码在财务票