LangGraph基础:有状态工作流与核心概念

本文介绍 LangGraph中的工作流。

LangGraph基础:有状态工作流与核心概念

前言

各位同学,上一章我们掌握了LangChain的基础工作流构建,当面对更复杂的任务——比如需要循环迭代、动态分支、状态回溯的场景时,LangChain的链式流就显得有些局限了。

这一章我们聚焦LangGraph,它作为LangChain生态中负责复杂工作流编排的核心框架,1.0版本后更是强化了生产级特性,能帮我们实现有状态、可追溯、高灵活度的工作流。

接下来我们从理念到实操,逐步吃透LangGraph。

LangGraph 是 LangChain 团队在 2024 年推出的核心框架,到了 2025 年,它已经成为构建 Production-Ready(生产级)Agent 的行业标准。

先提前准备依赖(终端执行安装):

1
pip install langgraph

langgraph 推荐安装版本 ≥1.0.0

1 LangGraph图结构驱动的工作流

在正式上手前,我们先搞懂LangGraph的设计初衷。大家可以把工作流想象成一张“任务地图”,传统链式流是“单一路线”,而LangGraph的图结构是“多节点、多路径”的地图,能应对更复杂的路况。

1.1 为什么需要图结构工作流?

在AI应用开发中,我们常面临超越线性流程的复杂任务。比如一个智能客服系统,需要先识别用户意图,再根据意图分发到技术支持、订单咨询等不同节点,若回答不达标还需回退重生成——这种包含分支、循环的场景,传统线性链式工作流(如LangChain早期Chain)难以高效支撑。

线性流程的核心痛点是“刚性”:一旦定义好步骤顺序,难以动态调整,且无法很好地管理跨步骤的数据共享和状态回溯。而图结构工作流通过“节点”封装功能、“边”定义路径、“状态”共享数据,能精准解决这些问题,让复杂流程的每一步都可控制、可追溯。

LangGraph通过对智能体工作流进行图结构建模,它具有以下几个核心价值:

  • 显式控制: 通过流程图明确指定执行路径,避免智能体在复杂任务中"乱跑"
  • 可预测性: 结果更稳定,行为更可预测
  • 精细调控: 明确规定下一步走哪个节点、何时调用工具、何时循环或终止

6-1

1.2 与LangChain工作流的差异与互补性

很多同学会问:有了LangChain,为什么还要用LangGraph?其实两者不是替代关系,而是互补关系,LangChain的Agent能力甚至是基于LangGraph运行时构建的。

对比维度:流程灵活性、状态管理、循环与分支支持

核心维度LangChainLangGraph
流程灵活性采用链式流设计,适用于线性、固定步骤的任务;抽象层级高,降低上手门槛,能快速搭建简单流程。基于图结构流构建,原生支持分支、循环、并行等复杂流程逻辑,灵活性极强;需手动定义节点及节点间的边,对流程设计能力要求更高。
状态管理状态分散存储于各个Chain组件中,上下文传递需手动编码实现,无统一管理入口,扩展性较弱。具备统一状态对象,支持字段级状态合并策略(如追加、覆盖等),状态管理更规范;内置状态持久化能力,无需额外开发适配,降低运维成本。
循环与分支支持仅能通过条件判断代码实现简单分支逻辑,循环功能需手动控制执行次数,难以适配复杂动态流程。通过条件边、循环边原生支持复杂分支与循环逻辑,可配置终止条件避免无限循环,适配多场景交互、动态决策等复杂业务需求。

总结

  • LangChain更适合快速搭建线性、固定流程的轻量化应用,上手成本低,能满足简单任务的开发需求;
  • LangGraph则聚焦复杂流程场景,尤其适用于智能体工作流、合规审核多步骤校验等场景。

实际开发中,我们可以这样选择:快速搭建原型、任务流程固定(如“文本生成→格式转换”),选LangChain链式流,效率更高;任务流程复杂(有分支、循环、并行)、需要状态追溯、追求生产级稳定性(如多Agent协作、长时任务),选LangGraph图结构流。

2 LangGraph核心概念:图状态、点、边

LangGraph的核心是“图(Graph)”,而构成图的三大组件是:状态(State)、节点(Nodes)、边(Edges)。这三者就像乐高积木,既能搭简单的线性流程,也能拼出复杂的分支、循环工作流。

提示:先理解“组件作用”,再结合代码实操,能快速突破抽象壁垒。

2.1 状态(State):工作流的“共享黑板”

状态是LangGraph工作流的“数据中枢”,所有节点的输入、输出都围绕它展开。

通俗类比:把工作流想象成班级大扫除,状态就是一块共享黑板——扫地的同学(节点1)把“地面已清洁”写在黑板上,擦玻璃的同学(节点2)从黑板看到进度后,完成工作再补充“玻璃已擦完”,所有节点只和黑板交互,不用互相喊话。

这种设计让每个步骤的结果都可追溯,也支持中途暂停、恢复(比如大扫除中途放学,第二天看黑板就知道从哪继续)。

核心本质:共享数据载体+状态快照

  • 共享数据载体:所有节点都能读取、修改状态中的数据(比如字段、列表、字典),确保数据一致性。
  • 状态快照:配合工具,每一步状态变化都会被记录,支持回溯和断点续跑(比如长任务中途服务器重启,恢复后从上次状态继续,不用重做)。

定义规范:TypedDict类型定义与字段设计原则

LangGraph 1.0+推荐使用Python的TypedDict+ Annotated定义状态结构,明确字段类型和含义,提升代码可读性和类型校验能力。

字段设计需遵循三个原则:

  1. 最小必要原则:只定义工作流必需的字段,避免冗余数据占用内存;
  2. 可更新原则:将需要跨节点传递、修改的数据设为状态字段,固定不变的配置无需放入;
  3. 清晰命名原则:字段名需直观反映数据含义,如“user_query”“generated_text”“quality_score”。

基础定义示例(TypedDict方式):

1
2
3
4
5
6
7
8
9
from typing import TypedDict, NotRequired

class TaskState(TypedDict):
    user_query: str #用户原始查询
    tool_result: NotRequired[str] #工具调用结果
    final_answer: NotRequired[str] #最终回答
    progress: NotRequired[int] #任务进度百分比

#NotRequired 表示是非必须要求

也可以用 Pydantic 的方式

1
2
3
4
5
6
7
8
from pydantic import BaseModel, Field
from typing import Optional

class TaskState(BaseModel):
    user_query: str = Field(description="用户原始查询")
    tool_result: Optional[str] = Field(default=None, description="工具调用结果")
    final_answer: Optional[str] = Field(default=None, description="最终回答")
    progress: Optional[int] = Field(default=None, description="任务进度百分比")

两种写法各有优劣,目前2种写法官方都是支持的,因为Pydantic 写法学习起来有难度,本文以下教学均使用TypedDict的写法,但实际的企业生产建议是使用Pydantic 写法,更工程化一些。

2.1.2 状态传递机制

LangGraph的状态传递采用“不可变更新”原则:节点接收当前状态的副本,执行逻辑后返回更新后的部分状态(无需返回完整状态),框架会自动合并为新的全局状态,再传递给下一个节点。

这种机制的优势是避免节点间相互干扰,每个节点只需关注自己需要处理的字段。

例如

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def parse_query(state: TaskState):
    print("\n====== 节点1 parse_query 输入状态 ======")
    print(state)

    query = state.user_query
    update = {
        "tool_result": f"已解析问题: {query}",
        "progress": 30
    }

    return update

是不是还是感觉内容很抽象,看不懂,多说无益,我们通过一个代码案例来学习:

现在我们假设一个简单的智能体工作流场景:用户输入问题,系统先解析问题,再调用外部工具获取信息,最后生成最终回答。

整个过程需要通过 LangGraph 的边来定义执行顺序,从而形成一个可自动运行的任务流水线。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# NotRequired 来自 Python 的 typing 模块扩展,它是 PEP 655 引入的特性
# NotRequired 从 Python 3.11 开始内置支持
from typing import TypedDict, NotRequired

#如果你使用的是 Python ≤3.10,需要安装 typing_extensions:
# pip install typing_extensions
#from typing import TypedDict
#from typing_extensions import NotRequired


class TaskState(TypedDict):
    user_query: str #用户原始查询
    tool_result: NotRequired[str] #工具调用结果
    final_answer: NotRequired[str] #最终回答
    progress: NotRequired[int] #任务进度百分比

