Synchronous blocking streaming responses
The web framework is Flask. A hand-written for loop in run_stream() drives the agents: it waits for the LLM to return tokens and yields each one to the frontend as soon as it arrives.
The business logic follows a fixed pipeline order; there is no high concurrency and no need for complex parallel agent execution.
A streaming API built on Server-Sent Events (SSE)
The SSE communication mechanism
A mechanism by which the server pushes real-time data to the client in one direction; WebSocket, by contrast, is bidirectional.
Client ----(HTTP request)----> Server
Client <---(continuous push)--- Server
SSE data format
# Note: every message must be terminated by two newline characters
data: {"type": "agent_output_chunk", "content": "Analyzing..."}\n\n

# For example, a message sequence:
data: {"type": "agent_start", "agent": "Researcher"}\n\n
data: {"type": "agent_output_chunk", "content": "Analysis in progress..."}\n\n
data: {"type": "done"}\n\n
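For illustration, a minimal Python client that consumes such a stream. This is a sketch, not part of the original code: the URL, the payload, and the use of the requests library are assumptions; it simply parses `data:` lines and stops at the `done` marker.

import json
import requests

# Hypothetical endpoint matching the Flask server shown in the next section
resp = requests.post(
    "http://localhost:5000/api/agent/research/stream",
    json={"message": "Analyze AI applications in healthcare"},
    stream=True,  # read the SSE stream incrementally instead of buffering it
)
for line in resp.iter_lines(decode_unicode=True):
    if not line.startswith("data: "):
        continue  # skip the blank separator lines between events
    event = json.loads(line[len("data: "):])
    if event["type"] == "done":
        break
    print(event)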
Server-side API analysis
import json as json_module

from flask import Response, jsonify, request, stream_with_context

@api_bp.route('/agent/research/stream', methods=['POST'])
def multi_agent_research_stream():
    # 1. Receive the research topic
    data = request.get_json()
    message = data.get('message', '')  # e.g. "Analyze AI applications in healthcare"
    # 2. Validate the input
    if not message:
        return jsonify({'success': False, 'msg': 'Research topic must not be empty'}), 400
    # 3. Define the generator function
    def generate():
        # Get the multi-agent system instance
        multi_agent = get_research_multi_agent()
        # Run the streaming pipeline, yielding events one by one
        for event in multi_agent.run_stream(message):
            # Convert each event to the SSE wire format
            yield f"data: {json_module.dumps(event, ensure_ascii=False)}\n\n"
        # Send the completion marker
        yield f"data: {json_module.dumps({'type': 'done'})}\n\n"
    # 4. Return the SSE response
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',   # disable caching
            'Connection': 'keep-alive',    # keep the connection open
            'X-Accel-Buffering': 'no'      # disable Nginx buffering
        }
    )
Multi-agent execution flow
Pipeline mode: the agent execution order is defined up front.
import time
from typing import Any, Dict

def run_stream(self, user_input: str):
    start_time = time.time()
    state: Dict[str, Any] = {"user_input": user_input}  # shared pipeline state
    # 1. Workflow start
    yield {
        "type": "workflow_start",
        "workflow": "academic_research",
        "agents": ["Supervisor", "Researcher", "Critic", "Ideator", "Writer"]
    }
    # 2. Define the agent execution order
    agent_sequence = [
        ("supervisor", self.supervisor, "Task understanding and planning"),  # understand the user's request
        ("researcher", self.researcher, "Literature research"),              # find relevant literature
        ("critic", self.critic, "Critical analysis"),                        # analyze existing research
        ("ideator", self.ideator, "Idea generation"),                        # propose new ideas
        ("writer", self.writer, "Research report writing")                   # write the final report
    ]
    # 3. Run each agent in turn
    for agent_key, agent, description in agent_sequence:
        # Announce that the agent is starting
        yield {"type": "agent_start", "agent": agent.name, "description": description}
        # Run the agent and stream its output
        for event in agent.run_stream(state):
            yield event  # includes agent_output_chunk events, etc.
            # When the agent completes, fold its result into the state
            if event.get("type") == "agent_complete":
                result = event.get("result", {})
                state = {**state, **result}  # merge the result into the shared state
        # Announce that the agent has finished
        yield {"type": "agent_end", "agent": agent.name}
    # 4. Workflow complete
    elapsed_time = time.time() - start_time
    yield {
        "type": "workflow_complete",
        "final_output": state.get("final_output"),  # the final research report
        "elapsed_time": elapsed_time
    }
Server-side implementation of a sub-agent (the Critic, in this example)
# Inside the Critic agent's run_stream(state)
research_topic = state.get("research_topic", "")
user_prompt = f"Please critically analyze the existing research on: {research_topic}"
# Send an event signaling that the agent has started producing output
yield {"type": "agent_output_start", "agent": self.name}
full_response = ""
for chunk in self._invoke_llm_stream(self.get_system_prompt(), user_prompt):
    full_response += chunk
    # Forward every text chunk to the client immediately while accumulating the full response
    yield {"type": "agent_output_chunk", "agent": self.name, "content": chunk}
yield {"type": "agent_output_end", "agent": self.name}
yield {
    "type": "agent_complete",
    "agent": self.name,
    "result": {
        "critique_results": {
            "topic": research_topic,
            "analysis": full_response
        }
    }
}
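_invoke_llm_stream is referenced above but not shown in the source. A plausible minimal sketch, assuming an OpenAI-compatible backend and the official openai SDK (the model name is illustrative):

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def _invoke_llm_stream(self, system_prompt: str, user_prompt: str):
    """Yield text chunks from a streaming chat completion."""
    stream = client.chat.completions.create(
        model="gpt-4o-mini",  # illustrative model name
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta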
Communication flow of the pipeline design pattern
Layer 1: frontend → Flask (HTTP)

# The frontend issues the request
fetch('/api/paper/read/stream', { method: 'POST', body: {...} })

# Flask receives it
@api_bp.route('/paper/read/stream', methods=['POST'])
def paper_read_stream():
    ...
Layer 2: Flask → LangGraph (function call)

# Inside the Flask route, call into LangGraph
def paper_read_stream():
    multi_agent = get_paper_reader_multi_agent()
    def generate():
        # This is where LangGraph is invoked
        for event in multi_agent.run_stream(images, paper_name):
            yield f"data: {json.dumps(event)}\n\n"
    return Response(generate(), mimetype='text/event-stream')
Layer 3: inside LangGraph (agent execution)

# (a synchronous for loop)
def run_stream(self, images, paper_name):
    for agent in [planner, extractor, analyzer, critic, summarizer]:
        for token in agent.run_stream(state):
            yield token  # tokens are pushed out one at a time
Asynchronous high concurrency
For a truly asynchronous, highly concurrent multi-agent architecture, the supervisor node routes dynamically, worker nodes execute in parallel, and the results of multiple workers are merged automatically.
You can lean on LangGraph's built-in capabilities (Send, Annotated, astream_events), or treat LangGraph as a state container and write the scheduling logic yourself.
Built-in LangGraph capabilities
Graph topology management
Each edge can carry custom task logic; with the Send API, the nodes to execute are generated dynamically from the Supervisor's decision.
State management
The Supervisor and all worker nodes exchange data through a shared state dictionary. In LangGraph, TypedDict combined with Python's operator.add provides an automatic merge function for the parallel results of multiple workers.
class ResearchState(TypedDict):
    research_outputs: Annotated[List[WorkerOutput], operator.add]  # merged automatically
    messages: Annotated[List[str], operator.add]                   # conversation history accumulates

builder = StateGraph(ResearchState)  # the shared state schema is passed at graph construction
For reference, the V1 event format emitted by the current implementation:

// The SSE event name is always "message"; events are distinguished by data.type
event: message
data: {"type": "conversation_id", "conversation_id": "xxx"}
event: message
data: {"type": "reading_start", "paper_name": "xxx", "agents": [...]}
event: message
data: {"type": "agent_start", "agent": "Planner", "description": "..."}
event: message
data: {"type": "agent_output_chunk", "agent": "Planner", "content": "方"}
event: message
data: {"type": "agent_output_end", "agent": "Planner"}
event: message
data: {"type": "agent_complete", "agent": "Planner", "result": {...}}
event: message
data: {"type": "reading_complete", "final_output": "...", "elapsed_time": 10.5}
event: message
data: {"type": "done"}

Using the Send API for automatic parallel task scheduling
from langgraph.constants import Send
# Returning a single Send = sequential execution
return Send("extractor", state)

# Returning a list of Sends = parallel execution
return [Send("analyzer", state), Send("critic", state)]
The complete parallel execution flow
# graph.py
from typing import List, Literal

from langgraph.constants import Send

def parallel_router(state: PaperReaderState) -> List[Send] | Literal["finalize"]:
    steps = state.get("next_steps", [])  # from the Supervisor, e.g. ["analyzer", "critic"]
    if not steps or "FINISH" in steps:
        return "finalize"
    routes = []
    for step in steps:
        routes.append(Send(step, state))  # one Send per step
    # Returns [Send("analyzer", state), Send("critic", state)];
    # the LangGraph engine schedules these nodes in parallel
    return routes
Dynamic routing and token-level streaming with LangGraph's native astream_events
Normally you get only the final result: result = await graph.ainvoke(state)
astream_events instead surfaces every token and every node transition:
async for event in graph.astream_events(state, version="v2"):
Three kinds of events are handled (node start, token stream, node end), each yielded as a typed output event.
Core steps
Create the shared state (state.py)
Annotated supplies the type hints for the three fields messages, worker_outputs, and completed_agents.
Annotated attaches a label or extra metadata to a type without changing the type itself: T is the underlying type, and x can be a string, a function, a class instance, and so on. LangGraph inspects the second argument, operator.add, and uses it as the merge function when applying state updates.
On lists, operator.add simply concatenates, so new output is appended instead of overwriting what is already in the state.
from typing import Annotated

Annotated[T, x]
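A tiny plain-Python illustration of why operator.add works as a list reducer (no LangGraph required; the update contents are made up):

import operator

# Two partial state updates, as produced by parallel workers
update_a = [{"source": "analyzer", "content": "..."}]
update_b = [{"source": "critic", "content": "..."}]

# LangGraph applies the reducer pairwise; for lists this is concatenation
merged = operator.add(update_a, update_b)
print(merged)  # [{'source': 'analyzer', ...}, {'source': 'critic', ...}]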
import operator
from typing import Annotated, List, TypedDict

from langchain_core.messages import BaseMessage

# Use Annotated + operator.add for automatic cumulative merging
class PaperReaderState(TypedDict, total=False):
    messages: Annotated[List[BaseMessage], operator.add]
    worker_outputs: Annotated[List[WorkerOutput], operator.add]
    completed_agents: Annotated[List[str], operator.add]
Create the workers
Each worker is an async function that returns a state update:
async def analyzer_node(state: PaperReaderState, config: RunnableConfig = None) -> Dict[str, Any]:
    images = state.get("paper_images", [])
    metadata = state.get("paper_metadata", {})
    llm = get_llm(streaming=True)
    messages = build_messages(
        system_prompt="You are a methodology analysis expert...",
        user_prompt="Please analyze this paper's methodology...",
        images=images
    )
    full_response = ""
    async for chunk in llm.astream(messages, config=config):
        if chunk.content:
            full_response += chunk.content
    # Return the state update (merged into the global state automatically)
    return {
        "methodology": {"raw_content": full_response},
        "worker_outputs": [{"source": "analyzer", "content": full_response, "metadata": {}}],
        "completed_agents": ["analyzer"]
    }
Create the Supervisor and implement dynamic routing
supervisor_node emits two fields: next_steps and iteration.
The output format, RouteDecision, is defined with pydantic.BaseModel.
with_structured_output(RouteDecision) is a LangChain method: under the hood it converts RouteDecision to a JSON schema and uses OpenAI's function-calling capability to force the model to produce data matching that schema.
The returned decision is therefore not a blob of text but an object exposing a next_workers field.
from typing import Any, Dict, List

from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

class RouteDecision(BaseModel):
    next_workers: List[str] = Field(description="The agents to execute next")
    reasoning: str = Field(description="Rationale for the decision")

class SupervisorAgent:
    def __init__(self):
        self.llm = ChatOpenAI(...).with_structured_output(RouteDecision)

    async def decide(self, state: PaperReaderState) -> RouteDecision:
        # Build a summary of the current state
        state_summary = self._build_state_summary(state)
        # Let the LLM decide; the structured-output wrapper parses the reply into RouteDecision
        decision = await self.llm.ainvoke(
            f"Available workers: ...\nCurrent state:\n{state_summary}"
        )
        return decision

async def supervisor_node(state: PaperReaderState) -> Dict[str, Any]:
    supervisor = get_supervisor()
    decision = await supervisor.decide(state)
    return {
        "next_steps": decision.next_workers,  # e.g. ["analyzer", "critic"] or ["FINISH"]
        "iteration": state.get("iteration", 0) + 1
    }
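A minimal standalone demo of the structured-output call (a sketch: the model name and the prompt text are illustrative, and an OpenAI API key is assumed):

llm = ChatOpenAI(model="gpt-4o-mini").with_structured_output(RouteDecision)
decision = llm.invoke("The extractor has finished; analysis and critique are still pending.")
print(decision.next_workers)  # e.g. ['analyzer', 'critic']
print(decision.reasoning)     # the model's stated rationale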
Build the graph (graph.py)
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import Send
from langgraph.graph import StateGraph, END

def parallel_router(state: PaperReaderState) -> List[Send] | str:
    """Route on the Supervisor's decision, with parallel fan-out."""
    steps = state.get("next_steps", [])
    if not steps or "FINISH" in steps:
        return "finalize"
    # Use Send for true parallel execution
    routes = []
    for step in steps:
        routes.append(Send(step, state))
    return routes

def build_graph():
    workflow = StateGraph(PaperReaderState)
    # 1. Add the nodes
    workflow.add_node("supervisor", supervisor_node)
    workflow.add_node("planner", planner_node)
    workflow.add_node("extractor", extractor_node)
    workflow.add_node("analyzer", analyzer_node)
    workflow.add_node("critic", critic_node)
    workflow.add_node("summarizer", summarizer_node)
    workflow.add_node("finalize", finalize_node)
    # 2. Set the entry point
    workflow.set_entry_point("supervisor")
    # 3. Supervisor → conditional routing (parallel-capable)
    workflow.add_conditional_edges(
        "supervisor",
        parallel_router,
        ["planner", "extractor", "analyzer", "critic", "summarizer", "finalize"]
    )
    # 4. Workers → back to the Supervisor (the loop)
    workflow.add_edge("planner", "supervisor")
    workflow.add_edge("extractor", "supervisor")
    workflow.add_edge("analyzer", "supervisor")
    workflow.add_edge("critic", "supervisor")
    workflow.add_edge("summarizer", "supervisor")
    # 5. Finalize → END
    workflow.add_edge("finalize", END)
    return workflow.compile(checkpointer=MemorySaver())
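Because the graph is compiled with a checkpointer, every run must carry a thread_id in its config. A minimal invocation sketch (the initial-state fields and thread id are assumptions):

import asyncio

async def main():
    graph = build_graph()
    # MemorySaver namespaces checkpoints by thread_id
    config = {"configurable": {"thread_id": "paper-123"}}
    state = {"paper_images": [], "paper_name": "demo.pdf"}  # hypothetical initial state
    result = await graph.ainvoke(state, config=config)
    print(result.get("final_output"))

asyncio.run(main())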
Streaming output handling
from typing import AsyncGenerator

# Internal nodes whose start/end events should not be surfaced to the client
skip_nodes = {"supervisor", "finalize"}

async def stream_graph_events(graph, input_state, config) -> AsyncGenerator:
    async for event in graph.astream_events(input_state, config=config, version="v2"):
        kind = event.get("event")
        node_name = event.get("metadata", {}).get("langgraph_node", "")
        if kind == "on_chain_start" and node_name not in skip_nodes:
            yield {"type": "agent_start", "agent": node_name}
        elif kind == "on_chat_model_stream":
            chunk = event.get("data", {}).get("chunk")
            if chunk and chunk.content:
                yield {"type": "agent_output_chunk", "agent": node_name, "content": chunk.content}
        elif kind == "on_chain_end":
            yield {"type": "agent_end", "agent": node_name}
Main executor wrapper
class PaperReaderMultiAgent:
    def __init__(self):
        self.graph = build_graph()

    async def run_stream_async(self, paper_images, paper_name, conversation_id):
        initial_state = create_initial_state(paper_images, paper_name, conversation_id)
        # The checkpointer requires a thread_id; reuse the conversation id
        config = {"configurable": {"thread_id": conversation_id}}
        yield {"type": "reading_start", "paper_name": paper_name}
        async for event in stream_graph_events(self.graph, initial_state, config):
            yield event
Core API design
The backend's core path: FastAPI route → SSE response → async iteration over the multi-agent system → streamed output events.
The paper feature has to support both read and chat, and read must additionally support resuming from a checkpoint, so the API is designed as follows:
POST /paper/read/stream
POST /paper/chat/stream
POST /paper/read/continue
Main endpoint code:
import json
from typing import Optional

from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

@router.post("/paper/read/stream")
async def paper_read_stream(req: PaperReadRequest, user_id: Optional[str] = Depends(...)):
    # 1. Create a conversation (optional)
    conversation_id = req.conversation_id
    if user_id and not conversation_id:
        conversation_id = ConversationService.create_conversation(...)
    # 2. Get the multi-agent instance
    multi_agent = get_paper_reader_multi_agent()
    # 3. Define the SSE event generator
    async def event_generator():
        async for event in multi_agent.run_stream_async(req.images, req.paper_name, conversation_id):
            yield {"event": "message", "data": json.dumps(event)}
            # Persist to the database (optional)
            if event["type"] == "agent_output_end":
                MessageService.add_message(...)
    # 4. Return the SSE response
    return EventSourceResponse(event_generator())

@router.post("/paper/chat/stream")
async def paper_chat_stream(req: PaperChatRequest, ...):
    multi_agent = get_paper_reader_multi_agent()
    async def event_generator():
        async for event in multi_agent.chat_stream_async(
            req.question, req.images, req.paper_metadata, req.context
        ):
            yield {"event": "message", "data": json.dumps(event)}
    return EventSourceResponse(event_generator())
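The /paper/read/continue endpoint is listed above but not shown. A hedged sketch of one way it could work: since the graph is compiled with MemorySaver and each run is keyed by thread_id, invoking the graph with input=None resumes from the last checkpoint. PaperContinueRequest and the conversation_id-as-thread_id convention are assumptions, not the original implementation.

@router.post("/paper/read/continue")
async def paper_read_continue(req: PaperContinueRequest):  # hypothetical request model
    multi_agent = get_paper_reader_multi_agent()
    # Reuse the conversation id as the LangGraph thread_id
    config = {"configurable": {"thread_id": req.conversation_id}}

    async def event_generator():
        # In LangGraph, passing None as the input resumes from the saved checkpoint
        async for event in stream_graph_events(multi_agent.graph, None, config):
            yield {"event": "message", "data": json.dumps(event)}

    return EventSourceResponse(event_generator())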