DAG计算框架:实现业务编排

2024-08-24 04:44

本文主要是介绍DAG计算框架:实现业务编排,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • DAG
  • 如何实现DAG计算框架
    • Node的实现
    • Engine的实现
    • Graph的实现
    • 具体某个节点
    • 如何使用

在工作几年之后,大部分人如果还在继续做着 CRUD的简单重复工作,即使领导不提出对你更高的期望,自身也会感到焦虑吧。学如逆水行舟不进则退,年龄在增,技术深度也需要不断精进,否则就很可能面临淘汰。因此找个时间静下心来,为自己做一个技术规划是非常有必要的。

在工作中想要做好技术规划,就必须抓住一个软件系统的演进见律:

函数->类->组件->脚本->服务->系统->分栈/层->配置化/标准化->自动化->平台化->产品化->规模化

软件工程的本质就是应对规模化所带来的复杂性。

因此如何将复杂的东西变简单,以便于承接更大的规模化发展,这本身就是技术的本质,因此是极其有技术含量的事。

DAG

在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG Directed Acyclic Graph),在工作中,大部分规则引擎都会用到DAG
在这里插入图片描述

设计时,一般只需要在请求到来时,根据变化的配置信息对DAG进行初始化,根据上下文中的信息(一般用自定义的ctx携带信息),每个node决定是否真的执行具体的task(或者跳过),将业务组件最大程度的复用和内聚。

如何实现DAG计算框架

Node的实现

通过继承扩展实现,业务开发需要实现两个函数EnableRun,其中所有参数检查逻辑在Enable中完成, Enable返回false代表不启用此NodeRun函数是真正执行
业务逻辑的函数实现,这样对于一个具体Node的所有业务逻辑都被高度内聚在了一个文件中实现。

type Node struct {Ctx *BizCtx // 自定义的上下文Name string //节点名字,一般表示业务单元的标识(一个业务流程是一个Node)g *Graph  //整个DAG的控制对象ID int64 // 保证唯一的IDDeps []string // 所有的父节点Nexts []*Node // 所有的子节点 Mask int64 // 表示依赖的演码,用于标识当前节点是否可达(可执行)
}//根据相关参数构建节点
func NewNode(...)*Node{....}
// 获取上下文
func (n *Node)GetCtx()*BizCtx{return n.BizCtx}
// 参数校验
func (n *Node)Enable()bool{...}
// 节点实际运行
func (n *Node)Run(){}
// 获取节点名字
func (n *Node)Name()string{return n.Name}
// 节点依赖,可理解成当前节点的父节点,本示例中,父节点都执行成功后,才可执行当前节点
func (n *Node)Deps()[]string{return n.Deps}
// 所有的子节点
func (n *Node)Nexts()[]*Node{return n.Nexts}
// 节点的配置,不同业务,可能有自己的一些配置参数,如分流参数,奖励参数
func (n *Node)Conf()*NodeConf{}

Engine的实现

提供DAG计算框架的运行时资源管理,协程池管理计算资源原,对象池管理内存资源。

type Engine struct { // engine的生命周期是进程级的
ctxPool *sync.Pool
gPool *sync.Pool
runPool *goPool // 某种协程池实现,接受两个函数,一个函数执行和一个回调函数
graph *Grpah // graph代表着一个真正的DAG,是请求级的生命周期。
}// 函数式选项模式获得一个Engine对象
func NewEngine(opt Options)*Engine{...}
// 初始化一个图
func (e *Engine)BuildGraph()*Graph{...}

Graph的实现

真正实现DAG调度的组件,请求范围内的生命周期