# ======节点函数=========
def parse_query(state: TaskState):
    print("\n====== 节点1 parse_query 输入状态 ======")
    print(state)

    query = state["user_query"]
    update = {
        "tool_result": f"已解析问题: {query}",
        "progress": 30
    }

    print("------ 节点1 更新字段 ------")
    print(update)

    return update


def call_tool(state: TaskState):
    print("\n====== 节点2 call_tool 输入状态 ======")
    print(state)

    result = f"工具搜索结果:关于『{state['user_query']}』的相关知识"
    update = {
        "tool_result": result,
        "progress": 70
    }

    print("------ 节点2 更新字段 ------")
    print(update)

    return update


def generate_answer(state: TaskState):
    print("\n====== 节点3 generate_answer 输入状态 ======")
    print(state)

    answer = f"最终回答:基于工具结果 -> {state['tool_result']}"
    update = {
        "final_answer": answer,
        "progress": 100
    }

    print("------ 节点3 更新字段 ------")
    print(update)

    return update

#========构建 LangGraph 工作流======
from langgraph.graph import StateGraph

# 1. 创建图(绑定状态类型)
builder = StateGraph(TaskState)

# 2. 添加节点
builder.add_node("parse_query", parse_query)
builder.add_node("call_tool", call_tool)
builder.add_node("generate_answer", generate_answer)

# 3. 定义执行顺序(边)
builder.set_entry_point("parse_query")
builder.add_edge("parse_query", "call_tool")
builder.add_edge("call_tool", "generate_answer")

# 4. 编译图
graph = builder.compile()

# ====== 运行工作流 ======
init_state = TaskState(user_query="什么是 LangGraph?")

final_state = graph.invoke(init_state)

print("\n最终状态:")
print(final_state)

运行结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
====== 节点1 parse_query 输入状态 ======
user_query='什么是 LangGraph?' tool_result='' final_answer='' progress=0
------ 节点1 更新字段 ------
{'tool_result': '已解析问题: 什么是 LangGraph?', 'progress': 30}

====== 节点2 call_tool 输入状态 ======
user_query='什么是 LangGraph?' tool_result='已解析问题: 什么是 LangGraph?' final_answer='' progress=30
------ 节点2 更新字段 ------
{'tool_result': '工具搜索结果:关于『什么是 LangGraph?』的相关知识', 'progress': 70}

====== 节点3 generate_answer 输入状态 ======
user_query='什么是 LangGraph?' tool_result='工具搜索结果:关于『什么是 LangGraph?』的相关知识' final_answer='' progress=70
------ 节点3 更新字段 ------
{'final_answer': '最终回答:基于工具结果 -> 工具搜索结果:关于『什么是 LangGraph?』的相关知识', 'progress': 100}

最终状态:
{'user_query': '什么是 LangGraph?', 'tool_result': '工具搜索结果:关于『什么是 LangGraph?』的相关知识', 'final_answer': '最终回答:基于工具结果 -> 工具搜索结果:关于『什么是 LangGraph?』的相关知识', 'progress': 100}

从上面的图就能清晰的看到状态在不同节点之间进行流转。

2.1.3 状态传递机制:节点间的数据共享与更新逻辑

状态传递的逻辑很简单:工作流启动时初始化状态,每个节点执行时接收当前状态,处理后返回需要更新的字段,LangGraph会根据合并策略更新全局状态,再传递给下一个节点。

这里要注意:节点返回的字典只需包含要更新的字段,无需返回完整状态。比如节点只更新progress字段,就返回{“progress”: 1},LangGraph会自动合并到全局状态中。

思考: 为什么节点返回的字典只需包含要更新的字段,无需返回完整状态?

2.2 节点(Nodes):工作流的“功能小工人”

节点是工作流的“执行单元”,相当于图中的“站点”,每个节点封装一段具体逻辑(比如调用LLM、查数据库、处理数据),本质是“输入状态→处理→返回更新状态”的纯函数。

通俗类比:每个节点都是一个“专业小工人”,只做一件事——从共享黑板(状态)拿材料,做完后把结果写回黑板,不依赖外部无关信息。

2.2.1 核心作用:接收状态、执行逻辑、输出新状态

节点的核心作用有三个:一是接收全局状态,提取所需字段作为输入;二是执行具体逻辑,比如调用LLM、工具、数据处理;三是将执行结果封装成状态更新字典,返回给LangGraph。

需要注意:节点必须是无副作用的纯函数,逻辑只依赖输入的状态,不依赖外部变量,这样才能保证状态的可追溯性和可复现性。

2.2.2 节点类型:LLM调用节点、工具调用节点、数据处理节点

根据功能不同,节点主要分为三类:

LLM调用节点:负责调用大模型生成内容,比如根据用户查询生成回答,核心是将状态中的字段(如user_query)传入模型,将生成结果存入状态。

工具调用节点:负责调用外部工具,比如Elasticsearch检索、天气API查询,核心是提取状态中的参数,调用工具后将结果更新到状态。

数据处理节点:负责数据转换、校验、格式处理,比如将生成的回答格式化、校验结果是否合格,核心是对状态中的数据进行加工,返回处理后的值。

下面结合自定义TaskState演示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from typing import TypedDict, NotRequired

class TaskState(TypedDict):
    user_query: str #用户原始查询
    intent: NotRequired[str] #用户意图
    llm_answer: NotRequired[str] #LLM 生成的回答
    tool_result: NotRequired[str] #工具调用结果
    final_answer: NotRequired[str] #最终回答
    progress: NotRequired[int] #任务进度百分比

#=======LLM 调用节点(模拟)========= 
#LLM 节点只负责“生成文本”,不负责工具、不负责格式化。
def llm_node(state: TaskState):
    print("\n🧠 [LLM Node] 输入状态:", state)

    # 模拟大模型生成
    llm_output = f"LangGraph 是一种用于构建可控AI工作流的框架,问题是:{state['user_query']}"

    return {
        "llm_answer": llm_output,
        "progress": 30
    } 

# ====== 工具调用节点(模拟检索) ======
#工具节点只做 I/O,不生成语言,不控制流程。
def tool_node(state: TaskState):
    print("\n🔧 [Tool Node] 输入状态:", state)

    # 模拟搜索工具
    tool_output = f"检索结果:LangGraph 支持 DAG、状态机、Agent workflow"

    return {
        "tool_result": tool_output,
        "progress": 70
    }

# ====== 节点3:数据处理节点 ======
#数据节点负责加工 state,而不是生成或检索。
def data_process_node(state: TaskState):
    print("\n🧹 [Data Node] 输入状态:", state)

    final = f"""
【LLM回答】{state["llm_answer"]}
【工具补充】{state["tool_result"]}
"""

    return {
        "final_answer": final.strip(),
        "progress": 100
    }
    
from langgraph.graph import StateGraph

# 1. 创建图
builder = StateGraph(TaskState)

# 2. 添加节点
builder.add_node("llm_node", llm_node)
builder.add_node("tool_node", tool_node)
builder.add_node("data_process_node", data_process_node)

# 3. 定义流程
builder.set_entry_point("llm_node")
builder.add_edge("llm_node", "tool_node")
builder.add_edge("tool_node", "data_process_node")

# 4. 编译
graph = builder.compile()

# 初始化状态
init_state = TaskState(user_query="什么是 LangGraph?")

# 执行
final_state = graph.invoke(init_state)

print("\n====== 最终状态 ======")
print(final_state)

运行结果如下所示:

1
2
3
4
5
6
7
8
9

🧠 [LLM Node] 输入状态: user_query='什么是 LangGraph?' llm_answer='' tool_result='' final_answer='' progress=0

🔧 [Tool Node] 输入状态: user_query='什么是 LangGraph?' llm_answer='LangGraph 是一种用于构建可控AI工作流的框架,问题是:什么是 LangGraph?' tool_result='' final_answer='' progress=30

🧹 [Data Node] 输入状态: user_query='什么是 LangGraph?' llm_answer='LangGraph 是一种用于构建可控AI工作流的框架,问题是:什么是 LangGraph?' tool_result='检索结果:LangGraph 支持 DAG、状态机、Agent workflow' final_answer='' progress=70

