LlamaIndex 工作流

2024-08-31 16:28
文章标签 工作 llamaindex

本文主要是介绍LlamaIndex 工作流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

LlamaIndex 内部提供了一个简单的工作流引擎,为什么要有工作流引擎?做过 OA 的同学都了解工作流引擎,工作流的优势在于模块化开发,把业务节点进行抽象,流程于业务逻辑分离,方便进行业务节点组装,也是很多低代码平台的底层工作原理。大语言模型的应用特别适合工作流, 模型可以理解一个万能的 API,传统的 API 都有固定的入参、出参、功能,而模型会根据提示词做推理,具体做什么,返回什么,需要用户来自定义。例如,可以想象一个典型的场景,检测系统日志,如果发现异常发送邮件到指定的邮件组。本文将介绍如何在 LlamaIndex 创建工作流:

创建一个简单的工作流

首先安装工作流依赖

pip install llama-index-utils-workflow

LlamaIndex 是一个基于事件的工作流引擎,工作流通过事件来驱动,工作流节点在 LlamaIndex 中是 Step,节点对应类中的一个方法,方法上加上@step 注解,node 的输入和输出都是 event。工作流有两个特别 Event,StartEvent 和 StopEvent,StartEvent 是开始节点,workflow.run 启动 workflow 之后进入的第一个节点就是 StartEvent,workflow.run 可以传入初始化参数。

from llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flowsclass F1Event(Event):first_output: strclass MyWorkflow(Workflow):@stepasync def my_step(self, ev: StartEvent) -> StopEvent:# do something herereturn StopEvent(result=ev.topic)draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")async def main():w = MyWorkflow(timeout=10, verbose=False)result = await w.run(topic="Hello")print(result)if __name__ == "__main__":import asyncioasyncio.run(main())

draw_all_possible_flows 可以将 Workflow 可视化
在这里插入图片描述

保存状态

通过上下文 (Conext) 在节点之间保存数据,例如初始化传入数据, 在其他节点获取数据。

from llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,Context
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flowsclass FirstEvent(Event):first_output: strclass SecondEvent(Event):second_output: strclass MyWorkflow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:print(ev.first_input)await ctx.set("data", ev.data)return FirstEvent(first_output="First step complete.")@stepasync def step_two(self, ctx:Context, ev: FirstEvent) -> SecondEvent:print(ev.first_output)return SecondEvent(second_output="Second step complete.")@stepasync def step_three(self, ctx:Context, ev: SecondEvent) -> StopEvent:print(ev.second_output)print(await ctx.get("data"))return StopEvent(result="Workflow complete.")async def main():w = MyWorkflow(timeout=10, verbose=False)result = await w.run(first_input="Hello", data={"name": "tom"})print(result)if __name__ == "__main__":import asyncioasyncio.run(main())

嵌套工作流

工作流节点中,可以嵌套其他工作流,首先在 workflow 中添加工作流,在需要启动工作流的节点上,将子工作流作为参数传入。

from llama_index.core.workflow import (StartEvent,StopEvent,Workflow,step,Event,Context
)
import asyncio
from llama_index.utils.workflow import draw_all_possible_flowsclass ReflectionFlow(Workflow):@stepasync def sub_start(self, ctx: Context, ev: StartEvent) -> StopEvent:print("Doing custom reflection")return StopEvent(result="Improved query")class FirstEvent(Event):first_output: strclass SecondEvent(Event):second_output: strclass MyWorkflow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:print(ev.first_input)await ctx.set("data", ev.data)return FirstEvent(first_output="First step complete.")@stepasync def step_two(self, ctx:Context, ev: FirstEvent, reflection_workflow: Workflow) -> SecondEvent:print(ev.first_output)res = await reflection_workflow.run(query="nested")print(f"nested workflow {res}")return SecondEvent(second_output="Second step complete.")@stepasync def step_three(self, ctx:Context, ev: SecondEvent) -> StopEvent:print(ev.second_output)print(await ctx.get("data"))return StopEvent(result="Workflow complete.")async def main():w = MyWorkflow(timeout=10, verbose=False)w.add_workflows(reflection_workflow=ReflectionFlow())result = await w.run(first_input="Hello", data={"name": "tom"})print(result)if __name__ == "__main__":import asyncioasyncio.run(main())

总结

在 LlamaIndex 中,为了能够更好的将组建进行组装,提供工作流机制,事件工作流可以很好的解耦工作流的逻辑。事件工作流要尽量简单,如果节点过多就是导致事件过于复杂,事件管理也是比较松散的,过多的依赖会导致后期维护困难。

这篇关于LlamaIndex 工作流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1124402

相关文章

LiteFlow轻量级工作流引擎使用示例详解

《LiteFlow轻量级工作流引擎使用示例详解》:本文主要介绍LiteFlow是一个灵活、简洁且轻量的工作流引擎,适合用于中小型项目和微服务架构中的流程编排,本文给大家介绍LiteFlow轻量级工... 目录1. LiteFlow 主要特点2. 工作流定义方式3. LiteFlow 流程示例4. LiteF

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

基于Python开发一个有趣的工作时长计算器

《基于Python开发一个有趣的工作时长计算器》随着远程办公和弹性工作制的兴起,个人及团队对于工作时长的准确统计需求日益增长,本文将使用Python和PyQt5打造一个工作时长计算器,感兴趣的小伙伴可... 目录概述功能介绍界面展示php软件使用步骤说明代码详解1.窗口初始化与布局2.工作时长计算核心逻辑3

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

Go 语言中的select语句详解及工作原理

《Go语言中的select语句详解及工作原理》在Go语言中,select语句是用于处理多个通道(channel)操作的一种控制结构,它类似于switch语句,本文给大家介绍Go语言中的select语... 目录Go 语言中的 select 是做什么的基本功能语法工作原理示例示例 1:监听多个通道示例 2:带

kotlin中的模块化结构组件及工作原理

《kotlin中的模块化结构组件及工作原理》本文介绍了Kotlin中模块化结构组件,包括ViewModel、LiveData、Room和Navigation的工作原理和基础使用,本文通过实例代码给大家... 目录ViewModel 工作原理LiveData 工作原理Room 工作原理Navigation 工

SSID究竟是什么? WiFi网络名称及工作方式解析

《SSID究竟是什么?WiFi网络名称及工作方式解析》SID可以看作是无线网络的名称,类似于有线网络中的网络名称或者路由器的名称,在无线网络中,设备通过SSID来识别和连接到特定的无线网络... 当提到 Wi-Fi 网络时,就避不开「SSID」这个术语。简单来说,SSID 就是 Wi-Fi 网络的名称。比如

工作常用指令与快捷键

Git提交代码 git fetch  git add .  git commit -m “desc”  git pull  git push Git查看当前分支 git symbolic-ref --short -q HEAD Git创建新的分支并切换 git checkout -b XXXXXXXXXXXXXX git push origin XXXXXXXXXXXXXX

嵌入式方向的毕业生,找工作很迷茫

一个应届硕士生的问题: 虽然我明白想成为技术大牛需要日积月累的磨练,但我总感觉自己学习方法或者哪些方面有问题,时间一天天过去,自己也每天不停学习,但总感觉自己没有想象中那样进步,总感觉找不到一个很清晰的学习规划……眼看 9 月份就要参加秋招了,我想毕业了去大城市磨练几年,涨涨见识,拓开眼界多学点东西。但是感觉自己的实力还是很不够,内心慌得不行,总怕浪费了这人生唯一的校招机会,当然我也明白,毕业