
LangGraph in Practice

TIP

A complete AI agent example that shows how LangGraph handles a complex workflow.

AI Research Assistant

python
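from operator import add
from typing import Annotated, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph

class ResearchState(TypedDict):
    topic: str
    search_results: Annotated[list, add]  # reducer: updates are appended, not overwritten
    analysis: str
    report: str
    iteration: int

llm = ChatOpenAI(model="gpt-4", temperature=0.3)

def search_web(query: str) -> str:
    # Placeholder: the original listing calls search_web without defining it.
    # Swap in a real search client here.
    return f"(stub) search results for: {query}"

def search_node(state: ResearchState):
    results = search_web(state["topic"])
    return {"search_results": [results], "iteration": state.get("iteration", 0) + 1}

def analyze_node(state: ResearchState):
    # Assumed step: the original never fills `analysis`, so generate_node
    # would receive an empty string. This node summarizes the collected results.
    findings = "\n".join(str(r) for r in state["search_results"])
    analysis = llm.invoke(f"Analyze these search results on {state['topic']}:\n{findings}")
    return {"analysis": analysis.content}

def generate_node(state: ResearchState):
    prompt = f"Based on the analysis: {state['analysis']}\nWrite a report on: {state['topic']}"
    report = llm.invoke(prompt)
    return {"report": report.content}  # invoke() returns a message; keep only its text

def need_more_search(state: ResearchState):
    # Loop back to search until three rounds have run.
    if state["iteration"] < 3:
        return "continue"
    return "analyze"

builder = StateGraph(ResearchState)
builder.add_node("search", search_node)
builder.add_node("analyze", analyze_node)
builder.add_node("generate", generate_node)
builder.set_entry_point("search")
builder.add_conditional_edges("search", need_more_search,
    {"continue": "search", "analyze": "analyze"})
builder.add_edge("analyze", "generate")
builder.add_edge("generate", END)

graph = builder.compile()
result = graph.invoke({
    "topic": "LLM developments in 2024",
    "search_results": [], "analysis": "", "report": "", "iteration": 0
})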
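
The `Annotated[list, add]` annotation on `search_results` is what lets each search round append to the list instead of replacing it. The sketch below imitates that merge rule in plain Python so it can be run without LangGraph or an API key. It is an illustration of the idea, not LangGraph's actual implementation; `apply_update` is a hypothetical helper, and the state is trimmed to three fields.

```python
from operator import add
from typing import Annotated, TypedDict, get_type_hints


class ResearchState(TypedDict):
    topic: str
    search_results: Annotated[list, add]  # reducer: old value + new value
    iteration: int                        # no reducer: the last write wins


def apply_update(schema, state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state (hypothetical helper)."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] types carry their extra arguments in __metadata__.
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in merged:
            merged[key] = metadata[0](merged[key], value)  # apply the reducer
        else:
            merged[key] = value                            # plain overwrite
    return merged


state = {"topic": "LLM developments in 2024", "search_results": [], "iteration": 0}
state = apply_update(ResearchState, state, {"search_results": ["hit A"], "iteration": 1})
state = apply_update(ResearchState, state, {"search_results": ["hit B"], "iteration": 2})

print(state["search_results"])  # ['hit A', 'hit B']
print(state["iteration"])       # 2
```

This is also why `search_node` returns `[results]` wrapped in a list: the reducer concatenates lists, so each update has to be a list itself.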

Key Features

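  • Shared state: every node reads the same state and returns a partial update; fields annotated with a reducer such as add are merged instead of overwritten. Persisting state across runs additionally requires compiling the graph with a checkpointer.
  • Conditional routing: a routing function inspects the state and returns the key of the next node to run.
  • Loop support: an edge can point back to an earlier node, which is how an agent's think-act loop is built.
  • Compilable: builder.compile() returns a runnable graph that can be called directly with invoke.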
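
To make conditional routing and looping concrete, here is a minimal plain-Python driver that mimics the control flow of the search loop. It is a sketch under stated assumptions, not how LangGraph executes graphs: `run`, `NODES`, `ROUTERS`, and the local `END` sentinel are all hypothetical names, the nodes are stubs with no LLM calls, and the router returns node names directly rather than going through a mapping.

```python
END = "__end__"  # local stand-in sentinel for this sketch, not imported from LangGraph


def search(state: dict) -> dict:
    # Stub node: only counts how many search rounds have run.
    return {"iteration": state["iteration"] + 1}


def generate(state: dict) -> dict:
    # Stub node: builds a fake report instead of calling an LLM.
    return {"report": f"report on {state['topic']} after {state['iteration']} searches"}


def need_more_search(state: dict) -> str:
    # Router: keep searching until three rounds have run.
    return "search" if state["iteration"] < 3 else "generate"


NODES = {"search": search, "generate": generate}
ROUTERS = {"search": need_more_search, "generate": lambda state: END}


def run(entry: str, state: dict, max_steps: int = 25):
    """Run a node, merge its update, then ask its router where to go next."""
    current, trace = entry, []
    while current != END:
        if len(trace) >= max_steps:
            raise RuntimeError("step limit reached; the router never returned END")
        trace.append(current)
        state = {**state, **NODES[current](state)}
        current = ROUTERS[current](state)
    return state, trace


final_state, trace = run("search", {"topic": "LLM developments in 2024", "iteration": 0, "report": ""})
print(trace)  # ['search', 'search', 'search', 'generate']
```

The `max_steps` guard matters for any graph with a cycle: if the router can never reach the exit branch, the loop would otherwise run forever. LangGraph guards against the same failure with a configurable recursion limit.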