DAG计算框架:实现业务编排

2024-08-24 04:44

本文主要是介绍DAG计算框架:实现业务编排,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • DAG
  • 如何实现DAG计算框架
    • Node的实现
    • Engine的实现
    • Graph的实现
    • 具体某个节点
    • 如何使用

在工作几年之后,大部分人如果还在继续做着 CRUD的简单重复工作,即使领导不提出对你更高的期望,自身也会感到焦虑吧。学如逆水行舟不进则退,年龄在增,技术深度也需要不断精进,否则就很可能面临淘汰。因此找个时间静下心来,为自己做一个技术规划是非常有必要的。

在工作中想要做好技术规划,就必须抓住一个软件系统的演进见律:

函数->类->组件->脚本->服务->系统->分栈/层->配置化/标准化->自动化->平台化->产品化->规模化

软件工程的本质就是应对规模化所带来的复杂性。

因此如何将复杂的东西变简单,以便于承接更大的规模化发展,这本身就是技术的本质,因此是极其有技术含量的事。

DAG

在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG Directed Acyclic Graph),在工作中,大部分规则引擎都会用到DAG
在这里插入图片描述

设计时,一般只需要在请求到来时,根据变化的配置信息对DAG进行初始化,根据上下文中的信息(一般用自定义的ctx携带信息),每个node决定是否真的执行具体的task(或者跳过),将业务组件最大程度的复用和内聚。

如何实现DAG计算框架

Node的实现

通过继承扩展实现,业务开发需要实现两个函数EnableRun,其中所有参数检查逻辑在Enable中完成, Enable返回false代表不启用此NodeRun函数是真正执行
业务逻辑的函数实现,这样对于一个具体Node的所有业务逻辑都被高度内聚在了一个文件中实现。

type Node struct {Ctx *BizCtx // 自定义的上下文Name string //节点名字,一般表示业务单元的标识(一个业务流程是一个Node)g *Graph  //整个DAG的控制对象ID int64 // 保证唯一的IDDeps []string // 所有的父节点Nexts []*Node // 所有的子节点 Mask int64 // 表示依赖的演码,用于标识当前节点是否可达(可执行)
}//根据相关参数构建节点
func NewNode(...)*Node{....}
// 获取上下文
func (n *Node)GetCtx()*BizCtx{return n.BizCtx}
// 参数校验
func (n *Node)Enable()bool{...}
// 节点实际运行
func (n *Node)Run(){}
// 获取节点名字
func (n *Node)Name()string{return n.Name}
// 节点依赖,可理解成当前节点的父节点,本示例中,父节点都执行成功后,才可执行当前节点
func (n *Node)Deps()[]string{return n.Deps}
// 所有的子节点
func (n *Node)Nexts()[]*Node{return n.Nexts}
// 节点的配置,不同业务,可能有自己的一些配置参数,如分流参数,奖励参数
func (n *Node)Conf()*NodeConf{}

Engine的实现

提供DAG计算框架的运行时资源管理,协程池管理计算资源原,对象池管理内存资源。

type Engine struct { // engine的生命周期是进程级的
ctxPool *sync.Pool
gPool *sync.Pool
runPool *goPool // 某种协程池实现,接受两个函数,一个函数执行和一个回调函数
graph *Grpah // graph代表着一个真正的DAG,是请求级的生命周期。
}// 函数式选项模式获得一个Engine对象
func NewEngine(opt Options)*Engine{...}
// 初始化一个图
func (e *Engine)BuildGraph()*Graph{...}

Graph的实现

真正实现DAG调度的组件,请求范围内的生命周期