====== 最终状态 ======
{'user_query': '什么是 LangGraph?', 'llm_answer': 'LangGraph 是一种用于构建可控AI工作流的框架,问题是:什么是 LangGraph?', 'tool_result': '检索结果:LangGraph 支持 DAG、状态机 、Agent workflow', 'final_answer': '【LLM回答】LangGraph 是一种用于构建可控AI工作流的框架,问题是:什么是 LangGraph?\n【工具补充】检索结果:LangGraph 支持 DAG、状态机、Agent workflow', 'progress': 100}

结论: 在 LangGraph 中,节点(Node)是 AI 工作流的最小执行单元,负责完成具体计算任务,如调用大模型、调用外部工具或进行数据处理。每个节点遵循“读取状态 → 执行逻辑 → 返回状态更新”的工作模式,本质上是一个以状态为输入、以状态补丁为输出的纯函数。

2.2.3 节点实现规范:函数定义格式与状态交互方式

LangGraph 1.0+对节点函数有明确的规范要求,确保框架能正确解析和执行:

  1. 输入参数:必须包含状态参数(通常命名为state),类型需与定义的状态结构一致(如ConversationState);可选包含config参数,用于接收工作流配置。
  2. 输出格式:支持三种形式——返回状态片段(字典或TypedDict实例)、返回None(无状态更新)、返回多个状态片段(通过元组)。
  3. 交互原则:节点只能通过状态与其他节点交互,禁止直接调用其他节点函数,确保流程的可追溯性。

错误示例(直接调用其他节点):

1
2
3
4
5
6

# 不推荐:节点间直接调用,破坏流程完整性
def bad_node(state: ConversationState):
    answer = generate_answer(state)  # 直接调用其他节点
    return {"generated_answer": answer}
      

2.3 边(Edges):工作流的“路径导航员”

边是连接节点的“路径”,定义了节点间的跳转规则,决定了工作流的执行顺序。LangGraph 1.0+支持两种核心边类型:固定边和条件边。

2.3.1 核心作用:定义节点间的跳转规则

边的核心作用就是“导航”,告诉工作流“当前节点执行完后,下一步该去哪个节点”。没有边的连接,节点就是孤立的,无法形成完整工作流。简单来说,边就是工作流的“执行路线图”。

2.3.2 核心边类型与实践

固定边是最简单的边类型,指定“从A节点到B节点”的固定跳转,无论状态如何变化,执行完A后必然执行B,适合线性流程。

条件边是动态边,根据当前状态判断跳转方向,支持多分支跳转,适合需要动态决策的场景。比如根据用户需求意图,判断是走“总结”分支还是“改写”分支。

此外,LangGraph还支持循环边,本质是条件边的一种特殊形式,通过判断状态是否满足终止条件,决定是回溯到前序节点继续执行,还是进入下一个节点。

固定边(Fixed Edges):节点执行完成后,固定跳转到下一个节点,适用于线性流程。通过add_edge()方法配置,参数为“起始节点名”和“目标节点名”。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from typing import TypedDict, NotRequired

# ====== 全局共享状态(黑板) ======
class TaskState(TypedDict):
    user_query: str #用户原始查询
    intent: NotRequired[str] #用户意图
    llm_answer: NotRequired[str] #LLM 生成的回答
    tool_result: NotRequired[str] #工具调用结果
    final_answer: NotRequired[str] #最终回答
    progress: NotRequired[int] #任务进度百分比
    
    
#========固定边(线性流程)实践代码======
def parse_intent(state: TaskState):
    print("\n🔹 parse_intent")
    # 简单模拟意图识别
    intent = "summarize" if "总结" in state['user_query'] else "rewrite"
    return {"intent": intent, "progress": 30}


def summarize_node(state: TaskState):
    print("\n🔹 summarize_node")
    return {"llm_answer": f"总结结果: {state['user_query']}", "progress": 60}


def rewrite_node(state: TaskState):
    print("\n🔹 rewrite_node")
    return {"llm_answer": f"改写结果: {state['user_query']}", "progress": 60}


def final_node(state: TaskState):
    print("\n🔹 final_node")
    return {"progress": 100}

from langgraph.graph import StateGraph

builder = StateGraph(TaskState)

builder.add_node("parse_intent", parse_intent)
builder.add_node("summarize_node", summarize_node)
builder.add_node("rewrite_node", rewrite_node)
builder.add_node("final_node", final_node)

# 固定边(线性)
builder.set_entry_point("parse_intent")
builder.add_edge("parse_intent", "summarize_node")   # 固定走总结
builder.add_edge("summarize_node", "final_node")

graph = builder.compile()

state = TaskState(user_query="请帮我总结这段话")
print(graph.invoke(state))

运行结果如下所示:

1
2
3
4
5
6
7

🔹 parse_intent

🔹 summarize_node

🔹 final_node
{'user_query': '请帮我总结这段话', 'intent': 'summarize', 'result': '总结结果: 请帮我总结这段话', 'progress': 100}

