LangGraph 入门与实战

2024-03-25 19:04
文章标签 实战 入门 langgraph

本文主要是介绍LangGraph 入门与实战,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文:LangGraph 入门与实战 - 知乎

参考:langgraph/examples at main · langchain-ai/langgraph · GitHub

大家好,我是雨飞。LangGraph 是在 LangChain 基础上的一个库,是 LangChain 的 LangChain Expression Language (LCEL)的扩展。能够利用有向无环图的方式,去协调多个LLM或者状态,使用起来比 LCEL 会复杂,但是逻辑会更清晰。

相当于一种高级的LCEL语言,值得一试。

安装也十分简单。注意,这个库需要自己去安装,默认的LangChain不会安装这个库。

pip install langgraph

由于,OpenAI访问不方便,我们统一使用智普AI的大模型进行下面的实践。

智普AI的接口和OpenAI的比较类似,因此也可以使用OpenAI的tools的接口,目前还没有发现第二家如此方便的接口。实际使用起来,还是比较丝滑的,虽然有一些小问题。

我们下面以ToolAgent的思想,利用LangGraph去实现一个可以调用工具的Agent。

定义工具以及LLM

工具的定义,可以参考这篇文章,写的比较详细了,比较方便的就是使用 tools 这个注解。

雨飞:使用智普清言的Tools功能实现ToolAgent

定义Agent的状态

LangGraph 中最基础的类型是 StatefulGraph,这种图就会在每一个Node之间传递不同的状态信息。然后每一个节点会根据自己定义的逻辑去更新这个状态信息。具体来说,可以继承 TypeDict 这个类去定义状态,下图我们就定义了有四个变量的信息。

input:这是输入字符串,代表用户的主要请求。

chat_history: 这是之前的对话信息,也作为输入信息传入.

agent_outcome: 这是来自代理的响应,可以是 AgentAction,也可以是 AgentFinish。如果是 AgentFinish,AgentExecutor 就应该结束,否则就应该调用请求的工具。

intermediate_steps: 这是代理在一段时间内采取的行动和相应观察结果的列表。每次迭代都会更新。

class AgentState(TypedDict):# The input stringinput: str# The list of previous messages in the conversationchat_history: list[BaseMessage]# The outcome of a given call to the agent# Needs `None` as a valid type, since this is what this will start asagent_outcome: Union[AgentAction, AgentFinish, None]# List of actions and corresponding observations# Here we annotate this with `operator.add` to indicate that operations to# this state should be ADDED to the existing values (not overwrite it)intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]

定义图中的节点

在LangGraph中,节点一般是一个函数或者langchain中runnable的一种类。

我们这里定义两个节点,agent和tool节点,其中agent节点就是决定执行什么样的行动,

tool节点就是当agent节点选择执行某个行动时,去调用相应的工具。

此外,还需要定义节点之间的连接,也就是边。

条件判断的边:定义图的走向,比如Agent要采取行动时,就需要接下来调用tools,如果Agent说当前的的任务已经完成了,则结束整个流程。

普通的边:调用工具后,始终需要返回到Agent,让Agent决定下一步的行动

from langchain_core.agents import AgentFinish
from langgraph.prebuilt.tool_executor import ToolExecutor# This a helper class we have that is useful for running tools
# It takes in an agent action and calls that tool and returns the result
tool_executor = ToolExecutor(tools)# Define the agent
def run_agent(data):agent_outcome = agent_runnable.invoke(data)return {"agent_outcome": agent_outcome}# Define the function to execute tools
def execute_tools(data):# Get the most recent agent_outcome - this is the key added in the `agent` aboveagent_action = data["agent_outcome"]print("agent action:{}".format(agent_action))output = tool_executor.invoke(agent_action[-1])return {"intermediate_steps": [(agent_action[-1], str(output))]}# Define logic that will be used to determine which conditional edge to go down
def should_continue(data):# If the agent outcome is an AgentFinish, then we return `exit` string# This will be used when setting up the graph to define the flowif isinstance(data["agent_outcome"], AgentFinish):return "end"# Otherwise, an AgentAction is returned# Here we return `continue` string# This will be used when setting up the graph to define the flowelse:return "continue"

定义图

然后,我们就可以定义整个图了。值得注意的是,条件判断的边和普通的边添加方式是不一样的

最后需要编译整个图,才能正常运行。

# Define a new graph
workflow = StateGraph(AgentState)# Define the two nodes we will cycle between
workflow.add_node("agent", run_agent)
workflow.add_node("action", execute_tools)# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.set_entry_point("agent")# We now add a conditional edge
workflow.add_conditional_edges(# First, we define the start node. We use `agent`.# This means these are the edges taken after the `agent` node is called."agent",# Next, we pass in the function that will determine which node is called next.should_continue,# Finally we pass in a mapping.# The keys are strings, and the values are other nodes.# END is a special node marking that the graph should finish.# What will happen is we will call `should_continue`, and then the output of that# will be matched against the keys in this mapping.# Based on which one it matches, that node will then be called.{# If `tools`, then we call the tool node."continue": "action",# Otherwise we finish."end": END,},
)# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge("action", "agent")# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()

总代码

下面是所有的可执行代码,注意,需要将api_key替换为自己的api_key。

# !/usr/bin env python3
# -*- coding: utf-8 -*-
# author: yangyunlong time:2024/2/28
import datetime
import operator
from typing import TypedDict, Annotated, Union, Optional,Type,Listimport requests
from langchain import hub
from langchain.agents import create_openai_tools_agent
from langchain.pydantic_v1 import BaseModel, Field
from langchain.tools import BaseTool, tool
from langchain_core.agents import AgentAction
from langchain_core.agents import AgentFinish
from langchain_core.messages import BaseMessage
from langgraph.graph import END, StateGraph
from langgraph.prebuilt.tool_executor import ToolExecutor
from zhipu_llm import ChatZhipuAIzhipuai_api_key = ""
glm3 = "glm-3-turbo"
glm4 = "glm-4"chat_zhipu = ChatZhipuAI(temperature=0.8,api_key=zhipuai_api_key,model=glm3
)class Tagging(BaseModel):"""分析句子的情感极性,并输出句子对应的语言"""sentiment: str = Field(description="sentiment of text, should be `pos`, `neg`, or `neutral`")language: str = Field(description="language of text (should be ISO 639-1 code)")class Overview(BaseModel):"""Overview of a section of text."""summary: str = Field(description="Provide a concise summary of the content.")language: str = Field(description="Provide the language that the content is written in.")keywords: str = Field(description="Provide keywords related to the content.")@tool("tagging", args_schema=Tagging)
def tagging(s1: str, s2: str):"""分析句子的情感极性,并输出句子对应的语言"""return "The sentiment is {a}, the language is {b}".format(a=s1, b=s2)@tool("overview", args_schema=Overview)
def overview(summary: str, language: str, keywords: str):"""Overview of a section of text."""return "Summary: {a}\nLanguage: {b}\nKeywords: {c}".format(a=summary, b=language, c=keywords)@tool
def get_current_temperature(latitude: float, longitude: float):"""Fetch current temperature for given coordinates."""BASE_URL = "https://api.open-meteo.com/v1/forecast"# Parameters for the requestparams = {'latitude': latitude,'longitude': longitude,'hourly': 'temperature_2m','forecast_days': 1,}# Make the requestresponse = requests.get(BASE_URL, params=params)if response.status_code == 200:results = response.json()else:raise Exception(f"API Request failed with status code: {response.status_code}")current_utc_time = datetime.datetime.utcnow()time_list = [datetime.datetime.fromisoformat(time_str.replace('Z', '+00:00')) for time_str inresults['hourly']['time']]temperature_list = results['hourly']['temperature_2m']closest_time_index = min(range(len(time_list)), key=lambda i: abs(time_list[i] - current_utc_time))current_temperature = temperature_list[closest_time_index]return f'The current temperature is {current_temperature}°C'tools = [tagging, overview, get_current_temperature]
# Get the prompt to use - you can modify this!
prompt = hub.pull("hwchase17/openai-tools-agent")# Construct the OpenAI Functions agent
agent_runnable = create_openai_tools_agent(chat_zhipu, tools, prompt)class AgentState(TypedDict):# The input stringinput: str# The list of previous messages in the conversationchat_history: list[BaseMessage]# The outcome of a given call to the agent# Needs `None` as a valid type, since this is what this will start asagent_outcome: Union[AgentAction, AgentFinish, None]# List of actions and corresponding observations# Here we annotate this with `operator.add` to indicate that operations to# this state should be ADDED to the existing values (not overwrite it)intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]# This a helper class we have that is useful for running tools
# It takes in an agent action and calls that tool and returns the resulttool_executor = ToolExecutor(tools)# Define the agent
def run_agent(data):agent_outcome = agent_runnable.invoke(data)return {"agent_outcome": agent_outcome}# Define the function to execute tools
def execute_tools(data):# Get the most recent agent_outcome - this is the key added in the `agent` aboveagent_action = data["agent_outcome"]print("agent action:{}".format(agent_action))output = tool_executor.invoke(agent_action[-1])return {"intermediate_steps": [(agent_action[-1], str(output))]}# Define logic that will be used to determine which conditional edge to go down
def should_continue(data):# If the agent outcome is an AgentFinish, then we return `exit` string# This will be used when setting up the graph to define the flowif isinstance(data["agent_outcome"], AgentFinish):return "end"# Otherwise, an AgentAction is returned# Here we return `continue` string# This will be used when setting up the graph to define the flowelse:return "continue"# Define a new graph
workflow = StateGraph(AgentState)# Define the two nodes we will cycle between
workflow.add_node("agent", run_agent)
workflow.add_node("action", execute_tools)# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.set_entry_point("agent")# We now add a conditional edge
workflow.add_conditional_edges(# First, we define the start node. We use `agent`.# This means these are the edges taken after the `agent` node is called."agent",# Next, we pass in the function that will determine which node is called next.should_continue,# Finally we pass in a mapping.# The keys are strings, and the values are other nodes.# END is a special node marking that the graph should finish.# What will happen is we will call `should_continue`, and then the output of that# will be matched against the keys in this mapping.# Based on which one it matches, that node will then be called.{# If `tools`, then we call the tool node."continue": "action",# Otherwise we finish."end": END,},
)# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge("action", "agent")# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()inputs = {"input": "what is the weather in NewYork", "chat_history": []}
result = app.invoke(inputs)
print(result["agent_outcome"].messages[0].content)

这篇关于LangGraph 入门与实战的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/845955

相关文章

Java Spring 中的监听器Listener详解与实战教程

《JavaSpring中的监听器Listener详解与实战教程》Spring提供了多种监听器机制,可以用于监听应用生命周期、会话生命周期和请求处理过程中的事件,:本文主要介绍JavaSprin... 目录一、监听器的作用1.1 应用生命周期管理1.2 会话管理1.3 请求处理监控二、创建监听器2.1 Ser

Python中OpenCV与Matplotlib的图像操作入门指南

《Python中OpenCV与Matplotlib的图像操作入门指南》:本文主要介绍Python中OpenCV与Matplotlib的图像操作指南,本文通过实例代码给大家介绍的非常详细,对大家的学... 目录一、环境准备二、图像的基本操作1. 图像读取、显示与保存 使用OpenCV操作2. 像素级操作3.

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性

MQTT SpringBoot整合实战教程

《MQTTSpringBoot整合实战教程》:本文主要介绍MQTTSpringBoot整合实战教程,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录MQTT-SpringBoot创建简单 SpringBoot 项目导入必须依赖增加MQTT相关配置编写

JavaScript实战:智能密码生成器开发指南

本文通过JavaScript实战开发智能密码生成器,详解如何运用crypto.getRandomValues实现加密级随机密码生成,包含多字符组合、安全强度可视化、易混淆字符排除等企业级功能。学习密码强度检测算法与信息熵计算原理,获取可直接嵌入项目的完整代码,提升Web应用的安全开发能力 目录

Redis迷你版微信抢红包实战

《Redis迷你版微信抢红包实战》本文主要介绍了Redis迷你版微信抢红包实战... 目录1 思路分析1.1hCckRX 流程1.2 注意点①拆红包:二倍均值算法②发红包:list③抢红包&记录:hset2 代码实现2.1 拆红包splitRedPacket2.2 发红包sendRedPacket2.3 抢

springboot项目redis缓存异常实战案例详解(提供解决方案)

《springboot项目redis缓存异常实战案例详解(提供解决方案)》redis基本上是高并发场景上会用到的一个高性能的key-value数据库,属于nosql类型,一般用作于缓存,一般是结合数据... 目录缓存异常实践案例缓存穿透问题缓存击穿问题(其中也解决了穿透问题)完整代码缓存异常实践案例Red

Spring Boot拦截器Interceptor与过滤器Filter深度解析(区别、实现与实战指南)

《SpringBoot拦截器Interceptor与过滤器Filter深度解析(区别、实现与实战指南)》:本文主要介绍SpringBoot拦截器Interceptor与过滤器Filter深度解析... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现与实

基于C#实现MQTT通信实战

《基于C#实现MQTT通信实战》MQTT消息队列遥测传输,在物联网领域应用的很广泛,它是基于Publish/Subscribe模式,具有简单易用,支持QoS,传输效率高的特点,下面我们就来看看C#实现... 目录1、连接主机2、订阅消息3、发布消息MQTT(Message Queueing Telemetr

Nginx使用Keepalived部署web集群(高可用高性能负载均衡)实战案例

《Nginx使用Keepalived部署web集群(高可用高性能负载均衡)实战案例》本文介绍Nginx+Keepalived实现Web集群高可用负载均衡的部署与测试,涵盖架构设计、环境配置、健康检查、... 目录前言一、架构设计二、环境准备三、案例部署配置 前端 Keepalived配置 前端 Nginx