var RootNode = &Node{ID:-1}
var EndNode = &Node{ID:-2}
type Graph struct{e *Engine // Graph 和Engine互相包含id int64taskChannel chan int64 // 需要执行的节点的IDackChanel   chan int64   // 异步回调的确认chandoneChannel chan struct{}  // 执行完成或者异常时,终止DAG的通道NameTable   map[string]*Node // 节点名与节点的映射IDTable     map[int64]*Node // 节点id与节点的映射
}// 添加节点,拼接实际的图
// 这里默认添加顺序遵循了添加当前节点时,已经添加完了当前节点依赖的所有父节点
func (g *Graph)ADD(node *Node)*Graph{g.NameTable[node.Name()] = node// Mask在构建Node时会设置和Node的ID字段相同,然后与所有父节点的ID异或,得到新的Mask值// 后面执行时候,当前节点执行成功,就与其所有子节点异或,更新了子节点的Mask值// 当子节点的Mask值和其ID值又相等时,说明当前节点的所有父节点都执行成功了,可以执行当前节点了for nodeName <- node.Deps(){preNode := g.NameTable[nodeName]node.Mask ^= preNode.IDpreNode.Nexts = append(preNode.Nexts, node)}
}func (g *Graph)Run() err {
//在遍历过程中对第一层没有依赖的node添加一个 rootNode,其 ID== -1
//在遍历的过程中对最后没有出度的节点添加上特殊的 终止Nodle,其ID=== -2
// 即默认让-1和-2分别作为根节点和终止节点
g.taskChannel <- -1
for{select{case taskID <-g.taskChannel:// 遇到了终止节点,当前图可以终止执行了if taskID == -2 {close(g.doneChannel)}node := g.IDTable[taskID]if node.Enable(){// 使用Engine管理协程执行g.e.runPool(func(){node.Run()}, func(id int64){g.ackChannel<-id})}//这里也可以基于协程池做异步控制,ackcase taskID <- g.ackChanel:node := g.IDTable[taskID]// 当前节点致辞哪个成功后,通知所有子节点for nextNode <- node.Nexts(){nextNode.Mask ^= node.ID//利用相同数字异或结果为0的特性维护任务依赖状态// 该子节点可以放入可执行的channel中了if nextNode.Mask == nextNode.ID { gg.taskChannel <- nextNode.ID}case <-g.doneChannel:g.Close()return

具体某个节点

这里以一个RecoveryNode为例,其可能放于文件:/nodes/recovery.go一个单独文件中,其他具体的节点也都放于单独的文件中,但都共同放在nodes文件夹中。

type RecoveryNode struct{*Node // 继承节点的能力.... // 其他与当前节点相关的业务自定义字段
}
//注册名字
func (n *Node)Name()string{return "recovery"}//注册依赖,假如当前节点依赖了分流和奖励节点
func (n *Node)Deps()[]string{return []string{"shunt", "reward"}//可以把Node的name定义为常量进行传递会更好,避免出错
}func NewRecoveryNode(/**这里也可以传参数**/)*Node{return &RecoveryNode{Node:NewNode()}
}func (n *RecoveryNode)Enable()bool{// 利用了ctx的WithValue能力,如下shunt.path就是一个key,其中shunt可以理解成是命名空间,表示是shunt节点中设置的path key,取其值// 这里表示分流节点中的通过路径不是1时,可以执行当前节点return n.Node.GetCtx().GetString("shunt.path","") != "1"
}func (n *Node)Run(){count := n.Node.GetCtx().GetInt64("reward.count", 20)list:=Recovery(count)//示意而已,不用在乎业务具体逻辑n.Node.GetCtx().SetInt64List("recovery.success"", true)
}// ... 其他一些方法的实现

如何使用

var e *Engine
func init(){e = NewEngine{}
}func handler(){g := e.BuildGraph().ADD(NewShuntNode()).ADD(NewRewardNode()).ADD(NewRecoveryNode()).ADD(NewPackDataNode())err := g.Run()print(err)

这篇关于DAG计算框架:实现业务编排的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101485

相关文章

Spring Boot 整合 Redis 实现数据缓存案例详解

《SpringBoot整合Redis实现数据缓存案例详解》Springboot缓存,默认使用的是ConcurrentMap的方式来实现的,然而我们在项目中并不会这么使用,本文介绍SpringB... 目录1.添加 Maven 依赖2.配置Redis属性3.创建 redisCacheManager4.使用Sp

Kali Linux安装实现教程(亲测有效)

《KaliLinux安装实现教程(亲测有效)》:本文主要介绍KaliLinux安装实现教程(亲测有效),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、下载二、安装总结一、下载1、点http://www.chinasem.cn击链接 Get Kali | Kal

C#使用MQTTnet实现服务端与客户端的通讯的示例

《C#使用MQTTnet实现服务端与客户端的通讯的示例》本文主要介绍了C#使用MQTTnet实现服务端与客户端的通讯的示例,包括协议特性、连接管理、QoS机制和安全策略,具有一定的参考价值,感兴趣的可... 目录一、MQTT 协议简介二、MQTT 协议核心特性三、MQTTNET 库的核心功能四、服务端(BR

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

Dubbo之SPI机制的实现原理和优势分析

《Dubbo之SPI机制的实现原理和优势分析》:本文主要介绍Dubbo之SPI机制的实现原理和优势,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Dubbo中SPI机制的实现原理和优势JDK 中的 SPI 机制解析Dubbo 中的 SPI 机制解析总结Dubbo中

使用Java实现Navicat密码的加密与解密的代码解析

《使用Java实现Navicat密码的加密与解密的代码解析》:本文主要介绍使用Java实现Navicat密码的加密与解密,通过本文,我们了解了如何利用Java语言实现对Navicat保存的数据库密... 目录一、背景介绍二、环境准备三、代码解析四、核心代码展示五、总结在日常开发过程中,我们有时需要处理各种软

Java 压缩包解压实现代码

《Java压缩包解压实现代码》Java标准库(JavaSE)提供了对ZIP格式的原生支持,通过java.util.zip包中的类来实现压缩和解压功能,本文将重点介绍如何使用Java来解压ZIP或RA... 目录一、解压压缩包1.zip解压代码实现:2.rar解压代码实现:3.调用解压方法:二、注意事项三、总

NGINX 配置内网访问的实现步骤

《NGINX配置内网访问的实现步骤》本文主要介绍了NGINX配置内网访问的实现步骤,Nginx的geo模块限制域名访问权限,仅允许内网/办公室IP访问,具有一定的参考价值,感兴趣的可以了解一下... 目录需求1. geo 模块配置2. 访问控制判断3. 错误页面配置4. 一个完整的配置参考文档需求我们有一

Linux实现简易版Shell的代码详解

《Linux实现简易版Shell的代码详解》本篇文章,我们将一起踏上一段有趣的旅程,仿照CentOS–Bash的工作流程,实现一个功能虽然简单,但足以让你深刻理解Shell工作原理的迷你Sh... 目录一、程序流程分析二、代码实现1. 打印命令行提示符2. 获取用户输入的命令行3. 命令行解析4. 执行命令

基于MongoDB实现文件的分布式存储

《基于MongoDB实现文件的分布式存储》分布式文件存储的方案有很多,今天分享一个基于mongodb数据库来实现文件的存储,mongodb支持分布式部署,以此来实现文件的分布式存储,需要的朋友可以参考... 目录一、引言二、GridFS 原理剖析三、Spring Boot 集成 GridFS3.1 添加依赖