DAG计算框架:实现业务编排

2024-08-24 04:44

本文主要是介绍DAG计算框架:实现业务编排,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • DAG
  • 如何实现DAG计算框架
    • Node的实现
    • Engine的实现
    • Graph的实现
    • 具体某个节点
    • 如何使用

在工作几年之后,大部分人如果还在继续做着 CRUD的简单重复工作,即使领导不提出对你更高的期望,自身也会感到焦虑吧。学如逆水行舟不进则退,年龄在增,技术深度也需要不断精进,否则就很可能面临淘汰。因此找个时间静下心来,为自己做一个技术规划是非常有必要的。

在工作中想要做好技术规划,就必须抓住一个软件系统的演进见律:

函数->类->组件->脚本->服务->系统->分栈/层->配置化/标准化->自动化->平台化->产品化->规模化

软件工程的本质就是应对规模化所带来的复杂性。

因此如何将复杂的东西变简单,以便于承接更大的规模化发展,这本身就是技术的本质,因此是极其有技术含量的事。

DAG

在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG Directed Acyclic Graph),在工作中,大部分规则引擎都会用到DAG
在这里插入图片描述

设计时,一般只需要在请求到来时,根据变化的配置信息对DAG进行初始化,根据上下文中的信息(一般用自定义的ctx携带信息),每个node决定是否真的执行具体的task(或者跳过),将业务组件最大程度的复用和内聚。

如何实现DAG计算框架

Node的实现

通过继承扩展实现,业务开发需要实现两个函数EnableRun,其中所有参数检查逻辑在Enable中完成, Enable返回false代表不启用此NodeRun函数是真正执行
业务逻辑的函数实现,这样对于一个具体Node的所有业务逻辑都被高度内聚在了一个文件中实现。

type Node struct {Ctx *BizCtx // 自定义的上下文Name string //节点名字,一般表示业务单元的标识(一个业务流程是一个Node)g *Graph  //整个DAG的控制对象ID int64 // 保证唯一的IDDeps []string // 所有的父节点Nexts []*Node // 所有的子节点 Mask int64 // 表示依赖的演码,用于标识当前节点是否可达(可执行)
}//根据相关参数构建节点
func NewNode(...)*Node{....}
// 获取上下文
func (n *Node)GetCtx()*BizCtx{return n.BizCtx}
// 参数校验
func (n *Node)Enable()bool{...}
// 节点实际运行
func (n *Node)Run(){}
// 获取节点名字
func (n *Node)Name()string{return n.Name}
// 节点依赖,可理解成当前节点的父节点,本示例中,父节点都执行成功后,才可执行当前节点
func (n *Node)Deps()[]string{return n.Deps}
// 所有的子节点
func (n *Node)Nexts()[]*Node{return n.Nexts}
// 节点的配置,不同业务,可能有自己的一些配置参数,如分流参数,奖励参数
func (n *Node)Conf()*NodeConf{}

Engine的实现

提供DAG计算框架的运行时资源管理,协程池管理计算资源原,对象池管理内存资源。

type Engine struct { // engine的生命周期是进程级的
ctxPool *sync.Pool
gPool *sync.Pool
runPool *goPool // 某种协程池实现,接受两个函数,一个函数执行和一个回调函数
graph *Grpah // graph代表着一个真正的DAG,是请求级的生命周期。
}// 函数式选项模式获得一个Engine对象
func NewEngine(opt Options)*Engine{...}
// 初始化一个图
func (e *Engine)BuildGraph()*Graph{...}

Graph的实现

真正实现DAG调度的组件,请求范围内的生命周期

var RootNode = &Node{ID:-1}
var EndNode = &Node{ID:-2}
type Graph struct{e *Engine // Graph 和Engine互相包含id int64taskChannel chan int64 // 需要执行的节点的IDackChanel   chan int64   // 异步回调的确认chandoneChannel chan struct{}  // 执行完成或者异常时,终止DAG的通道NameTable   map[string]*Node // 节点名与节点的映射IDTable     map[int64]*Node // 节点id与节点的映射
}// 添加节点,拼接实际的图
// 这里默认添加顺序遵循了添加当前节点时,已经添加完了当前节点依赖的所有父节点
func (g *Graph)ADD(node *Node)*Graph{g.NameTable[node.Name()] = node// Mask在构建Node时会设置和Node的ID字段相同,然后与所有父节点的ID异或,得到新的Mask值// 后面执行时候,当前节点执行成功,就与其所有子节点异或,更新了子节点的Mask值// 当子节点的Mask值和其ID值又相等时,说明当前节点的所有父节点都执行成功了,可以执行当前节点了for nodeName <- node.Deps(){preNode := g.NameTable[nodeName]node.Mask ^= preNode.IDpreNode.Nexts = append(preNode.Nexts, node)}
}func (g *Graph)Run() err {
//在遍历过程中对第一层没有依赖的node添加一个 rootNode,其 ID== -1
//在遍历的过程中对最后没有出度的节点添加上特殊的 终止Nodle,其ID=== -2
// 即默认让-1和-2分别作为根节点和终止节点
g.taskChannel <- -1
for{select{case taskID <-g.taskChannel:// 遇到了终止节点,当前图可以终止执行了if taskID == -2 {close(g.doneChannel)}node := g.IDTable[taskID]if node.Enable(){// 使用Engine管理协程执行g.e.runPool(func(){node.Run()}, func(id int64){g.ackChannel<-id})}//这里也可以基于协程池做异步控制,ackcase taskID <- g.ackChanel:node := g.IDTable[taskID]// 当前节点致辞哪个成功后,通知所有子节点for nextNode <- node.Nexts(){nextNode.Mask ^= node.ID//利用相同数字异或结果为0的特性维护任务依赖状态// 该子节点可以放入可执行的channel中了if nextNode.Mask == nextNode.ID { gg.taskChannel <- nextNode.ID}case <-g.doneChannel:g.Close()return

具体某个节点

这里以一个RecoveryNode为例,其可能放于文件:/nodes/recovery.go一个单独文件中,其他具体的节点也都放于单独的文件中,但都共同放在nodes文件夹中。

type RecoveryNode struct{*Node // 继承节点的能力.... // 其他与当前节点相关的业务自定义字段
}
//注册名字
func (n *Node)Name()string{return "recovery"}//注册依赖,假如当前节点依赖了分流和奖励节点
func (n *Node)Deps()[]string{return []string{"shunt", "reward"}//可以把Node的name定义为常量进行传递会更好,避免出错
}func NewRecoveryNode(/**这里也可以传参数**/)*Node{return &RecoveryNode{Node:NewNode()}
}func (n *RecoveryNode)Enable()bool{// 利用了ctx的WithValue能力,如下shunt.path就是一个key,其中shunt可以理解成是命名空间,表示是shunt节点中设置的path key,取其值// 这里表示分流节点中的通过路径不是1时,可以执行当前节点return n.Node.GetCtx().GetString("shunt.path","") != "1"
}func (n *Node)Run(){count := n.Node.GetCtx().GetInt64("reward.count", 20)list:=Recovery(count)//示意而已,不用在乎业务具体逻辑n.Node.GetCtx().SetInt64List("recovery.success"", true)
}// ... 其他一些方法的实现

如何使用

var e *Engine
func init(){e = NewEngine{}
}func handler(){g := e.BuildGraph().ADD(NewShuntNode()).ADD(NewRewardNode()).ADD(NewRecoveryNode()).ADD(NewPackDataNode())err := g.Run()print(err)

这篇关于DAG计算框架:实现业务编排的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101485

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S