var RootNode = &Node{ID:-1}
var EndNode = &Node{ID:-2}
type Graph struct{e *Engine // Graph 和Engine互相包含id int64taskChannel chan int64 // 需要执行的节点的IDackChanel   chan int64   // 异步回调的确认chandoneChannel chan struct{}  // 执行完成或者异常时,终止DAG的通道NameTable   map[string]*Node // 节点名与节点的映射IDTable     map[int64]*Node // 节点id与节点的映射
}// 添加节点,拼接实际的图
// 这里默认添加顺序遵循了添加当前节点时,已经添加完了当前节点依赖的所有父节点
func (g *Graph)ADD(node *Node)*Graph{g.NameTable[node.Name()] = node// Mask在构建Node时会设置和Node的ID字段相同,然后与所有父节点的ID异或,得到新的Mask值// 后面执行时候,当前节点执行成功,就与其所有子节点异或,更新了子节点的Mask值// 当子节点的Mask值和其ID值又相等时,说明当前节点的所有父节点都执行成功了,可以执行当前节点了for nodeName <- node.Deps(){preNode := g.NameTable[nodeName]node.Mask ^= preNode.IDpreNode.Nexts = append(preNode.Nexts, node)}
}func (g *Graph)Run() err {
//在遍历过程中对第一层没有依赖的node添加一个 rootNode,其 ID== -1
//在遍历的过程中对最后没有出度的节点添加上特殊的 终止Nodle,其ID=== -2
// 即默认让-1和-2分别作为根节点和终止节点
g.taskChannel <- -1
for{select{case taskID <-g.taskChannel:// 遇到了终止节点,当前图可以终止执行了if taskID == -2 {close(g.doneChannel)}node := g.IDTable[taskID]if node.Enable(){// 使用Engine管理协程执行g.e.runPool(func(){node.Run()}, func(id int64){g.ackChannel<-id})}//这里也可以基于协程池做异步控制,ackcase taskID <- g.ackChanel:node := g.IDTable[taskID]// 当前节点致辞哪个成功后,通知所有子节点for nextNode <- node.Nexts(){nextNode.Mask ^= node.ID//利用相同数字异或结果为0的特性维护任务依赖状态// 该子节点可以放入可执行的channel中了if nextNode.Mask == nextNode.ID { gg.taskChannel <- nextNode.ID}case <-g.doneChannel:g.Close()return

具体某个节点

这里以一个RecoveryNode为例,其可能放于文件:/nodes/recovery.go一个单独文件中,其他具体的节点也都放于单独的文件中,但都共同放在nodes文件夹中。

type RecoveryNode struct{*Node // 继承节点的能力.... // 其他与当前节点相关的业务自定义字段
}
//注册名字
func (n *Node)Name()string{return "recovery"}//注册依赖,假如当前节点依赖了分流和奖励节点
func (n *Node)Deps()[]string{return []string{"shunt", "reward"}//可以把Node的name定义为常量进行传递会更好,避免出错
}func NewRecoveryNode(/**这里也可以传参数**/)*Node{return &RecoveryNode{Node:NewNode()}
}func (n *RecoveryNode)Enable()bool{// 利用了ctx的WithValue能力,如下shunt.path就是一个key,其中shunt可以理解成是命名空间,表示是shunt节点中设置的path key,取其值// 这里表示分流节点中的通过路径不是1时,可以执行当前节点return n.Node.GetCtx().GetString("shunt.path","") != "1"
}func (n *Node)Run(){count := n.Node.GetCtx().GetInt64("reward.count", 20)list:=Recovery(count)//示意而已,不用在乎业务具体逻辑n.Node.GetCtx().SetInt64List("recovery.success"", true)
}// ... 其他一些方法的实现

如何使用

var e *Engine
func init(){e = NewEngine{}
}func handler(){g := e.BuildGraph().ADD(NewShuntNode()).ADD(NewRewardNode()).ADD(NewRecoveryNode()).ADD(NewPackDataNode())err := g.Run()print(err)

这篇关于DAG计算框架:实现业务编排的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101485

相关文章

基于Python实现一个图片拆分工具

《基于Python实现一个图片拆分工具》这篇文章主要为大家详细介绍了如何基于Python实现一个图片拆分工具,可以根据需要的行数和列数进行拆分,感兴趣的小伙伴可以跟随小编一起学习一下... 简单介绍先自己选择输入的图片,默认是输出到项目文件夹中,可以自己选择其他的文件夹,选择需要拆分的行数和列数,可以通过

Python中将嵌套列表扁平化的多种实现方法

《Python中将嵌套列表扁平化的多种实现方法》在Python编程中,我们常常会遇到需要将嵌套列表(即列表中包含列表)转换为一个一维的扁平列表的需求,本文将给大家介绍了多种实现这一目标的方法,需要的朋... 目录python中将嵌套列表扁平化的方法技术背景实现步骤1. 使用嵌套列表推导式2. 使用itert

Python使用pip工具实现包自动更新的多种方法

《Python使用pip工具实现包自动更新的多种方法》本文深入探讨了使用Python的pip工具实现包自动更新的各种方法和技术,我们将从基础概念开始,逐步介绍手动更新方法、自动化脚本编写、结合CI/C... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

在Linux中改变echo输出颜色的实现方法

《在Linux中改变echo输出颜色的实现方法》在Linux系统的命令行环境下,为了使输出信息更加清晰、突出,便于用户快速识别和区分不同类型的信息,常常需要改变echo命令的输出颜色,所以本文给大家介... 目python录在linux中改变echo输出颜色的方法技术背景实现步骤使用ANSI转义码使用tpu

Python使用python-can实现合并BLF文件

《Python使用python-can实现合并BLF文件》python-can库是Python生态中专注于CAN总线通信与数据处理的强大工具,本文将使用python-can为BLF文件合并提供高效灵活... 目录一、python-can 库:CAN 数据处理的利器二、BLF 文件合并核心代码解析1. 基础合

Python使用OpenCV实现获取视频时长的小工具

《Python使用OpenCV实现获取视频时长的小工具》在处理视频数据时,获取视频的时长是一项常见且基础的需求,本文将详细介绍如何使用Python和OpenCV获取视频时长,并对每一行代码进行深入解析... 目录一、代码实现二、代码解析1. 导入 OpenCV 库2. 定义获取视频时长的函数3. 打开视频文

golang版本升级如何实现

《golang版本升级如何实现》:本文主要介绍golang版本升级如何实现问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录golanwww.chinasem.cng版本升级linux上golang版本升级删除golang旧版本安装golang最新版本总结gola

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU