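Designing a High-Concurrency Multi-Agent Streaming System with LangGraph

Synchronous, Blocking Streaming Responses

The web framework is Flask. Inside run_stream(), a hand-written for loop drives each agent, waits for the LLM to return a token, and yields that token as soon as it arrives so it can be sent to the frontend. The business logic strictly follows a predefined pipeline order, so this version needs neither high concurrency nor complex parallel agent execution.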

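A Streaming API Built on Server-Sent Events (SSE)

How SSE Communication Works

SSE is a technique in which the server pushes real-time data to the client in one direction only, over a single long-lived HTTP response. WebSocket, by contrast, is bidirectional.

Client ----(HTTP request)----------> Server
Client <---(continuous data push)--- Server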

SSE Data Format

# Note: every message must end with two newline characters
data: {"type": "agent_output_chunk", "content": "Analyzing..."}\n\n
# For example, a sequence of messages like this
data: {"type": "agent_start", "agent": "Researcher"}\n\n
data: {"type": "agent_output_chunk", "content": "Analyzing now..."}\n\n
data: {"type": "done"}\n\n
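The framing rule above can be checked with a small round trip. This is a minimal sketch, not the article's code: `format_sse` and `parse_sse` are hypothetical helper names, and the parser assumes one `data:` line per message, which is how the messages above are shaped.

```python
import json
from typing import Iterator


def format_sse(event: dict) -> str:
    """Serialize one event dict as an SSE message terminated by a blank line."""
    return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"


def parse_sse(raw: str) -> Iterator[dict]:
    """Split a raw SSE stream on blank lines and decode each `data:` payload."""
    for block in raw.split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data:"):
                yield json.loads(line[len("data:"):].strip())


# Build a stream of three messages, then parse it back.
stream = "".join(
    format_sse(e)
    for e in [
        {"type": "agent_start", "agent": "Researcher"},
        {"type": "agent_output_chunk", "content": "Analyzing..."},
        {"type": "done"},
    ]
)
events = list(parse_sse(stream))
```

Because `json.dumps` never emits a raw newline, the blank line can only ever appear as the message terminator, which is what makes splitting on it safe.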

API Endpoint Walkthrough

import json as json_module

from flask import Response, jsonify, request, stream_with_context

@api_bp.route('/agent/research/stream', methods=['POST'])
def multi_agent_research_stream():
    # 1. Read the research topic (tolerate a missing or non-JSON body)
    data = request.get_json(silent=True) or {}
    message = data.get('message', '')  # e.g. "Analyze applications of AI in healthcare"

    # 2. Validate the input
    if not message:
        return jsonify({'success': False, 'msg': 'The research topic must not be empty'}), 400

    # 3. Define the generator function
    def generate():
        # Get the multi-agent system instance
        multi_agent = get_research_multi_agent()

        # Run the streaming workflow, yielding events one by one
        for event in multi_agent.run_stream(message):
            # Convert each event to SSE format
            yield f"data: {json_module.dumps(event, ensure_ascii=False)}\n\n"

        # Send the completion marker
        yield f"data: {json_module.dumps({'type': 'done'})}\n\n"

    # 4. Return the SSE response
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',      # disable caching
            'Connection': 'keep-alive',       # keep the connection open
            'X-Accel-Buffering': 'no'         # disable Nginx buffering
        }
    )

Multi-Agent Execution Flow

This is the pipeline pattern: the order in which the agents run is fixed in advance.

import time

def run_stream(self, user_input: str):
    start_time = time.time()
    # Initial shared state; the key name is an assumption, the original omits this step
    state = {"user_input": user_input}

    # 1. Workflow start
    yield {
        "type": "workflow_start",
        "workflow": "academic_research",
        "agents": ["Supervisor", "Researcher", "Critic", "Ideator", "Writer"]
    }

    # 2. Define the agent execution order
    agent_sequence = [
        ("supervisor", self.supervisor, "Task understanding and planning"),  # understand the request
        ("researcher", self.researcher, "Literature review"),                # find related literature
        ("critic", self.critic, "Critical analysis"),                        # analyze existing research
        ("ideator", self.ideator, "Idea generation"),                        # propose new ideas
        ("writer", self.writer, "Research report writing")                   # write the final report
    ]

    # 3. Run each agent in turn
    for agent_key, agent, description in agent_sequence:
        # Announce that the agent is starting
        yield {"type": "agent_start", "agent": agent.name, "description": description}

        # Run the agent and stream its output
        for event in agent.run_stream(state):
            yield event  # includes agent_output_chunk and similar events

            # When the agent completes, update the state
            if event.get("type") == "agent_complete":
                result = event.get("result", {})
                state = {**state, **result}  # merge the result into the state

        # Announce that the agent has finished
        yield {"type": "agent_end", "agent": agent.name}

    # 4. Workflow complete
    yield {
        "type": "workflow_complete",
        "final_output": state.get("final_output"),  # the final research report
        "elapsed_time": time.time() - start_time
    }

Server-Side Implementation of a Sub-Agent

# Excerpt from an agent's run_stream(); user_prompt and research_topic are built earlier in the method
# Emit an event marking the start of this agent's output
yield {"type": "agent_output_start", "agent": self.name}
full_response = ""
for chunk in self._invoke_llm_stream(self.get_system_prompt(), user_prompt):
    # Accumulate the full response and forward each text chunk to the client as soon as it arrives
    full_response += chunk
    yield {"type": "agent_output_chunk", "agent": self.name, "content": chunk}
yield {"type": "agent_output_end", "agent": self.name}

yield {
    "type": "agent_complete",
    "agent": self.name,
    "result": {
        "critique_results": {
            "topic": research_topic,
            "analysis": full_response
        }
    }
}
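To see how the pipeline loop and a sub-agent fit together without calling an LLM, the following sketch replaces the model with fixed text chunks. `EchoAgent`, `run_pipeline`, and the state keys are hypothetical stand-ins chosen for illustration; only the event types and the `{**state, **result}` merge come from the code above.

```python
from typing import Iterator, List


class EchoAgent:
    """Stand-in for an LLM-backed agent: streams fixed chunks instead of tokens."""

    def __init__(self, name: str, result_key: str, chunks: List[str]):
        self.name = name
        self.result_key = result_key
        self.chunks = chunks

    def run_stream(self, state: dict) -> Iterator[dict]:
        yield {"type": "agent_output_start", "agent": self.name}
        full_response = ""
        for chunk in self.chunks:  # a real agent would iterate over LLM tokens here
            full_response += chunk
            yield {"type": "agent_output_chunk", "agent": self.name, "content": chunk}
        yield {"type": "agent_output_end", "agent": self.name}
        yield {
            "type": "agent_complete",
            "agent": self.name,
            "result": {self.result_key: full_response},
        }


def run_pipeline(agents: List[EchoAgent], user_input: str) -> Iterator[dict]:
    """Run agents strictly in order, merging each result into the shared state."""
    state = {"user_input": user_input}
    for agent in agents:
        yield {"type": "agent_start", "agent": agent.name}
        for event in agent.run_stream(state):
            yield event
            if event["type"] == "agent_complete":
                state = {**state, **event["result"]}
        yield {"type": "agent_end", "agent": agent.name}
    yield {"type": "workflow_complete", "state": state}


agents = [
    EchoAgent("Researcher", "research", ["found ", "3 papers"]),
    EchoAgent("Writer", "final_output", ["draft ", "ready"]),
]
events = list(run_pipeline(agents, "AI in healthcare"))
```

Every layer here is a plain generator, so nothing runs until the consumer pulls the next event. That is also why a second agent cannot start before the first one has been fully drained.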

Communication Flow in the Pipeline Design

  Layer 1: Frontend → Flask (HTTP)
  # The frontend sends the request
  fetch('/api/paper/read/stream', { method: 'POST', body: {...} })

  # Flask receives it
  @api_bp.route('/paper/read/stream', methods=['POST'])
  def paper_read_stream():
      ...

  Layer 2: Flask → LangGraph (function call)
  # The Flask route calls into LangGraph
  def paper_read_stream():
      multi_agent = get_paper_reader_multi_agent()

      def generate():
          # This is where LangGraph is called
          for event in multi_agent.run_stream(images, paper_name):
              yield f"data: {json.dumps(event)}\n\n"

      return Response(generate(), mimetype='text/event-stream')

  Layer 3: LangGraph internals → agent execution
  # (synchronous for loop)
  def run_stream(self, images, paper_name):
      for agent in [planner, extractor, analyzer, critic, summarizer]:
          for token in agent.run_stream(state):
              yield token  # tokens are emitted one at a time

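Asynchronous High Concurrency

A truly asynchronous, high-concurrency Multi-Agent architecture needs three things: a supervisor node that routes dynamically, worker nodes that run in parallel, and automatic merging of the results from multiple workers. There are two ways to get there: make full use of LangGraph's own capabilities (Send, Annotated, astream_events), or treat LangGraph purely as a state container and write the scheduling logic yourself.

Built-in LangGraph Capabilities

Graph Topology Management

Each edge can carry custom routing logic. With the Send API, the nodes to run next are generated dynamically from the Supervisor's decision.

State Management

The Supervisor and all Worker nodes exchange data through a shared state dictionary, State. In LangGraph the state is declared as a TypedDict, and a field annotated with a reducer such as Python's operator.add is merged automatically when several Workers return results in parallel.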

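import operator
from typing import Annotated, List, TypedDict
from langgraph.graph import StateGraph

class ResearchState(TypedDict):
    research_outputs: Annotated[List[WorkerOutput], operator.add]  # merged automatically
    messages: Annotated[List[str], operator.add]  # conversation history accumulates

builder = StateGraph(ResearchState)  # the shared state schema is passed to the graph builder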

Automatic Parallel Task Scheduling with the Send API

  V1 event format (current)

  // The SSE event name is always "message"; data.type distinguishes the event kinds
  event: message
  data: {"type": "conversation_id", "conversation_id": "xxx"}

  event: message
  data: {"type": "reading_start", "paper_name": "xxx", "agents": [...]}

  event: message
  data: {"type": "agent_start", "agent": "Planner", "description": "..."}

  event: message
  data: {"type": "agent_output_chunk", "agent": "Planner", "content": "方"}

  event: message
  data: {"type": "agent_output_end", "agent": "Planner"}

  event: message
  data: {"type": "agent_complete", "agent": "Planner", "result": {...}}

  event: message
  data: {"type": "reading_complete", "final_output": "...", "elapsed_time": 10.5}

  event: message
  data: {"type": "done"}

# Newer LangGraph releases also expose Send from langgraph.types
from langgraph.constants import Send

# Returning a single Send = sequential execution
return Send("extractor", state)

# Returning a list of Send objects = parallel execution
return [Send("analyzer", state), Send("critic", state)]

The Complete Parallel Execution Flow

# graph.py
def parallel_router(state: PaperReaderState) -> List[Send] | Literal["finalize"]:
    steps = state.get("next_steps", [])  # set by the Supervisor, e.g. ["analyzer", "critic"]

    if not steps or "FINISH" in steps:
        return "finalize"

    routes = []
    for step in steps:
        routes.append(Send(step, state))  # one Send per step

    # Returns [Send("analyzer", state), Send("critic", state)]
    # The LangGraph engine schedules these two nodes in parallel
    return routes
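What "scheduled in parallel" means for I/O-bound workers can be imitated with plain asyncio. The sketch below is a simulation, not LangGraph's engine: `route`, `fan_out`, and the two toy workers are hypothetical, and `asyncio.gather` stands in for the scheduler that consumes a list of Send objects.

```python
import asyncio
from typing import List, Tuple

log: List[str] = []  # records the order in which workers start and finish


async def analyzer(state: dict) -> dict:
    log.append("analyzer:start")
    await asyncio.sleep(0.01)  # stands in for an LLM call
    log.append("analyzer:end")
    return {"worker_outputs": ["analysis"]}


async def critic(state: dict) -> dict:
    log.append("critic:start")
    await asyncio.sleep(0.01)  # stands in for an LLM call
    log.append("critic:end")
    return {"worker_outputs": ["critique"]}


NODES = {"analyzer": analyzer, "critic": critic}


def route(state: dict) -> List[Tuple[str, dict]]:
    """Mirror parallel_router: one (node name, state) pair per requested step."""
    return [(step, state) for step in state.get("next_steps", [])]


async def fan_out(state: dict) -> dict:
    """Run every routed node concurrently, then concatenate their outputs."""
    updates = await asyncio.gather(*(NODES[name](s) for name, s in route(state)))
    merged = list(state.get("worker_outputs", []))
    for update in updates:  # gather returns results in submission order
        merged += update["worker_outputs"]
    return {**state, "worker_outputs": merged}


result = asyncio.run(fan_out({"next_steps": ["analyzer", "critic"]}))
```

Both workers log their start before either logs its end, which is the behavior the Send list is meant to produce: the waiting time of the two LLM calls overlaps instead of adding up.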

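Dynamic Routing and Token-Level Streaming with LangGraph's Native astream_events

The usual way to get a result is to await the whole run: result = await graph.ainvoke(state)

astream_events instead exposes every token and every node transition while the graph is running: async for event in graph.astream_events(state, version="v2"):

The handler deals with three kinds of events (node start, token stream, node end) and yields a corresponding output event for each.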
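The three-way mapping can be written as a pure function and exercised without running a graph. This is a sketch under assumptions: `translate_event` is a hypothetical name, and the sample events are hand-built dicts that only mimic the fields this article reads (`event`, `metadata.langgraph_node`, `data.chunk.content`), not a complete picture of what astream_events emits.

```python
from types import SimpleNamespace
from typing import Optional


def translate_event(event: dict) -> Optional[dict]:
    """Map one graph event to a frontend event, or None if it should be dropped."""
    kind = event.get("event")
    node = event.get("metadata", {}).get("langgraph_node", "")

    if kind == "on_chain_start":  # node start
        return {"type": "agent_start", "agent": node}
    if kind == "on_chat_model_stream":  # token stream
        chunk = event.get("data", {}).get("chunk")
        if chunk is not None and chunk.content:
            return {"type": "agent_output_chunk", "agent": node, "content": chunk.content}
        return None  # empty chunks carry nothing to display
    if kind == "on_chain_end":  # node end
        return {"type": "agent_end", "agent": node}
    return None  # every other event kind is ignored


meta = {"langgraph_node": "analyzer"}
samples = [
    {"event": "on_chain_start", "metadata": meta, "data": {}},
    {"event": "on_chat_model_stream", "metadata": meta,
     "data": {"chunk": SimpleNamespace(content="Met")}},
    {"event": "on_chat_model_stream", "metadata": meta,
     "data": {"chunk": SimpleNamespace(content="")}},
    {"event": "on_chain_end", "metadata": meta, "data": {}},
    {"event": "on_tool_start", "metadata": meta, "data": {}},
]
out = [e for e in map(translate_event, samples) if e is not None]
```

Keeping the mapping free of I/O makes it easy to unit-test separately from the async loop that feeds it.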

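Core Steps

Creating the Shared State (state.py)

The three fields messages, worker_outputs, and completed_agents are type-hinted with Annotated. Annotated[T, x] attaches a label or extra metadata to a type without changing the type itself: T is the original type, and x can be a string, a function, a class instance, and so on. LangGraph inspects the second argument, here operator.add, and uses it to apply updates to that field.

On a list, operator.add is concatenation, so a new update is appended to the existing value instead of overwriting the earlier output. That is how the state gets updated.

from typing import Annotated
Annotated[T, x]

# Annotated + operator.add gives automatic accumulate-and-merge
class PaperReaderState(TypedDict, total=False):
    messages: Annotated[List[BaseMessage], operator.add]
    worker_outputs: Annotated[List[WorkerOutput], operator.add]
    completed_agents: Annotated[List[str], operator.add]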
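How a framework can read the reducer out of an Annotated hint is easy to demonstrate with the standard library. The sketch below is an illustration of the idea, not LangGraph's implementation: `DemoState` and `apply_update` are hypothetical, and fields without a reducer are simply overwritten.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints


class DemoState(TypedDict, total=False):
    worker_outputs: Annotated[List[str], operator.add]
    completed_agents: Annotated[List[str], operator.add]
    next_steps: List[str]  # no reducer: the last write wins


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Merge an update into the state, using each field's reducer when it has one."""
    hints = get_type_hints(DemoState, include_extras=True)  # keep Annotated metadata
    new_state = dict(state)
    for key, value in update.items():
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in new_state:
            reducer = metadata[0]  # here: operator.add
            new_state[key] = reducer(new_state[key], value)
        else:
            new_state[key] = value
    return new_state


state: Dict[str, Any] = {"worker_outputs": [], "completed_agents": []}
state = apply_update(
    state, {"worker_outputs": ["analysis"], "completed_agents": ["analyzer"]}
)
state = apply_update(
    state,
    {"worker_outputs": ["critique"], "completed_agents": ["critic"], "next_steps": ["FINISH"]},
)
```

After the two updates, both list fields contain the contributions of both workers, while `next_steps` holds only the most recent value.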

Creating the Workers

Each worker is an async function that returns a state update:

async def analyzer_node(state: PaperReaderState, config: Optional[RunnableConfig] = None) -> Dict[str, Any]:
    images = state.get("paper_images", [])
    metadata = state.get("paper_metadata", {})

    llm = get_llm(streaming=True)
    messages = build_messages(
        system_prompt="You are an expert in methodology analysis...",
        user_prompt="Please analyze the methods of this paper...",
        images=images
    )

    full_response = ""
    # config is forwarded so that callbacks, and with them the streamed events, propagate
    async for chunk in llm.astream(messages, config=config):
        if chunk.content:
            full_response += chunk.content

    # Return a state update (merged into the global state automatically)
    return {
        "methodology": {"raw_content": full_response},
        "worker_outputs": [{"source": "analyzer", "content": full_response, "metadata": {}}],
        "completed_agents": ["analyzer"]
    }

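Creating the Supervisor for Dynamic Routing

supervisor_node outputs two fields, next_steps and iteration. The decision format is defined as RouteDecision, a pydantic.BaseModel. with_structured_output(RouteDecision) is a LangChain method: under the hood it converts RouteDecision to a JSON schema and uses OpenAI's function calling to force the model to generate data that conforms to that schema. The returned decision is therefore no longer a piece of free text but an object with a next_workers field, which supervisor_node copies into next_steps.

from typing import Any, Dict, List

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

class RouteDecision(BaseModel):
    next_workers: List[str] = Field(description="List of agents to run next")
    reasoning: str = Field(description="Reason for the decision")

class SupervisorAgent:
    def __init__(self):
        # A bare chat model does not accept a dict as input, so a prompt template
        # sits in front of it. The prompt wording below is a placeholder.
        prompt = ChatPromptTemplate.from_messages([
            ("system", "Available workers:\n{workers_description}"),
            ("human", "Current state:\n{state_summary}"),
        ])
        self.llm = prompt | ChatOpenAI(...).with_structured_output(RouteDecision)

    async def decide(self, state: PaperReaderState) -> RouteDecision:
        # Build a summary of the current state
        state_summary = self._build_state_summary(state)

        # Let the LLM decide
        decision = await self.llm.ainvoke({
            "workers_description": "...",
            "state_summary": state_summary
        })
        return decision

async def supervisor_node(state: PaperReaderState) -> Dict[str, Any]:
    supervisor = get_supervisor()
    decision = await supervisor.decide(state)
    return {
        "next_steps": decision.next_workers,  # e.g. ["analyzer", "critic"] or ["FINISH"]
        "iteration": state.get("iteration", 0) + 1
    }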
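The practical difference between a text reply and a structured decision shows up when the reply is validated before it reaches the router. The sketch below uses only the standard library as a stand-in for pydantic: the dataclass, `parse_decision`, `to_state_update`, and the `KNOWN_WORKERS` set are all hypothetical names, with the worker names taken from the graph in this article.

```python
import json
from dataclasses import dataclass
from typing import List

# Node names used by the graph in this article, plus the FINISH sentinel.
KNOWN_WORKERS = {"planner", "extractor", "analyzer", "critic", "summarizer", "FINISH"}


@dataclass
class RouteDecision:
    next_workers: List[str]
    reasoning: str


def parse_decision(raw_json: str) -> RouteDecision:
    """Turn the model's JSON reply into an object, rejecting unknown worker names."""
    payload = json.loads(raw_json)
    decision = RouteDecision(
        next_workers=list(payload["next_workers"]),
        reasoning=str(payload["reasoning"]),
    )
    unknown = set(decision.next_workers) - KNOWN_WORKERS
    if unknown:
        raise ValueError(f"unknown workers: {sorted(unknown)}")
    return decision


def to_state_update(decision: RouteDecision, state: dict) -> dict:
    """Same shape as the update returned by supervisor_node."""
    return {
        "next_steps": decision.next_workers,
        "iteration": state.get("iteration", 0) + 1,
    }


reply = '{"next_workers": ["analyzer", "critic"], "reasoning": "methods and flaws next"}'
decision = parse_decision(reply)
update = to_state_update(decision, {"iteration": 1})
```

Rejecting unknown names at this point matters because each entry in `next_steps` is later used as a node name in a Send, so a misspelled worker would otherwise only fail inside the graph.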

Building the Graph (graph.py)

from typing import List

from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import Send
from langgraph.graph import StateGraph, END

def parallel_router(state: PaperReaderState) -> List[Send] | str:
    """Route according to the Supervisor's decision, with support for parallelism."""
    steps = state.get("next_steps", [])

    if not steps or "FINISH" in steps:
        return "finalize"

    # Use Send for real parallel execution
    routes = []
    for step in steps:
        routes.append(Send(step, state))
    return routes

def build_graph():
    workflow = StateGraph(PaperReaderState)

    # 1. Add the nodes
    workflow.add_node("supervisor", supervisor_node)
    workflow.add_node("planner", planner_node)
    workflow.add_node("extractor", extractor_node)
    workflow.add_node("analyzer", analyzer_node)
    workflow.add_node("critic", critic_node)
    workflow.add_node("summarizer", summarizer_node)
    workflow.add_node("finalize", finalize_node)

    # 2. Set the entry point
    workflow.set_entry_point("supervisor")

    # 3. Supervisor → conditional routing (parallel-capable)
    workflow.add_conditional_edges(
        "supervisor",
        parallel_router,
        ["planner", "extractor", "analyzer", "critic", "summarizer", "finalize"]
    )

    # 4. Workers → back to the Supervisor (loop)
    workflow.add_edge("planner", "supervisor")
    workflow.add_edge("extractor", "supervisor")
    workflow.add_edge("analyzer", "supervisor")
    workflow.add_edge("critic", "supervisor")
    workflow.add_edge("summarizer", "supervisor")

    # 5. Finalize → END
    workflow.add_edge("finalize", END)

    return workflow.compile(checkpointer=MemorySaver())
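The control flow of this graph is a loop: supervisor, workers, supervisor again, until the decision is FINISH. The following sketch walks through that loop with scripted decisions instead of an LLM. It is a sequential simulation under assumptions: `run_loop`, both supervisors, and the `MAX_ITERATIONS` cap are hypothetical, the cap being one possible use of the `iteration` counter that the article's supervisor_node already maintains.

```python
from typing import Callable, Dict, List

MAX_ITERATIONS = 5  # hypothetical safety cap, not part of the original graph


def scripted_supervisor(state: Dict) -> Dict:
    """Replay a fixed plan: planner, then analyzer and critic, then FINISH."""
    plan: List[List[str]] = [["planner"], ["analyzer", "critic"], ["FINISH"]]
    i = state.get("iteration", 0)
    return {"next_steps": plan[min(i, len(plan) - 1)], "iteration": i + 1}


def stubborn_supervisor(state: Dict) -> Dict:
    """A supervisor that never says FINISH, to exercise the cap."""
    return {"next_steps": ["analyzer"], "iteration": state.get("iteration", 0) + 1}


def run_loop(supervisor: Callable[[Dict], Dict], state: Dict) -> Dict:
    """Alternate between the supervisor and its chosen workers until it finishes."""
    while True:
        state = {**state, **supervisor(state)}
        steps = state["next_steps"]
        if not steps or "FINISH" in steps or state["iteration"] > MAX_ITERATIONS:
            return state  # corresponds to routing to "finalize"
        # Workers run here; this sketch only records which ones were chosen.
        done = state.get("completed_agents", []) + steps
        state = {**state, "completed_agents": done}


normal = run_loop(scripted_supervisor, {})
capped = run_loop(stubborn_supervisor, {})
```

Since every worker edge leads back to the supervisor, termination depends entirely on the supervisor's decisions, which is why some bound on the number of rounds is worth considering.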

Handling the Streamed Output

async def stream_graph_events(graph, input_state, config=None, skip_nodes=frozenset({""})) -> AsyncGenerator:
    # skip_nodes: nodes whose start/end events are not forwarded; "" covers graph-level events
    async for event in graph.astream_events(input_state, config=config, version="v2"):
        kind = event.get("event")
        node_name = event.get("metadata", {}).get("langgraph_node", "")

        if kind == "on_chain_start" and node_name not in skip_nodes:
            yield {"type": "agent_start", "agent": node_name}

        elif kind == "on_chat_model_stream":
            chunk = event.get("data", {}).get("chunk")
            if chunk and chunk.content:
                yield {"type": "agent_output_chunk", "agent": node_name, "content": chunk.content}

        elif kind == "on_chain_end" and node_name not in skip_nodes:
            yield {"type": "agent_end", "agent": node_name}

Wrapping the Main Executor

class PaperReaderMultiAgent:
    def __init__(self):
        self.graph = build_graph()

    async def run_stream_async(self, paper_images, paper_name, conversation_id):
        initial_state = create_initial_state(paper_images, paper_name, conversation_id)
        # The graph is compiled with a checkpointer, so each run needs a thread_id;
        # reusing conversation_id for it is an assumption
        config = {"configurable": {"thread_id": conversation_id}}

        yield {"type": "reading_start", "paper_name": paper_name}

        async for event in stream_graph_events(self.graph, initial_state, config):
            yield event

Core API Design

The core backend path is: FastAPI route → SSE response → async iteration over the Multi-Agent → streamed output events.

The paper feature has to support both read and chat, and the read part also has to support resuming from a checkpoint, so the API is designed as follows:

POST /paper/read/stream
POST /paper/chat/stream
POST /paper/read/continue

Main endpoint code:

@router.post("/paper/read/stream")
async def paper_read_stream(req: PaperReadRequest, user_id: Optional[str] = Depends(...)):
    # 1. Create a conversation (optional); otherwise reuse the one from the request
    conversation_id = req.conversation_id
    if user_id and not conversation_id:
        conversation_id = ConversationService.create_conversation(...)

    # 2. Get the Multi-Agent instance
    multi_agent = get_paper_reader_multi_agent()

    # 3. Define the SSE event generator
    async def event_generator():
        async for event in multi_agent.run_stream_async(req.images, req.paper_name, conversation_id):
            yield {"event": "message", "data": json.dumps(event)}

            # Persist to the database (optional)
            if event["type"] == "agent_output_end":
                MessageService.add_message(...)

    # 4. Return the SSE response
    return EventSourceResponse(event_generator())

@router.post("/paper/chat/stream")
async def paper_chat_stream(req: PaperChatRequest, ...):
    multi_agent = get_paper_reader_multi_agent()

    async def event_generator():
        async for event in multi_agent.chat_stream_async(
            req.question, req.images, req.paper_metadata, req.context
        ):
            yield {"event": "message", "data": json.dumps(event)}

    return EventSourceResponse(event_generator())
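The event generator in these routes can be tried without FastAPI, because it is just an async generator that wraps each event in an `{"event", "data"}` dict. The sketch below is a stdlib-only illustration: `fake_agent_events` and `sse_messages` are hypothetical, and appending to a list stands in for the `MessageService.add_message(...)` call, whose real arguments the article leaves out.

```python
import asyncio
import json
from typing import AsyncIterator, List, Tuple


async def fake_agent_events() -> AsyncIterator[dict]:
    """Stand-in for multi_agent.run_stream_async(...)."""
    for event in [
        {"type": "agent_output_chunk", "agent": "Planner", "content": "Hel"},
        {"type": "agent_output_chunk", "agent": "Planner", "content": "lo"},
        {"type": "agent_output_end", "agent": "Planner"},
        {"type": "done"},
    ]:
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield event


async def sse_messages(events: AsyncIterator[dict], saved: List[str]) -> AsyncIterator[dict]:
    """Wrap each event for SSE and persist an agent's full text when it ends."""
    buffer = ""
    async for event in events:
        yield {"event": "message", "data": json.dumps(event, ensure_ascii=False)}
        if event["type"] == "agent_output_chunk":
            buffer += event["content"]
        elif event["type"] == "agent_output_end":
            saved.append(buffer)  # a real route would write to the database here
            buffer = ""


async def collect() -> Tuple[List[dict], List[str]]:
    saved: List[str] = []
    messages = [m async for m in sse_messages(fake_agent_events(), saved)]
    return messages, saved


messages, saved = asyncio.run(collect())
```

Each message is yielded before the persistence step, so the client sees a chunk as early as possible and the database write only happens once the agent's output is complete.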