Agent 群聊 LangGraph 回复编排实现复盘
我们这一篇继续看 Agent 群聊回复编排 的升级实现。此前群聊已经支持创建群、邀请多个 Agent、保存消息历史,也能按规则选择 Agent 回复。这一次的变化,是把原本偏规则化的回复流程,升级成基于 LangGraph 的可编排流程。
可以把它理解成四步:先判断用户本轮消息的群聊意图,再选择应该参与回复的 Agent,然后根据场景决定串行或并行生成回复,最后做一次回复质量检查。
实现上我们仍然保持工程上的克制。不是为了复杂而复杂,也不是把所有逻辑都交给模型,更不会让某个 LangGraph 节点失败之后,直接拖垮主聊天流程。
群聊和一对一聊天最大的差异是:一条用户消息不一定应该由所有 Agent 回复。
比如用户说小雨你怎么看,通常应该只让小雨回复;用户说你们分别给我一点建议,就应该让多个 Agent 参与;用户说我今天有点难受,可能只需要最适合情绪陪伴的 Agent 出来接住;如果用户说你们别吵了,这就属于关系修复或冲突降温,回复策略要更谨慎。
如果继续只靠正则判断你们、大家、一起这类关键词,系统会越来越笨重,而且很难表达复杂意图。在这个位置,LangGraph 不是简单地让模型变聪明,而是帮我们把一次群聊回复拆成几个可观察、可替换、可降级的节点。
API 侧主要改动落在这个文件里:
1// apps/api/src/routes/chat/group.route.ts2import { ChatPromptTemplate } from '@langchain/core/prompts'3import { Annotation, END, START, StateGraph } from '@langchain/langgraph'4import { ChatOpenAI } from '@langchain/openai'5import { z } from 'zod'
LangChain 负责模型调用和结构化输出,LangGraph 负责把多个决策步骤串成一张图。
新的群聊发送流程可以理解为下面这张图:
01flowchart TD02A["用户发送群聊消息"] --> B["保存用户消息"]03B --> C["加载群聊成员与最近消息"]04C --> D["加载每个 Agent 的长期记忆"]05D --> E["LangGraph: 判断用户意图"]06E --> F["LangGraph: 选择参与 Agent"]07F --> G{"回复模式"}08G -->|single| H["单 Agent 生成回复"]09G -->|multi_serial| I["多个 Agent 串行生成回复"]10G -->|multi_parallel| J["多个 Agent 并行生成回复"]11H --> K["LangGraph: 回复质量检查"]12I --> K13J --> K14K --> L["保存 Agent 回复"]15L --> M["更新群聊摘要与最近消息"]
这张图里最需要注意的是顺序。用户消息会先落库,然后再进行 Agent 回复编排;如果 LangGraph 编排失败,系统会回退到旧规则,不会让用户完全聊不了。
LangGraph 中每个节点都围绕同一个状态对象读写。状态设计得越清晰,后续扩展越轻松。
实现中定义了 GroupChatOrchestrationState:
01const GroupChatOrchestrationState = Annotation.Root({02providerConfig: Annotation<ChatProviderConfig>(),03groupChat: Annotation<AgentGroupChatRecord>(),04agents: Annotation<AgentGroupChatAgentRecord[]>(),05recentMessages: Annotation<AgentGroupChatMessageRecord[]>(),06userMessage: Annotation<AgentGroupChatMessageRecord>(),07userText: Annotation<string>(),08agentMemoriesByAgentId: Annotation<Record<string, AgentMemoryForPrompt[]>>(),09intent: Annotation<GroupChatIntent | null>(),10selection: Annotation<GroupChatAgentSelection | null>(),11selectedAgents: Annotation<AgentGroupChatAgentRecord[]>(),12replies: Annotation<PlannedAgentReply[]>(),13quality: Annotation<GroupChatReplyQuality | null>(),14signal: Annotation<AbortSignal>(),15})
这份状态里既有输入上下文,也有每个节点的产物。providerConfig 表示用户当前选择的 LLM 配置,groupChat 表示群聊本身的信息,agents 是当前群里的 Agent 成员,recentMessages 是最近聊天历史,agentMemoriesByAgentId 保存每个 Agent 的长期记忆。后面的 intent、selection、selectedAgents、replies、quality,分别对应意图判断、Agent 选择、实际参与回复的 Agent 列表、生成后的回复和质量检查结果。
我们没有把所有东西都塞进 prompt,而是先把工程状态拆清楚,再由不同节点按需使用。
第一步是判断用户这句话在群聊中的意图。
结构化结果这样定义:
01const GroupChatIntentSchema = z.object({02intent: z.enum([03'direct_mention',04'group_opinion',05'emotional_support',06'planning',07'roleplay',08'casual_chat',09'conflict_repair',10'memory_or_preference',11'unknown',12]),13targetAgentNames: z.array(z.string().trim().min(1).max(120)).max(6),14shouldUseMultipleAgents: z.boolean(),15replyMode: z.enum(['single', 'multi_serial', 'multi_parallel']),16confidence: z.number().min(0).max(1),17reason: z.string().trim().max(500),18})
意图节点并不生成聊天内容,只负责判断用户有没有点名某个 Agent,用户是不是在问大家意见,这一轮是否需要多个 Agent 参与,以及更适合单人、串行多人,还是并行多人。
对应的提示词是 groupChatIntentPrompt。它会给模型群聊名称、群聊摘要、Agent 名单、最近聊天和用户本轮消息,然后要求模型只输出结构化结果。
为了避免模型把普通闲聊错误放大成多人回复,代码里还有一层归一化:
01function normalizeGroupChatIntent(intent: GroupChatIntent, userText: string): GroupChatIntent {02const text = userText.trim()03const shouldUseMultipleAgents =04intent.shouldUseMultipleAgents ||05intent.targetAgentNames.length > 1 ||06/(你们|大家|一起|分别|都说|怎么看|意见)/.test(text)07const replyMode = shouldUseMultipleAgents08? intent.replyMode === 'multi_parallel' ? 'multi_parallel' : 'multi_serial'09: 'single'1011return GroupChatIntentSchema.parse({12...intent,13targetAgentNames: dedupeStrings(intent.targetAgentNames).slice(0, 6),14shouldUseMultipleAgents,15replyMode,16confidence: Math.min(1, Math.max(0, intent.confidence)),17reason: intent.reason.trim() || '根据用户本轮消息进行群聊意图判断。',18})19}
这个函数的作用,是把 LLM 的输出重新拉回产品规则里。只有出现明确多人表达时,才允许多人回复;多人回复默认走串行,除非模型明确选择并行;置信度会被限制在 0-1 之间;Agent 名称也会做去重,避免重复选择。
意图判断之后,第二步是选择由哪些 Agent 回复。
选择结果结构是这样:
1const GroupChatAgentSelectionSchema = z.object({2selectedAgentIds: z.array(z.string().trim().min(1)).min(1).max(groupReplyAgentLimit),3mode: z.enum(['single', 'multi_serial', 'multi_parallel']),4reason: z.string().trim().max(500),5})
这里要先固定一个限制:
1const groupReplyAgentLimit = 3
即使群聊里有 6 个 Agent,一轮最多也只允许 3 个 Agent 回复。AI 电子伴侣产品里,群聊要有陪伴感,但不能刷屏。多人回复应该是有必要才发生,而不是为了热闹一直发生。
Agent 选择节点会读取用户意图、群聊成员列表、每个 Agent 的简介、说明、性格和语气,也会结合最近聊天上下文和用户本轮消息。
选择完成后还会进入 normalizeAgentSelection,校验模型返回的 id 是否真实存在:
1const agentById = new Map(params.agents.map((agent) => [agent.id, agent]))2const selectedAgentIds = dedupeStrings(params.selection.selectedAgentIds)3.filter((agentId) => agentById.has(agentId))4.slice(0, groupReplyAgentLimit)
如果模型返回了不存在的 Agent id,系统不会信任它,而是回退到本地规则:
1const fallbackAgents = selectAgentsForReply({2agents: params.agents,3userText: params.userText,4})
这就是使用 LLM 做调度时必须保留的一层工程护栏:模型可以参与判断,但不能直接突破系统边界。
回复生成节点根据 selection.mode 决定生成方式。
如果模式是 multi_parallel,说明多个 Agent 可以互相独立地给出意见。比如用户问:
你们分别推荐一个周末放松方式。
这种情况下每个 Agent 不需要知道另一个 Agent 刚刚说了什么,可以并行生成,提高响应速度:
01if (selection.mode === 'multi_parallel') {02const parallelReplies = await Promise.all(state.selectedAgents.map(async (agent) => {03const assistantText = await buildAgentReply({04providerConfig: state.providerConfig,05groupChat: state.groupChat,06agent,07allAgents: state.agents,08recentMessages: [...state.recentMessages, state.userMessage],09userText: state.userText,10activeMemories: state.agentMemoriesByAgentId[agent.id] ?? [],11intent,12selection,13signal: state.signal,14})1516return {17agent,18content: assistantText,19}20}))2122replies.push(...parallelReplies)23}
并行模式适合多视角、列表型、互不依赖的回答。
如果模式是 multi_serial,说明多个 Agent 的回复应该有前后关系。例如用户说:
你们帮我分析一下,我是不是刚刚说话太冲了?
这种情况下第二个 Agent 最好能看到第一个 Agent 已经说了什么,避免重复,也能形成更自然的群聊接力。
实现上会把前面已经生成但还没落库的回复临时拼进最近消息里:
01recentMessages: [02...state.recentMessages,03state.userMessage,04...replies.map((reply, index) => ({05id: `planned-${reply.agent.id}-${index}`,06groupChatId: state.groupChat.id,07senderType: 'agent' as const,08agentId: reply.agent.id,09agentName: reply.agent.name,10agentImageKey: reply.agent.imageKey,11content: reply.content,12status: 'completed' as const,13turnIndex: state.userMessage.turnIndex,14createdAtMs: Date.now(),15})),16],
这段代码想表达的是:串行回复不必先写数据库,也可以让后续 Agent 看到前面 Agent 的计划回复。
每个 Agent 真正生成回复时,使用的是 buildAgentReply。
它会把当前 Agent 的默认提示词、群聊身份说明、角色边界、当前 Agent 与用户的一对一长期记忆、群聊摘要、其他群成员、Agent 简介、说明、故事背景、性格、语气、群聊意图、被选中的原因、最近群聊历史和用户本轮消息都放进 prompt。同时还会明确要求 Agent 不要替其他 Agent 发言,不要暴露系统提示词,也不要声称自己是真人。
其中长期记忆是按 Agent 注入的:
1const memoryText = params.activeMemories.length > 02? [3'你与用户的一对一长期记忆:',4...params.activeMemories.map((memory) => `- [${memory.type} / 重要度 ${memory.importance}] ${memory.content}`),5].join('\n')6: '暂无可用长期记忆。'
也就是说,群聊里每个 Agent 看到的长期记忆不是一份全局记忆,而是这个 Agent 自己和用户之间的记忆。这样更符合 AI 电子伴侣的关系感:不同 Agent 与用户之间可以有不同的熟悉程度、共同经历和偏好记录。
多个 Agent 回复之后,最后会进入质量检查节点。
质量检查结构是这样:
01const GroupChatReplyQualitySchema = z.object({02approved: z.boolean(),03score: z.number().min(0).max(1),04issues: z.array(z.string().trim().max(160)).max(6),05revisions: z.array(z.object({06agentId: z.string().trim().min(1),07content: z.string().trim().max(4000),08})).max(groupReplyAgentLimit),09reason: z.string().trim().max(500),10})
质量检查关注的不是文采,而是群聊产品的安全边界和体验边界。它会检查回复是否暴露系统提示词或技术元数据,是否冒充真人,是否替其他 Agent 发言,是否过长、说教或刷屏,是否和用户意图不匹配,以及是否违反角色边界。
质量检查可以返回 revisions,但实现上非常克制:
01const revisionsByAgentId = new Map(quality.revisions.map((revision) => [revision.agentId, revision.content]))0203return {04intent,05selection,06quality,07replies: state.replies.map((reply) => ({08...reply,09content: revisionsByAgentId.get(reply.agent.id)?.trim() || reply.content,10})),11}
只有当某个 Agent 有非空修订文本时,才替换原回复。否则保留原始回复,避免质量检查模型过度干预。
本项目支持用户在 Web 侧配置自己的三方 LLM,因此不能假设所有供应商都完美支持同一种结构化输出方式。
模型创建逻辑是这样:
01function buildLangChainChatModel(providerConfig: ChatProviderConfig) {02return new ChatOpenAI({03model: providerConfig.model,04apiKey: providerConfig.apiKey,05temperature: 0,06useResponsesApi: providerConfig.wireApi === 'responses',07configuration: {08baseURL: providerConfig.baseURL.replace(/\/$/, ''),09},10...(providerConfig.reasoningEffort ? { reasoning: { effort: providerConfig.reasoningEffort } } : {}),11...(providerConfig.wireApi === 'responses' ? { zdrEnabled: true } : {}),12})13}
结构化输出方式会按协议选择不同优先级:
1function getStructuredOutputMethods(providerConfig: ChatProviderConfig) {2return providerConfig.wireApi === 'responses'3? ['jsonSchema', 'functionCalling', 'jsonMode'] as const4: ['functionCalling', 'jsonSchema', 'jsonMode'] as const5}
也就是说,Responses API 会优先尝试 jsonSchema,Chat Completions 会优先尝试 functionCalling。如果失败,再依次尝试其他方式。
这对三方中转 API 很重要,因为不同中转对 Responses、function calling、JSON schema 的兼容程度不一样。
最后的图结构很清楚:
01const groupChatOrchestrationGraph = new StateGraph(GroupChatOrchestrationState)02.addNode('classifyIntent', classifyGroupIntentNode)03.addNode('selectAgents', selectGroupAgentsNode)04.addNode('generateReplies', generateGroupRepliesNode)05.addNode('checkQuality', checkGroupReplyQualityNode)06.addEdge(START, 'classifyIntent')07.addEdge('classifyIntent', 'selectAgents')08.addEdge('selectAgents', 'generateReplies')09.addEdge('generateReplies', 'checkQuality')10.addEdge('checkQuality', END)11.compile()
现在这张图还是线性的,但已经具备后面继续扩展的空间。我们可以在 classifyIntent 之前加入安全边界节点,在 selectAgents 之后加入成本控制节点,在 generateReplies 后加入情绪一致性检查,也可以把 multi_parallel 拆成真正的分支图。
这一版没有把图做得过度复杂,是为了让逻辑足够可维护。
群聊发送接口仍然是:
1groupChatRoute.post('/send', ...)
接口流程可以这样理解:先校验登录态,读取用户选择的 LLM 配置,查询群聊和成员,然后保存用户消息,加载每个 Agent 的长期记忆。等这些上下文准备好之后,再调用 LangGraph 编排,保存 Agent 回复,最后更新群聊摘要、消息数量和最近消息时间。
加载长期记忆时,代码会这样处理:
01const agentMemoriesEntries = await Promise.all(agents.map(async (agent) => {02const activeMemories = await listActiveAgentMemories({03db,04userId: claims.sub,05agentId: agent.id,06limit: 6,07})0809return [agent.id, activeMemories] as const10}))1112const agentMemoriesByAgentId = Object.fromEntries(agentMemoriesEntries)
之后调用编排函数:
01const orchestration = await orchestrateGroupChatReplies({02providerConfig,03groupChat,04agents,05recentMessages,06userMessage,07userText,08agentMemoriesByAgentId,09signal: c.req.raw.signal,10})
最后每条 Agent 回复都会保存到 agent_group_chat_messages,并写入编排元数据:
01metadataJson: JSON.stringify({02source: 'group_chat_agent',03selectedBy: 'langgraph_v1',04model: providerConfig.model,05wireApi: providerConfig.wireApi,06orchestration: {07intent: orchestration.intent,08selection: orchestration.selection,09quality: orchestration.quality,10},11})
这份 metadata 后面会很有用。我们可以用它排查这一轮为什么选择了这些 Agent、模型判断出的用户意图是什么、质量检查有没有发现问题,以及当前使用的是哪个模型和协议。
这次实现里很重要的一个工程细节,就是降级。
LangGraph 不应该成为聊天链路的单点风险。任何节点失败,都应该尽量回到可用状态。
意图判断失败时:
1return buildFallbackGroupChatIntent({2agents: params.agents,3userText: params.userText,4reason: 'LangGraph 意图判断失败,已使用本地规则回退。',5})
Agent 选择失败时:
01return normalizeAgentSelection({02selection: {03selectedAgentIds: selectAgentsForReply({04agents: params.agents,05userText: params.userText,06}).map((agent) => agent.id),07mode: params.intent.shouldUseMultipleAgents ? 'multi_serial' : 'single',08reason: 'LangGraph Agent 选择失败,已使用本地规则回退。',09},10agents: params.agents,11intent: params.intent,12userText: params.userText,13})
整个图失败时:
1catch (error) {2console.warn('LangGraph group chat orchestration failed', error)3const intent = buildFallbackGroupChatIntent(...)4const selectedAgents = selectAgentsForReply(...)5// 继续生成回复6}
这保证了一个原则:LangGraph 用来增强体验,而不是让基础聊天功能变脆。
旧的 selectAgentsForReply 没有删除,而是变成 fallback:
01function selectAgentsForReply(params: {02agents: AgentGroupChatAgentRecord[]03userText: string04}) {05const normalized = params.userText.toLowerCase()06const mentionedAgents = params.agents.filter((agent) => normalized.includes(agent.name.toLowerCase()))0708if (mentionedAgents.length > 0) {09return mentionedAgents.slice(0, groupReplyAgentLimit)10}1112if (/(你们|大家|一起|分别|都说|怎么看|意见)/.test(params.userText)) {13return params.agents.slice(0, Math.min(groupReplyAgentLimit, params.agents.length))14}1516return params.agents.slice(0, 1)17}
这段规则不复杂,但它稳定、可预测、低成本。保留它以后,LLM 结构化输出失败时,系统仍然能工作;开发和调试时,也可以快速判断问题来自模型还是业务逻辑。后面如果要做 A/B 测试,还可以对比规则选择和 LangGraph 选择的差异。
这次升级还不是完整的多智能体系统,仍然有一些明确边界。LangGraph 图还是线性的,没有真正使用复杂条件边;群聊回复仍然是一次请求内完成,没有流式返回多个 Agent 的增量内容;质量检查只做轻量修订,不做多轮重写;长期记忆按 Agent 注入,但还没有做群聊级长期记忆;Agent 选择依赖当前群成员和最近消息,也没有额外做向量检索。
这些边界是有意保留的。当前要解决的问题,是让群聊回复从关键词规则升级到可解释的图编排,而不是一步到位做成过度复杂的 Agent 平台。
这个结构后面可以自然扩展。比如增加群聊级记忆,让群本身也记住共同经历;把安全边界判断接到 LangGraph 的第一个节点;给 selectAgents 节点加入成本预算,限制高价模型调用次数;把多 Agent 并行回复改成真正的 LangGraph 分支;支持某个 Agent 对另一个 Agent 的回复进行追问或补充;给 metadata 做后台分析页,观察 Agent 选择和质量检查效果;也可以为不同关系阶段设置不同的群聊参与策略。
这次实现的关键不是用了 LangGraph 本身,而是把群聊回复拆成了一组更符合产品逻辑的环节。意图判断负责回答用户到底想要什么,Agent 选择负责回答谁最适合回应,串行和并行生成负责处理多人回复如何自然出现,质量检查负责判断回复是否稳、准、不过度,fallback 则保证模型不稳定时系统仍可用。
对于 AI 电子伴侣这样的产品,群聊不应该只是多个模型轮流说话。更合理的方向是:每个 Agent 都有自己的关系记忆、角色边界和发言时机,系统在背后负责调度,让用户感受到的是自然的多人陪伴,而不是一堆机器人同时抢答。