也可以看一下图的执行状态(在ipynb代码里执行

1
2
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

6-2

从图中能清楚看到节点的运行状态,可以从图中看到我们将rewrite_node 增加到了节点 但是没有用边连接起来,于是rewrite_node变成了孤立的节点~

学习实践: 将rewrite_node 使用边连接起来,比较最后生成的图

条件边:根据当前状态动态决定跳转路径,适用于分支流程。通过add_conditional_edges()方法配置,需指定“起始节点名”“条件判断函数”和“路径映射表”

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from typing import TypedDict, NotRequired

# ====== 全局共享状态(黑板) ======
class TaskState(TypedDict):
    user_query: str #用户原始查询
    intent: NotRequired[str] #用户意图
    llm_answer: NotRequired[str] #LLM 生成的回答
    tool_result: NotRequired[str] #工具调用结果
    final_answer: NotRequired[str] #最终回答
    progress: NotRequired[int] #任务进度百分比

def parse_intent(state: TaskState):
    print("\n🔹 parse_intent")
    # 简单模拟意图识别
    intent = "summarize" if "总结" in state['user_query'] else "rewrite"
    return {"intent": intent, "progress": 30}


def summarize_node(state: TaskState):
    print("\n🔹 summarize_node")
    return {"llm_answer": f"总结结果: {state['user_query']}", "progress": 60}


def rewrite_node(state: TaskState):
    print("\n🔹 rewrite_node")
    return {"llm_answer": f"改写结果: {state['user_query']}", "progress": 60}


def final_node(state: TaskState):
    print("\n🔹 final_node")
    return {"progress": 100}


from langgraph.graph import StateGraph

builder = StateGraph(TaskState)

builder.add_node("parse_intent", parse_intent)
builder.add_node("summarize_node", summarize_node)
builder.add_node("rewrite_node", rewrite_node)
builder.add_node("final_node", final_node)

def route_by_intent(state: TaskState):
    # ✅ 修复:字典必须用 [] 访问,不能用 .
    if state["intent"] == "summarize":
        return "summarize_node"
    else:
        return "rewrite_node"

builder.set_entry_point("parse_intent")

# 条件边
builder.add_conditional_edges(
    "parse_intent",
    route_by_intent,
    {
        "summarize_node": "summarize_node",
        "rewrite_node": "rewrite_node",
    }
)

builder.add_edge("summarize_node", "final_node")
builder.add_edge("rewrite_node", "final_node")

graph = builder.compile()


print("\n====== 测试总结 ======")
# ✅ 传入普通字典即可,不需要 TaskState() 构造
print(graph.invoke({"user_query": "请总结这段话"}))

print("\n====== 测试改写 ======")
print(graph.invoke({"user_query": "请改写这段话"}))

运行结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
====== 测试总结 ======

🔹 parse_intent

🔹 summarize_node

🔹 final_node
{'user_query': '请总结这段话', 'intent': 'summarize', 'result': '总结结果: 请总结这段话', 'progress': 100}

====== 测试改写 ======

🔹 parse_intent

🔹 rewrite_node

🔹 final_node
{'user_query': '请改写这段话', 'intent': 'rewrite', 'result': '改写结果: 请改写这段话', 'progress': 100}

也可以看一下图的执行状态(在ipynb代码里执行

1
2
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

6-3

循环边

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from typing import TypedDict, NotRequired

# ====== 全局共享状态(黑板) ======
class TaskState(TypedDict):
    user_query: str #用户原始查询
    intent: NotRequired[str] #用户意图
    llm_answer: NotRequired[str] #LLM 生成的回答
    tool_result: NotRequired[str] #工具调用结果
    final_answer: NotRequired[str] #最终回答
    progress: NotRequired[int] #任务进度百分比

def parse_intent(state: TaskState):
    print("\n🔹 parse_intent")
    # 简单模拟意图识别
    intent = "summarize" if "总结" in state['user_query'] else "rewrite"
    return {"intent": intent, "progress": 30}


def summarize_node(state: TaskState):
    print("\n🔹 summarize_node")
    return {"llm_answer": f"总结结果: {state['user_query']}", "progress": 60}


def rewrite_node(state: TaskState):
    print("\n🔹 rewrite_node")
    return {"llm_answer": f"改写结果: {state['user_query']}", "progress": 60}


def final_node(state: TaskState):
    print("\n🔹 final_node")
    return {"progress": 100}


from langgraph.graph import StateGraph

#=======循环节点======
def loop_node(state: TaskState):
    print("\n🔄 loop_node, progress =", state.progress)
    return {"progress": state.progress + 30}

#=======循环条件函数======
def loop_router(state: TaskState):
    if state['progress'] >= 100:
        return "final_node"
    return "loop_node"

#=======构建图=========
builder = StateGraph(TaskState)

builder.add_node("loop_node", loop_node)
builder.add_node("final_node", final_node)

builder.set_entry_point("loop_node")

builder.add_conditional_edges(
    "loop_node",
    loop_router,
    {
        "loop_node": "loop_node",     # 回环
        "final_node": "final_node"    # 终止
    }
)

graph = builder.compile()

print(graph.invoke(TaskState(user_query="test")))

运行结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
🔄 loop_node, progress = 0

🔄 loop_node, progress = 30

🔄 loop_node, progress = 60

🔄 loop_node, progress = 90

🔹 final_node
{'user_query': 'test', 'intent': '', 'result': '', 'progress': 100}

也可以看一下图的执行状态(在ipynb代码里执行

1
2
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

循环边本质是“返回前序节点”的条件边,用于需要重复执行的场景(比如校验结果不合格,重新调用工具)

2.4总结

  • 状态(State):存数据、做共享,是工作流的“中枢”,用Pydantic定义更清晰。
  • 节点(Nodes):做执行、处理逻辑,是“功能单元”,必须是纯函数。
  • 边(Edges):定路线、控顺序,分固定边(线性)和条件边(分支/循环)。

三者组合的核心逻辑:状态贯穿全程,节点处理逻辑,边控制流向,这也是LangGraph能实现复杂工作流的根本。

3 LangGraph运行机制:基于超步骤的消息传递

了解了三大组件后,我们再来看LangGraph的运行机制。LangGraph采用“超步骤(super-step)”机制管理执行流程,确保节点执行的有序性和状态的一致性。

LangGraph 工作流是如何“跑起来”的?

LangGraph 的工作流执行过程,可以理解为:👉 状态驱动的任务调度引擎

它不是简单地按顺序调用函数,而是像一个智能调度器,根据状态和边规则动态决定“谁该执行”。

3.1 超步骤(Super-step)与节点活跃状态

超步骤是LangGraph执行工作流的基本单位,可理解为“一轮并行任务执行周期”。每个超步骤内,框架会完成三件事:

  1. 激活节点:所有收到消息(状态更新)的节点会从“休眠状态”变为“活跃状态”;
  2. 执行节点:所有活跃节点并行执行逻辑,生成新的状态片段;
  3. 传递消息:将新的状态片段作为消息,传递给下一个节点,同时节点回归休眠状态。

这种机制让LangGraph天然支持并行执行——同一超步骤内的活跃节点可同时运行,提升流程效率;同时通过消息传递实现节点间的协同,确保状态一致性。

⚙️ 节点活跃状态:谁在这一轮被调度?

在某个超步骤中:

  • 可能只激活一个节点
  • 也可能同时激活多个节点

👉 这些被激活的节点叫 活跃节点(Active Nodes)类比👉像多名工人同时在流水线上工作。

3.2 LangGraph 工作流执行全过程

一个完整的LangGraph工作流执行流程可分为六个阶段:

1.启动:调用graph.invoke()方法传入初始状态,框架将初始状态作为消息,发送给入口节点,入口节点被激活,进入第一个超步骤。

1
2
用户输入:{"user_query": "什么是 LangGraph?"}
系统创建 初始状态黑板,并激活起始节点。

2.节点激活:入口节点接收消息后变为活跃状态,其他节点保持休眠。

1
2
3
4
LangGraph 查看 Graph
parse  tool  generate
决定当前应执行的节点集合,例如:
当前激活节点 = [parse]

3.逻辑执行:活跃节点执行自身逻辑,读取初始状态,生成更新后的状态片段。

1
2
3
4
5
每个活跃节点:
读取状态
执行逻辑
返回状态更新
例如:{"tool_result": "..."}

4.状态传递:框架合并状态片段为新的全局状态,根据边的规则,将新状态作为消息传递给下一个(或多个)节点,激活对应节点。

1
2
3
LangGraph 自动执行:
state = merge(old_state, node_return)
更新后的状态会传递给下一级节点。

5.停止判断:是否继续执行?

1
2
3
4
系统判断:
是否还有后续节点
是否触发 END
是否满足循环终止条件

终止:输出最终结果

1
2
3
4
当没有可执行节点时:

👉 返回最终状态
👉 输出 AI 结果

好啦,你现在已经掌握了LangGraph的精髓~~~

3.3 并行与顺序执行

LangGraph的调度规则核心是“基于节点依赖关系”,分为顺序执行和并行执行两种场景,具体由边的配置和节点依赖决定:

1. 顺序执行:当节点A的后续节点只有节点B,且节点B依赖节点A的状态结果时,触发顺序执行——节点A执行完成并更新状态后,节点B才被激活,即“一超步骤一节点”。

适用场景:步骤间有强依赖(如摘要生成依赖去重后的文本,敏感词校验依赖摘要结果),前文案例均为顺序执行场景。

2. 并行执行:当节点A的后续节点有多个(如节点B和节点C),且节点B、C无相互依赖,仅依赖节点A的状态结果时,触发并行执行——节点A执行完成后,节点B、C被同时激活,纳入同一个超步骤并行执行。

适用场景:步骤间无依赖,比如现在有一个文体,同时生成文本的摘要和关键词(注意,摘要和关键词一般认为这2个是没有联系的)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from langgraph.graph import StateGraph, START, END
from typing import Optional
from typing import TypedDict, Optional, NotRequired
import time
import random

# ===== 1. 定义状态(共享黑板) =====

class TextProcessState(TypedDict):
    raw_text: str
    summary_text: NotRequired[str]
    keyword_text: NotRequired[str]
    has_sensitive: NotRequired[bool]
    final_text: NotRequired[str]

# ===== 2. 去重节点(入口节点) =====
def deduplicate_node(state: TextProcessState):
    print("\n【节点 deduplicate】执行中...")
    text = state["raw_text"]   # ✅ TypedDict 访问方式
    time.sleep(1)
    return {"raw_text": text}

# ===== 3. 摘要节点(并行节点1) =====
def summary_node(state: TextProcessState):
    print("⚡ 并行节点 summary 执行中...")
    time.sleep(random.uniform(1, 2))
    summary = "摘要:" + state["raw_text"][:10]
    return {"summary_text": summary}

# ===== 4. 关键词节点(并行节点2) =====
def keyword_node(state: TextProcessState):
    print("⚡ 并行节点 keyword 执行中...")
    time.sleep(random.uniform(1, 2))
    keywords = "、".join(state["raw_text"].split(",")[:3])
    return {"keyword_text": keywords}

# ===== 5. 并行汇合节点 =====
def sensitive_check_node(state: TextProcessState):
    print("\n【节点 sensitive_check】汇总并行结果")
    print(" summary_text =", state["summary_text"])
    print(" keyword_text =", state["keyword_text"])

    sensitive = "暴力" in state["raw_text"]
    return {
        "has_sensitive": sensitive,
        "final_text": f"最终输出 | 摘要={state['summary_text']} | 关键词={state['keyword_text']}"
    }

# ===== 6. 构建 LangGraph =====
builder = StateGraph(TextProcessState)

builder.add_node("deduplicate", deduplicate_node)
builder.add_node("summary", summary_node)
builder.add_node("keyword", keyword_node)
builder.add_node("sensitive_check", sensitive_check_node)

builder.add_edge(START, "deduplicate")

# 并行分叉
builder.add_edge("deduplicate", "summary")
builder.add_edge("deduplicate", "keyword")

# 并行汇合
builder.add_edge("summary", "sensitive_check")
builder.add_edge("keyword", "sensitive_check")

builder.add_edge("sensitive_check", END)

graph = builder.compile()

# ===== 7. 运行 =====
if __name__ == "__main__":
    init_state = TextProcessState(
        raw_text="LangGraph很强大,支持状态管理,支持动态分支,并行执行"
    )

    final_state = graph.invoke(init_state)

    print("\n====== 最终状态 ======")
    print(final_state)

运行结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
【节点 deduplicate】执行中...
⚡ 并行节点 keyword 执行中...
⚡ 并行节点 summary 执行中...

【节点 sensitive_check】汇总并行结果
 summary_text = 摘要:LangGraph很
 keyword_text = LangGraph很强大、支持状态管理、支持动态分支

====== 最终状态 ======
{'raw_text': 'LangGraph很强大,支持状态管理,支持动态分支,并行执行', 'summary_text': '摘要:LangGraph很', 'keyword_text': 'LangGraph很强大、支持状态管理、支持动态分支', 'has_sensitive': False, 'final_text': '最终输出 | 摘要=摘要:LangGraph很 | 关键词=LangGraph很强大、支持状态管理、支持动态分支'}

学习小结:顺序执行依赖“单后续节点+强状态依赖”,并行执行依赖“多后续节点+无相互依赖”;LangGraph会自动根据节点依赖和边配置选择调度方式,无需手动设置“并行/顺序”,一般而言只需关注业务逻辑的依赖关系即可。

4 综合实操

本节将通过三个递进式综合案例,从简单到复杂构建完整工作流,覆盖线性、分支、循环三种核心场景,帮助大家整合所学知识,实现“从理论到落地”的跨越。所有案例均采用LangGraph 1.0+接口,代码可直接运行(替换API密钥即可)

4.1 实操准备:LangGraph环境确认与核心API导入

首先确保环境配置正确,核心依赖安装完成,同时梳理常用核心API,方便后续案例调用:

1. 环境依赖安装

1
pip install langgraph langchain langchain-openai python-dotenv

推荐安装版本 ≥1.0.0

2. 核心API导入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

load_dotenv()
# 加载并验证环境变量(1.x推荐显式验证,避免配置缺失)
API_KEY = os.getenv("LLM_API_KEY")
BASE_URL = os.getenv("LLM_BASE_URL")
MODEL = os.getenv("LLM_MODEL_ID")

if not API_KEY:
    raise ValueError("未检测到 API_KEY,请检查 .env 文件是否配置正确")

llm = ChatOpenAI(
    api_key=API_KEY,
    base_url=BASE_URL,
    model=MODEL,  # 注意:根据你使用的模型修改名称!!!! 后面章节不再继续说明
    temperature=0.3,  # 随机性:0-1,越小越严谨,越大越有创造力
)

4.2 案例1:简单线性工作流——文本处理全流程串联

学习目标:整合“状态、节点、边”三大组件,构建“文本去重→摘要生成→敏感词校验→结果输出”的线性流程,掌握完整工作流的搭建、运行与结果校验方法,同时体验状态快照与可视化功能。

4.2.1 案例设计思路

流程逻辑:用户输入原始文本→先去重清理冗余内容→生成简洁摘要→校验摘要是否含敏感词→格式化输出结果。全程用固定边串联节点,依赖状态传递数据,最终通过可视化图结构和状态历史,验证流程正确性。

知识点: 状态字段的完整设计、节点函数复用与组合、固定边配置、图可视化、状态历史追溯。

完整代码实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# ================== 基础依赖 ==================
from typing import TypedDict, NotRequired
from langgraph.graph import StateGraph, START, END
from langchain_core.prompts import PromptTemplate
from langgraph.checkpoint.memory import MemorySaver

# LLM(真实模型)
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

# 加载环境变量
load_dotenv()
# 加载并验证环境变量(1.x推荐显式验证,避免配置缺失)
API_KEY = os.getenv("LLM_API_KEY")
BASE_URL = os.getenv("LLM_BASE_URL")
MODEL = os.getenv("LLM_MODEL_ID")

if not API_KEY:
    raise ValueError("未检测到 API_KEY,请检查 .env 文件是否配置正确")

llm = ChatOpenAI(
    api_key=API_KEY,
    base_url=BASE_URL,
    model=MODEL,  # 注意:根据你使用的模型修改名称!!!! 后面章节不再继续说明
    temperature=0.3,  # 随机性:0-1,越小越严谨,越大越有创造力
)

# ==========================================================
# 1. 定义 State(工作流共享状态 = Agent 内存)
# ==========================================================
class TextProcessState(TypedDict):
    """
    LangGraph 状态对象:
    用于在节点之间传递数据(类似全局共享内存)
    """
    raw_text: str                        # 输入:用户原始文本
    deduplicated_text: NotRequired[str]  # 过程:去重后的文本
    summary_text: NotRequired[str]       # 过程:LLM 生成摘要
    has_sensitive: NotRequired[bool]     # 过程:敏感词检测结果
    final_output: NotRequired[str]       # 输出:最终格式化结果


# ==========================================================
# 2. 定义节点函数(每个节点 = 一个处理模块)
# ==========================================================
def deduplicate_node(state: TextProcessState) -> TextProcessState:
    """文本去重节点"""
    raw_text = state["raw_text"]
    lines = raw_text.split("\n")
    unique_lines = []
    seen = set()
    for line in lines:
        line_stripped = line.strip()
        if line_stripped and line_stripped not in seen:
            seen.add(line_stripped)
            unique_lines.append(line)
    print("✅ 去重节点执行完成")
    return {"deduplicated_text": "\n".join(unique_lines)}

def summary_node(state: TextProcessState) -> TextProcessState:
    """摘要生成节点(调用LLM)"""
    deduplicated_text = state["deduplicated_text"]
    prompt = PromptTemplate(
        input_variables=["text"],
        template="请为以下文本生成50字以内的简洁摘要,保留核心信息:\n{text}"
    )
    chain = prompt | llm
    summary = chain.invoke({"text": deduplicated_text}).content
    print("🤖 摘要节点执行完成")
    return {"summary_text": summary}

def sensitive_check_node(state: TextProcessState) -> TextProcessState:
    """敏感词检测节点"""
    summary = state["summary_text"]
    sensitive_words = ["敏感词1", "敏感词2", "违法", "违规"]
    has_sensitive = any(word in summary for word in sensitive_words)
    print("🔍 敏感词检测完成:", has_sensitive)
    return {"has_sensitive": has_sensitive}

def output_node(state: TextProcessState) -> TextProcessState:
    """输出节点(根据敏感词结果格式化)"""
    summary = state["summary_text"]
    has_sensitive = state["has_sensitive"]
    if has_sensitive:
        final_output = "⚠️ 检测到敏感内容,无法输出摘要"
    else:
        final_output = f"""✅ 文本处理完成
【摘要】
{summary}

【去重后原文】
{state['deduplicated_text']}
"""
    print("📤 输出节点执行完成")
    return {"final_output": final_output}


# ==========================================================
# 3. 构建线性工作流图(固定边)
# ==========================================================
def build_linear_graph():
    """构建线性 LangGraph 工作流,**实际启用状态历史**"""
    graph_builder = StateGraph(TextProcessState)

    # 注册节点
    graph_builder.add_node("deduplicate", deduplicate_node)
    graph_builder.add_node("summary", summary_node)
    graph_builder.add_node("sensitive_check", sensitive_check_node)
    graph_builder.add_node("output", output_node)

    # 配置固定边(线性执行)
    graph_builder.add_edge(START, "deduplicate")
    graph_builder.add_edge("deduplicate", "summary")
    graph_builder.add_edge("summary", "sensitive_check")
    graph_builder.add_edge("sensitive_check", "output")
    graph_builder.add_edge("output", END)

    # 【修改核心】编译时传入MemorySaver,真正启用状态历史
    # MemorySaver:内存级检查点,适合测试/开发,重启程序后状态丢失
    return graph_builder.compile(checkpointer=MemorySaver())


# ==========================================================
# 4. 测试运行(get_state_history 可正常使用)
# ==========================================================
if __name__ == "__main__":

    # 构建图
    linear_graph = build_linear_graph()

    # 初始状态(输入数据)
    test_state: TextProcessState = {
        "raw_text": "LangGraph是LangChain生态的工作流框架\nLangGraph支持状态管理\nLangGraph是LangChain生态的工作流框架\n支持动态分支和并行执行"
    }

    # thread_id:会话唯一标识,测试用随便命名,多会话用不同id即可
    config = {"configurable": {"thread_id": "text_process_test_001"}}

    # ========== 关键修改2:invoke时传入config ==========
    final_state = linear_graph.invoke(test_state, config=config)

    # 输出最终结果
    print("\n" + "=" * 50)
    print(final_state["final_output"])

    # ========== 关键修改3:get_state_history时也传入同一个config ==========
    print("\n" + "=" * 50)
    history = list(linear_graph.get_state_history(config))   # 必须传config
    print("状态快照数量(超步骤):", len(history))

    for i, state in enumerate(history, 1):
        print(f"\n{i}步:")
        print("state", state)
    

运行结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
 去重节点执行完成
🤖 摘要节点执行完成
🔍 敏感词检测完成: False
📤 输出节点执行完成

==================================================
✅ 文本处理完成
【摘要】
LangGraph是LangChain生态的工作流框架,支持状态管理、动态分支与并行执行。

【去重后原文】
LangGraph是LangChain生态的工作流框架
LangGraph支持状态管理
支持动态分支和并行执行


==================================================
状态快照数量(超步骤): 6

第1步:
state StateSnapshot(values={...

也可以看一下图的执行状态

1
2
3
4
png_data = linear_graph.get_graph().draw_mermaid_png()  # 获取PNG字节流
    with open("linear_text_process_graph.png", "wb") as file:  # wb=二进制写入
        file.write(png_data)
    print("📊 工作流可视化图已保存:linear_text_process_graph.png\n")

6-5

4.3 案例2:分支工作流——带结果校验的动态文本处理

本案例目标:在案例1基础上,新增“摘要质量校验”分支逻辑,实现“质量合格→输出结果”“质量不合格→回退重生成”的动态流转,掌握条件边配置、分支节点设计与状态依赖处理。

4.3.1 案例设计思路

流程逻辑:文本去重→生成摘要→敏感词校验(过滤风险)→摘要质量校验(判断是否符合长度、信息完整性)→质量合格则输出,不合格则回退到摘要生成节点重生成(限制重生成次数,避免无限循环)。

知识点: 条件边多分支配置、循环逻辑设计、状态字段扩展(记录重生成次数)、循环终止条件控制。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# ================== 依赖 ==================
from typing import TypedDict, NotRequired
from langgraph.graph import StateGraph, START, END
from langchain_core.prompts import PromptTemplate
from langgraph.checkpoint.memory import MemorySaver
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

# ================== LLM 初始化 ==================
load_dotenv()
# 加载并验证环境变量(1.x推荐显式验证,避免配置缺失)
API_KEY = os.getenv("LLM_API_KEY")
BASE_URL = os.getenv("LLM_BASE_URL")
MODEL = os.getenv("LLM_MODEL_ID")

if not API_KEY:
    raise ValueError("未检测到 API_KEY,请检查 .env 文件是否配置正确")

llm = ChatOpenAI(
    api_key=API_KEY,
    base_url=BASE_URL,
    model=MODEL,  # 注意:根据你使用的模型修改名称!!!! 后面章节不再继续说明
    temperature=0.3,  # 随机性:0-1,越小越严谨,越大越有创造力
)

# ==========================================================
# 6.4.3 分支工作流案例:带结果校验的动态文本处理
# ==========================================================

# ----------------------------------------------------------
# 1️⃣ 状态定义:扩展工作流状态(新增循环控制字段)
# ----------------------------------------------------------
class BranchTextProcessState(TypedDict):
    """分支工作流共享状态(类似共享黑板)"""

    # 输入字段
    raw_text: str

    # 中间过程字段(可选)
    deduplicated_text: NotRequired[str]
    summary_text: NotRequired[str]
    has_sensitive: NotRequired[bool]

    # 循环控制字段
    rewrite_count: NotRequired[int]      # 重生成次数
    quality_valid: NotRequired[bool]     # 摘要质量是否合格

    # 最终输出
    final_output: NotRequired[str]


# ----------------------------------------------------------
# 2️⃣ 节点定义(工作流执行单元)
# ----------------------------------------------------------

# === 文本去重节点 ===
def deduplicate_node(state: BranchTextProcessState):
    raw_text = state["raw_text"]
    lines = raw_text.split("\n")
    seen, unique_lines = set(), []

    for line in lines:
        line = line.strip()
        if line and line not in seen:
            seen.add(line)
            unique_lines.append(line)

    print("✅ [Node] 去重完成")
    return {"deduplicated_text": "\n".join(unique_lines)}


# === 摘要生成节点 ===
def summary_node(state: BranchTextProcessState):
    text = state.get("deduplicated_text", "")
    if not text:
        return {"summary_text": "无有效文本"}

    prompt = PromptTemplate(
        input_variables=["text"],
        template="请为以下文本生成50字以内摘要,保留核心信息:\n{text}"
    )
    summary = (prompt | llm).invoke({"text": text}).content

    print("🤖 [Node] 摘要生成:", summary)
    return {"summary_text": summary}


# === 敏感词检测节点 ===
def sensitive_check_node(state: BranchTextProcessState):
    summary = state.get("summary_text", "")
    sensitive_words = ["违法", "违规"]
    has_sensitive = any(w in summary for w in sensitive_words)

    print(f"🔍 [Node] 敏感词检测: {has_sensitive}")
    return {"has_sensitive": has_sensitive}


# === 摘要质量校验节点(教学重点) ===
def quality_check_node(state: BranchTextProcessState):
    summary = state.get("summary_text", "")

    # 长度校验
    length_valid = 15 <= len(summary) <= 50

    # 信息完整性校验
    core_keywords = ["LangGraph", "工作流"]
    info_valid = all(k in summary for k in core_keywords)

    quality_valid = length_valid and info_valid
    print(f"📏 [Node] 质量校验 | 长度OK={length_valid} | 关键词OK={info_valid} | 合格={quality_valid}")

    return {"quality_valid": quality_valid}


# === 重生成次数更新节点 ===
def update_rewrite_count_node(state: BranchTextProcessState):
    count = state.get("rewrite_count", 0) + 1
    print(f"🔢 [Node] 重生成次数 -> {count}")
    return {"rewrite_count": count}


# ----------------------------------------------------------
# 3️⃣ Router:条件分支决策(LangGraph 核心)
# ----------------------------------------------------------
def rewrite_router(state: BranchTextProcessState):
    quality = state.get("quality_valid", False)
    count = state.get("rewrite_count", 0)

    print(f"🚦 [Router] quality={quality}, rewrite_count={count}")

    # 质量合格 → 输出
    if quality:
        return "to_output"

    # 不合格且次数 < 2 → 重生成
    if count < 2:
        return "to_rewrite"

    # 次数耗尽 → 强制输出
    return "to_force_output"


# ----------------------------------------------------------
# 4️⃣ 输出节点
# ----------------------------------------------------------

# 正常输出
def output_node(state: BranchTextProcessState):
    summary = state.get("summary_text", "")
    has_sensitive = state.get("has_sensitive", False)

    if has_sensitive:
        final_output = "⚠️ 检测到敏感内容,禁止输出摘要"
    else:
        final_output = f"""
✅ 文本处理完成
重生成次数: {state.get('rewrite_count', 0)}

【摘要】
{summary}

【去重原文】
{state.get('deduplicated_text')}
"""

    print("📤 [Node] 正常输出")
    return {"final_output": final_output}


# 强制输出
def force_output_node(state: BranchTextProcessState):
    summary = state.get("summary_text", "")
    final_output = f"""
⚠️ 摘要多次重生成仍不合格(教学示例)
重生成次数: {state.get('rewrite_count', 0)}
摘要长度: {len(summary)}

强制输出摘要:
{summary}
"""
    print("📤 [Node] 强制输出")
    return {"final_output": final_output}


# ----------------------------------------------------------
# 5️⃣ 构建 LangGraph 分支工作流
# ----------------------------------------------------------
def build_branch_graph():
    graph = StateGraph(BranchTextProcessState)

    # 注册节点
    graph.add_node("deduplicate", deduplicate_node)
    graph.add_node("summary", summary_node)
    graph.add_node("sensitive_check", sensitive_check_node)
    graph.add_node("quality_check", quality_check_node)
    graph.add_node("update_rewrite_count", update_rewrite_count_node)
    graph.add_node("output", output_node)
    graph.add_node("force_output", force_output_node)

    # 固定执行路径
    graph.add_edge(START, "deduplicate")
    graph.add_edge("deduplicate", "summary")
    graph.add_edge("summary", "sensitive_check")
    graph.add_edge("sensitive_check", "quality_check")

    # 条件分支(教学核心)
    graph.add_conditional_edges(
        "quality_check",
        rewrite_router,
        {
            "to_output": "output",
            "to_rewrite": "update_rewrite_count",
            "to_force_output": "force_output",
        }
    )

    # 循环回退路径
    graph.add_edge("update_rewrite_count", "summary")

    # 结束节点
    graph.add_edge("output", END)
    graph.add_edge("force_output", END)

    return graph.compile(checkpointer=MemorySaver())


# ----------------------------------------------------------
# 6️⃣ 运行测试(课堂演示用)
# ----------------------------------------------------------
if __name__ == "__main__":
    branch_graph = build_branch_graph()

    # 初始状态
    test_state: BranchTextProcessState = {
        "raw_text": "LangGraph是LangChain生态下的有状态工作流框架,支持图结构建模、状态追溯、动态分支和并行执行,适用于复杂AI任务编排",
        "rewrite_count": 0,
    }

    config = {"configurable": {"thread_id": "branch_workflow_demo"}}

    print("\n🚀 启动分支工作流示例\n" + "=" * 60)
    final_state = branch_graph.invoke(test_state, config=config)

    # 输出结果
    print("\n🎯 最终结果:")
    print(final_state["final_output"])

    print("\n📊 执行统计:")
    print("重生成次数:", final_state.get("rewrite_count"))
    print("质量是否合格:", final_state.get("quality_valid"))

    # 状态历史(教学亮点)
    history = list(branch_graph.get_state_history(config))
    print(f"\n📜 状态历史步数: {len(history)}")

    # 可视化图
    png_data = branch_graph.get_graph().draw_mermaid_png()
    with open("branch_workflow_graph.png", "wb") as f:
        f.write(png_data)
    print("📊 工作流图已保存: branch_workflow_graph.png")

运行结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
🚀 启动分支工作流示例
============================================================
✅ [Node] 去重完成
🤖 [Node] 摘要生成: LangGraph是LangChain生态的有状态工作流框架,支持图结构建模、状态追溯、动态分支和并行执行,适用于复杂AI任务编排。
🔍 [Node] 敏感词检测: False
📏 [Node] 质量校验 | 长度OK=False | 关键词OK=True | 合格=False
🚦 [Router] quality=False, rewrite_count=0
🔢 [Node] 重生成次数 -> 1
🤖 [Node] 摘要生成: LangGraph是LangChain生态的有状态工作流框架,支持图结构建模、状态追溯、动态分支和并行执行,适用于复杂AI任务编排。
🔍 [Node] 敏感词检测: False
📏 [Node] 质量校验 | 长度OK=False | 关键词OK=True | 合格=False
🚦 [Router] quality=False, rewrite_count=1
🔢 [Node] 重生成次数 -> 2
🤖 [Node] 摘要生成: LangGraph是LangChain生态的有状态工作流框架,支持图结构建模、状态追溯、动态分支和并行执行,适用于复杂AI任务编排。
🔍 [Node] 敏感词检测: False
📏 [Node] 质量校验 | 长度OK=False | 关键词OK=True | 合格=False
🚦 [Router] quality=False, rewrite_count=2
📤 [Node] 强制输出

🎯 最终结果:

⚠️ 摘要多次重生成仍不合格(教学示例)
重生成次数: 2
摘要长度: 66

强制输出摘要:
LangGraph是LangChain生态的有状态工作流框架,支持图结构建模、状态追溯、动态分支和并行执行,适用于复杂AI任务编排。


📊 执行统计:
重生成次数: 2
质量是否合格: False

📜 状态历史步数: 15
📊 工作流图已保存: branch_workflow_graph.png

我们看一下工作图

6-6

4.4 案例3:循环工作流——人机交互式文本优化

本案例目标:构建“用户输入→文本优化→结果反馈→用户确认→确认通过→结束/确认不通过→重新优化”的多轮交互循环流程,模拟智能编辑助手场景,掌握循环边、人机交互节点、持久化状态的使用。

4.4.1 案例设计思路

流程逻辑:用户输入待优化文本→AI优化文本→生成优化建议→展示结果给用户→用户输入“确认”则结束,输入“修改”则回退到优化节点重新生成,输入“退出”则终止流程。核心是实现“机器节点→人机交互节点→循环/终止”的闭环。

知识点:循环边配置、动态输入接收、流程中断控制。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187


import os
from typing import TypedDict, Optional
from dotenv import load_dotenv
from langgraph.graph import StateGraph, START, END
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
# ------------------------------
# 1. 环境加载与模型初始化(保留你的DeepSeek配置,无任何修改)
# ------------------------------
load_dotenv()
# 加载并验证环境变量(1.x推荐显式验证,避免配置缺失)
API_KEY = os.getenv("LLM_API_KEY")
BASE_URL = os.getenv("LLM_BASE_URL")
MODEL = os.getenv("LLM_MODEL_ID")

if not API_KEY:
    raise ValueError("未检测到 API_KEY,请检查 .env 文件是否配置正确")

llm = ChatOpenAI(
    api_key=API_KEY,
    base_url=BASE_URL,
    model=MODEL,  # 注意:根据你使用的模型修改名称!!!! 后面章节不再继续说明
    temperature=0.3,  # 随机性:0-1,越小越严谨,越大越有创造力
)

# ------------------------------
# 2. 定义交互式状态(强类型,持久化存储所有流程数据)
# ------------------------------
class InteractiveOptState(TypedDict):
    user_input: str                # 固定:用户原始输入(全程不变)
    optimized_text: Optional[str]  # 动态:AI优化后文本(多轮更新)
    optimize_suggest: Optional[str]# 动态:优化建议/理由(多轮更新)
    user_feedback: Optional[str]   # 动态:用户反馈(确认/修改/退出)
    final_result: Optional[str]    # 最终:流程结束结果

# ------------------------------
# 3. 核心节点函数(无任何修改,保留你的原代码)
# ------------------------------
def optimize_node(state: InteractiveOptState) -> InteractiveOptState:
    """【机器节点】文本优化核心节点,使用管道符调用LLM"""
    user_input = state["user_input"]
    user_feedback = state["user_feedback"]

    if not user_feedback:
        prompt = PromptTemplate(
            input_variables=["text"],
            template="请优化以下文本,提升流畅度和专业度,严格保留核心信息:\n{text}\n优化完成后,单独一行以【优化理由:】开头给出1-2条简洁优化原因"
        )
        chain = prompt | llm
        result = chain.invoke({"text": user_input}).content
    else:
        prompt = PromptTemplate(
            input_variables=["text", "feedback"],
            template="根据用户反馈针对性优化文本,严格保留核心信息:\n原文本:{text}\n用户反馈:{feedback}\n优化完成后,单独一行以【优化理由:】开头给出1-2条简洁优化原因"
        )
        chain = prompt | llm
        result = chain.invoke({"text": user_input, "feedback": user_feedback}).content

    split_flag = "【优化理由:】"
    if split_flag in result:
        optimized_text, optimize_suggest = result.split(split_flag, 1)
    else:
        optimized_text = result
        optimize_suggest = "AI未生成明确优化理由,建议重新优化"

    return {
        "optimized_text": optimized_text.strip(),
        "optimize_suggest": optimize_suggest.strip()
    }

def feedback_node(state: InteractiveOptState) -> InteractiveOptState:
    """【人机交互节点】展示结果+接收用户反馈,流程中断核心"""
    print("\n" + "-"*60)
    print("📝 AI优化后文本:")
    print(state["optimized_text"])
    print("\n💡 优化建议/理由:")
    print(state["optimize_suggest"])
    print("\n" + "-"*60)

    while True:
        user_feedback = input("请输入反馈(仅需输入:确认/修改/退出):").strip()
        if user_feedback in ["确认", "修改", "退出"]:
            break
        print("❌ 输入无效!请严格输入「确认」「修改」「退出」,无其他字符\n")
    return {"user_feedback": user_feedback}

def feedback_router(state: InteractiveOptState) -> str:
    """【条件路由节点】循环核心,直接返回目标节点名(最新API要求)"""
    feedback = state["user_feedback"]
    if feedback == "确认":
        return "final"    # 确认→final节点
    elif feedback == "修改":
        return "optimize" # 修改→optimize节点(循环核心)
    else:
        return "exit"     # 退出→exit节点

def final_node(state: InteractiveOptState) -> InteractiveOptState:
    """【机器节点】流程正常结束,生成格式化结果"""
    final_result = (
        "✅ 【多轮文本优化流程完成】\n"
        f"📌 最终优化文本:\n{state['optimized_text']}\n"
        f"💡 优化核心总结:\n{state['optimize_suggest']}"
    )
    return {"final_result": final_result}

def exit_node(state: InteractiveOptState) -> InteractiveOptState:
    """【机器节点】用户主动退出,生成终止提示"""
    return {"final_result": "🔚 【文本优化流程终止】\n你主动退出,本次无最终优化结果"}

# ------------------------------
# 4. 搭建循环交互图(无任何修改,保留你的原代码)
# ------------------------------
def build_interactive_graph():
    """构建LangGraph循环状态图,彻底适配最新API终极规范"""
    graph_builder = StateGraph(InteractiveOptState)

    # 添加节点(无修改)
    graph_builder.add_node("optimize", optimize_node)
    graph_builder.add_node("feedback", feedback_node)
    graph_builder.add_node("final", final_node)
    graph_builder.add_node("exit", exit_node)

    # 配置普通边(无修改)
    graph_builder.add_edge(START, "optimize")
    graph_builder.add_edge("optimize", "feedback")

    # 适配最新API:source + path
    graph_builder.add_conditional_edges(
        source="feedback",  # 分支起始节点
        path=feedback_router  # 路由函数(直接返回目标节点名)
    )

    # 配置结束边(无修改)
    graph_builder.add_edge("final", END)
    graph_builder.add_edge("exit", END)

    # 编译图:开启状态持久化(多轮交互必需)
    return graph_builder.compile(checkpointer=MemorySaver())

# ------------------------------
# 5. 运行交互测试(★仅修改初始输入部分★,改为用户手动输入+非空校验)
# ------------------------------
if __name__ == "__main__":
    # 构建循环图(彻底解决所有API报错)
    interactive_graph = build_interactive_graph()
    print("🔧 多轮交互式文本优化工具已启动(适配LangGraph最新API)...\n")

    # ★核心修改:用户手动输入待优化句子 + 非空校验★
    print("="*40 + " 输入待优化句子 " + "="*40)
    while True:
        user_input_text = input("请输入需要AI优化的句子:").strip()
        if user_input_text:  # 非空校验,避免用户输入空内容
            break
        print("❌ 输入不能为空,请重新输入需要优化的句子!\n")

    # 初始状态:使用用户输入的句子,其余字段保持默认
    initial_state: InteractiveOptState = {
        "user_input": user_input_text,  # 替换为用户输入的内容
        "optimized_text": None,
        "optimize_suggest": None,
        "user_feedback": None,
        "final_result": None
    }

    # 启动多轮交互流程(保留你的config配置)
    print(f"\n🚀 已接收你的句子,开始第一轮AI优化...")
    config = {"configurable": {"thread_id": "text_process_test_001"}}
    final_state = interactive_graph.invoke(initial_state, config=config)

    # 展示最终结果
    print("\n" + "="*60)
    print(final_state["final_result"])
    print("="*60)

    # 展示交互轮次(状态持久化验证)
    history = list(interactive_graph.get_state_history(config))
    interact_rounds = len(history) // 2  # 每轮=优化节点+反馈节点
    print("状态快照数量(超步骤):", len(history))

    # 保存可视化流程图(保留你的原代码)
    png_data = interactive_graph.get_graph().draw_mermaid_png()  # 获取PNG字节流
    with open("interactive_optimize_graph.png", "wb") as file:  # wb=二进制写入
        file.write(png_data)
    print("📊 工作流可视化图已保存:interactive_optimize_graph.png\n")

运行结果如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
请输入需要AI优化的句子:LangGraph这工具不错,能做工作流,比以前的Chain好用


------------------------------------------------------------
📝 AI优化后文本:
LangGraph是一款优秀的工作流构建工具,相较于传统的Chain方案,其在功能与实用性上表现更为出色。

💡 优化建议/理由:
1. 将口语化、模糊的表达转化为具体、专业的表述,如将“不错”明确为“优秀”,“好用”具体化为“在功能与实用性上表现更为出色”。
2. 优化了句子结构与逻辑关系,使对比更清晰、论述更流畅,提升了整体表达的严谨性。

------------------------------------------------------------
请输入反馈(仅需输入:确认/修改/退出):修改

📝 AI优化后文本:
LangGraph是一款优秀的工作流构建工具,相较于传统的Chain,它在功能与易用性上更具优势。

💡 优化建议/理由:
1. 用词更正式、具体,如“优秀的工作流构建工具”明确了核心功能。
2. 通过对比突出优势,使表述更客观有力。

------------------------------------------------------------
请输入反馈(仅需输入:确认/修改/退出):确认

✅ 【多轮文本优化流程完成】
📌 最终优化文本:
LangGraph是一款优秀的工作流构建工具,相较于传统的Chain,它在功能与易用性上更具优势。
💡 优化核心总结:
1. 用词更正式、具体,如“优秀的工作流构建工具”明确了核心功能。
2. 通过对比突出优势,使表述更客观有力。
============================================================

核心小结:

1. 人机交互节点设计:feedback_node通过input()函数接收用户手动输入,实现“机器流程→人工干预→机器流程”的闭环,这类节点在智能助手、审批系统等场景中高频使用,核心是“状态接收用户输入,驱动后续流程”。

2. 动态优化适配:optimize_node根据“user_feedback”是否存在,切换不同提示词,实现“针对性优化”——体现了状态的“记忆能力”,让多轮交互更智能,而非机械重复。

3. 流程中断控制:通过“退出”分支直接终止流程,无需等待循环条件满足,给用户主动控制权,实际开发中可扩展为“超时退出”“异常退出”等多场景中断逻辑。

4.5 综合实操小结

本节三个案例从“线性→分支→循环”逐步递进,覆盖了LangGraph工作流的核心应用场景,核心要点可归纳为三点:

第一,状态是核心枢纽:所有流程流转、节点协作都依赖状态,字段设计需“覆盖全链路需求”,同时通过状态历史实现追溯,这是LangGraph与其他工作流框架的核心差异。

第二,边是流程灵魂:固定边保障基础线性流转,条件边实现动态分支,循环边支撑多轮交互,三类边的灵活组合可适配绝大多数复杂场景,配置时需重点关注“分支判断逻辑”和“循环终止条件”。

第三,节点是功能载体:节点可封装任意逻辑(LLM调用、工具调用、人机交互),适配状态接口即可复用,开发时需遵循“单一职责”,避免节点逻辑过于复杂,便于调试和维护。

5 本章总结与实践建议

5.1 核心知识梳理

本章围绕LangGraph三大核心组件(状态、节点、边)展开,从基础概念到融合实操,构建了“组件→机制→应用”的知识体系:

  1. 组件核心:状态是数据枢纽,节点是功能载体,边是路径灵魂,三者的灵活组合是构建复杂工作流的基础。
  2. 运行机制:以超步骤为单位的消息传递的,支持顺序与并行执行,理解该机制可精准调试流程执行顺序与节点依赖。
  3. 融合应用:LangGraph可无缝集成RAG、智能体、外部工具,实现“检索→生成→优化”“工具调用→分支决策”等生产级流程,解决传统线性流程的刚性问题。

5.2 实践落地建议

  1. 从小场景入手:首次落地可从线性流程(如文本处理)开始,熟练后逐步添加分支、循环逻辑,再融合RAG、智能体等复杂场景。
  2. 重视状态设计:字段需覆盖“输入→过程→输出”全链路,避免冗余字段,同时预留调试字段(如重试次数、质量分数),便于问题排查。
  3. 节点拆分原则:遵循“单一职责”,将复杂逻辑拆分为多个轻量节点,便于单独调试、复用和扩展,避免单个节点逻辑过于臃肿。
  4. 强化容错设计:生产级流程需添加异常捕获、重试机制、循环终止条件,避免工具调用失败、逻辑错误导致流程卡死。

5.3 实践练习

完成本章中的langgraph学习案例,从实践中感悟langgraph的特性,完成综合实操的3个案例,针对案例3进行优化,进一步提升智能体的效果。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计