大多数用例都始于单个智能体(Agent),但随着工具数量的增加,以及所希望智能体解决的问题范围扩大,引入多智能体(Multiagent)模式可以提高整体性能和可靠性。正如我们在软件工程中看到的那样,将所有代码放在单个文件中,或将所有后端服务器捆绑成一个单体应用(Monolith)通常不是个好主意;我们在软件架构和服务设计中学到的许多原则,在构建基于 AI 和基础模型的系统时依然适用。随着你不断向智能体系统(Agentic System)中添加功能和能力,很快就会发现需要将系统拆分为更小的智能体,以便可以独立地验证、测试、集成和复用。在本章中,我们将讨论如何以及何时向系统添加智能体,以及如何组织和管理它们。
本书代码请见:https://github.com/alanhou/ai-agent。
我需要多少个智能体?
从简单的方法入手,仅在需要提高性能时才增加复杂性。智能体的适当数量和组织方式会根据任务的难度、工具的数量以及环境的复杂性而发生巨大变化。
单智能体场景
我们将从单智能体系统开始,这类系统适用于难度适中、工具数量有限且环境复杂性较低的任务。对于重视延迟的场景,通常也是更好的选择,因为多智能体系统通常需要智能体之间进行多次交互,这会增加用户的等待时间。因此,通常的最佳实践是从单智能体系统开始,因为它比扩展到多智能体系统更快且更便宜。在这种方法中,单个智能体负责在响应用户之前调用工具(如果有的话),直到达到某个限制。在此过程中,智能体执行任务并选择何时调用工具或提交答案。主要好处包括:
-
简单性
更容易实施和管理。
-
较低的资源需求
更少的计算开销。
-
低延迟
对用户的响应更快。
单智能体系统为构建智能体应用提供了一个强大的起点。其简单性、低成本和低延迟使其非常适合许多实际场景——尤其是在任务范围有限且性能要求严格时。虽然它可能无法很好地扩展来应对高度复杂或多层面的任务,但从单智能体架构开始,团队可以快速验证核心功能并有效迭代。只有在复杂性、工具集或任务协调需求超出单个智能体的能力时,开发人员才应考虑过渡到更复杂的多智能体系统。
为了论证这一点,考虑一个用于供应链物流管理的单智能体系统。该智能体在一个统一的提示词(Prompt)和图(Graph)中处理用于库存、运输和供应商任务的广泛工具集。虽然对于基本查询有效,但随着工具增多,性能会下降,因为智能体必须从大量工具中进行选择。以下是我们如何设置一个拥有 16 个工具的单智能体:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
from __future__ import annotations import os import json import operator import builtins from typing import Annotated, Sequence, TypedDict, Optional from langchain_openai.chat_models import ChatOpenAI from langchain.schema import AIMessage, BaseMessage, HumanMessage, SystemMessage from langchain_core.messages.tool import ToolMessage from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler from langchain.tools import tool from langgraph.graph import StateGraph, END from traceloop.sdk import Traceloop from src.common.observability.loki_logger import log_to_loki os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317" os.environ["OTEL_EXPORTER_OTLP_INSECURE"] = "true" @tool def manage_inventory(sku: str = None, **kwargs) -> str: """管理库存水平、库存补货、审计和优化策略。""" print(f"[TOOL] manage_inventory(sku={sku}, kwargs={kwargs})") log_to_loki("tool.manage_inventory", f"sku={sku}") return "inventory_management_initiated" @tool def track_shipments(origin: str = None, **kwargs) -> str: """跟踪货物状态、延误并协调交付物流。""" print(f"[TOOL] track_shipments(origin={origin}, kwargs={kwargs})") log_to_loki("tool.track_shipments", f"origin={origin}") return "shipment_tracking_updated" @tool def evaluate_suppliers(supplier_name: str = None, **kwargs) -> str: """评估供应商绩效、进行审计并管理供应商关系。""" print(f"[TOOL] evaluate_suppliers(supplier_name={supplier_name}, kwargs={kwargs})") log_to_loki("tool.evaluate_suppliers", f"supplier_name={supplier_name}") return "supplier_evaluation_complete" @tool def optimize_warehouse(operation_type: str = None, **kwargs) -> str: """优化仓库运营、布局、容量和存储效率。""" print(f"[TOOL] optimize_warehouse(operation_type={operation_type}, kwargs={kwargs})") log_to_loki("tool.optimize_warehouse", f"operation_type={operation_type}") return "warehouse_optimization_initiated" @tool def forecast_demand(season: str = None, **kwargs) -> str: """分析需求模式、季节性趋势并创建预测模型。""" print(f"[TOOL] forecast_demand(season={season}, kwargs={kwargs})") log_to_loki("tool.forecast_demand", f"season={season}") return "demand_forecast_generated" @tool def manage_quality(supplier: str = None, **kwargs) -> str: """管理质量控制、缺陷跟踪和供应商质量标准。""" print(f"[TOOL] manage_quality(supplier={supplier}, kwargs={kwargs})") log_to_loki("tool.manage_quality", f"supplier={supplier}") return "quality_management_initiated" @tool def arrange_shipping(shipping_type: str = None, **kwargs) -> str: """安排运输方式、加急交付和多式联运。""" print(f"[TOOL] arrange_shipping(shipping_type={shipping_type}, kwargs={kwargs})") log_to_loki("tool.arrange_shipping", f"shipping_type={shipping_type}") return "shipping_arranged" @tool def coordinate_operations(operation_type: str = None, **kwargs) -> str: """协调复杂操作,如越库配送、集货和转运。""" print(f"[TOOL] coordinate_operations(operation_type={operation_type}, kwargs={kwargs})") log_to_loki("tool.coordinate_operations", f"operation_type={operation_type}") return "operations_coordinated" @tool def manage_special_handling(product_type: str = None, **kwargs) -> str: """处理危险品、冷链和敏感产品的特殊要求。""" print(f"[TOOL] manage_special_handling(product_type={product_type}, kwargs={kwargs})") log_to_loki("tool.manage_special_handling", f"product_type={product_type}") return "special_handling_managed" @tool def handle_compliance(compliance_type: str = None, **kwargs) -> str: """管理监管合规、海关、文件和认证。""" print(f"[TOOL] handle_compliance(compliance_type={compliance_type}, kwargs={kwargs})") log_to_loki("tool.handle_compliance", f"compliance_type={compliance_type}") return "compliance_handled" @tool def process_returns(returned_quantity: str = None, **kwargs) -> str: """处理退货、逆向物流和产品处置。""" print(f"[TOOL] process_returns(returned_quantity={returned_quantity}, kwargs={kwargs})") log_to_loki("tool.process_returns", f"returned_quantity={returned_quantity}") return "returns_processed" @tool def scale_operations(scaling_type: str = None, **kwargs) -> str: """针对旺季、容量规划和劳动力管理扩展运营。""" print(f"[TOOL] scale_operations(scaling_type={scaling_type}, kwargs={kwargs})") log_to_loki("tool.scale_operations", f"scaling_type={scaling_type}") return "operations_scaled" @tool def optimize_costs(cost_type: str = None, **kwargs) -> str: """分析并优化运输、仓储和运营成本。""" print(f"[TOOL] optimize_costs(cost_type={cost_type}, kwargs={kwargs})") log_to_loki("tool.optimize_costs", f"cost_type={cost_type}") return "cost_optimization_initiated" @tool def optimize_delivery(delivery_type: str = None, **kwargs) -> str: """优化交付路线、最后一英里物流和可持续性计划。""" print(f"[TOOL] optimize_delivery(delivery_type={delivery_type}, kwargs={kwargs})") log_to_loki("tool.optimize_delivery", f"delivery_type={delivery_type}") return "delivery_optimization_complete" @tool def manage_disruption(disruption_type: str = None, **kwargs) -> str: """管理供应链中断、应急计划和风险缓解。""" print(f"[TOOL] manage_disruption(disruption_type={disruption_type}, kwargs={kwargs})") log_to_loki("tool.manage_disruption", f"disruption_type={disruption_type}") return "disruption_managed" @tool def send_logistics_response(operation_id: str = None, message: str = None): """向利益相关者发送物流更新、建议或状态报告。""" print(f"[TOOL] send_logistics_response → {message}") log_to_loki("tool.send_logistics_response", f"operation_id={operation_id}, message={message}") return "logistics_response_sent" TOOLS = [ manage_inventory, track_shipments, evaluate_suppliers, optimize_warehouse, forecast_demand, manage_quality, arrange_shipping, coordinate_operations, manage_special_handling, handle_compliance, process_returns, scale_operations, optimize_costs, optimize_delivery, manage_disruption, send_logistics_response] |
这些工具涵盖了供应链智能体的核心功能,从跟踪货物到预测需求和管理中断。通过在 LangChain 中使用 @tool 装饰器定义它们,我们使智能体能够根据用户的查询动态调用它们。这种设置很简单,不需要复杂的协调——智能体只需分析提示词并选择适当的工具。例如,一个基本的智能体可能会通过按顺序调用 manage_inventory 和 forecast_demand 来处理库存短缺问题,正如我们将在执行流程中看到的那样。
然而,随着工具集的扩展——这里是 16 个——智能体的系统提示词必须描述所有可能性,这可能会导致混淆或次优选择。这就是单智能体模型的局限性开始凸显的地方,也为多智能体分解铺平了道路。现在,我们通过基础模型绑定、状态定义和图构建来完成智能体设置:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
Traceloop.init(disable_batch=True, app_name="supply_chain_logistics_agent") llm = ChatOpenAI(model="gpt-5", temperature=0.0, callbacks=[StreamingStdOutCallbackHandler()], verbose=True).bind_tools(TOOLS) class AgentState(TypedDict): operation: Optional[dict] # 供应链运营信息 messages: Annotated[Sequence[BaseMessage], operator.add] def call_model(state: AgentState): history = state["messages"] # 优雅地处理缺失或不完整的运营数据 operation = state.get("operation", {}) if not operation: operation = {"operation_id": "UNKNOWN", "type": "general", "priority": "medium", "status": "active"} operation_json = json.dumps(operation, ensure_ascii=False) system_prompt = ( "你是一位经验丰富的供应链与物流专业人士。\n" "你的专业知识涵盖:\n" "- 库存管理和需求预测\n" "- 运输和航运优化\n" "- 供应商关系管理和评估\n" "- 仓库运营和容量规划\n" "- 质量控制和合规管理\n" "- 成本优化和运营效率\n" "- 风险管理和中断响应\n" "- 可持续发展和绿色物流倡议\n" "\n" "在管理供应链运营时:\n" " 1) 分析物流挑战或机遇\n" " 2) 调用适当的供应链管理工具\n" " 3) 跟进 send_logistics_response 以提供建议\n" " 4) 考虑成本、效率、质量和可持续性影响\n" " 5) 优先考虑客户满意度和业务连续性\n" "\n" "始终在成本与质量和风险缓解之间取得平衡。\n" f"当前运营数据: {operation_json}" ) full = [SystemMessage(content=system_prompt)] + history first: ToolMessage | BaseMessage = llm.invoke(full) messages = [first] if getattr(first, "tool_calls", None): for tc in first.tool_calls: print(first) print(tc['name']) fn = next(t for t in TOOLS if t.name == tc['name']) out = fn.invoke(tc["args"]) messages.append(ToolMessage(content=str(out), tool_call_id=tc["id"])) second = llm.invoke(full + messages) messages.append(second) return {"messages": messages} def construct_graph(): g = StateGraph(AgentState) g.add_node("assistant", call_model) g.set_entry_point("assistant") return g.compile() graph = construct_graph() if __name__ == "__main__": example = {"operation_id": "OP-12345", "type": "inventory_management", "priority": "high", "location": "Warehouse A"} convo = [HumanMessage(content="We're running critically low on SKU-12345. Current stock is 50 units but we have 200 units on backorder. What's our reorder strategy?")] result = graph.invoke({"operation": example, "messages": convo}) for m in result["messages"]: print(f"{m.type}: {m.content}") |
随着智能体的完全组装,我们看到了单节点 LangGraph 的优雅之处:状态保存运营详情和消息,模型调用分析查询并调用工具,图极其精简——只有一个“助手”节点。这种结构最大限度地减少了开销,确保了低延迟,因为没有智能体间的通信。在实践中,正如 2025 年 LangGraph 供应链智能体教程所展示的那样,这种设置可以在标准硬件上在不到一秒的时间内处理查询,使其非常适合操作仪表板或实时警报。
然而,对于大多数用例来说,关键瓶颈出现在工具和职责数量增加时。当期望智能体从一组工具中选择正确的工具时,随着潜在工具数量的增加,性能会下降。在跳转到多智能体之前,可以考虑在单智能体框架内进行扩展:例如,将多个工具封装成更大的分组(例如,通过分层工具选择),或使用第五章“编排”中描述的基于向量数据库的语义工具选择。如果这些方法仍然不足,将工具分解为具有适当职责的不同智能体可以提高可靠性和性能,尽管这会引入协调开销。
多智能体场景
在多智能体系统中,多个智能体协作以实现共同目标,这种方法在任务复杂、需要多种工具集、并行处理或适应动态环境时特别有利。多智能体系统的一个主要好处是专业化:每个智能体可以被分配特定的角色或专业领域,使系统能够有效地利用每个智能体的优势。这种分工使智能体能够专注于任务的特定方面,从而提高效率并确保专业工具被应用在最需要的地方。通过在智能体之间分配工具和职责,多智能体系统解决了单智能体系统面临的局限性,特别是在任务需要跨不同领域的专业知识或所需工具数量超过单个智能体可以可靠管理的数量时。
我们基于上一节的单智能体供应链示例,将其演变为一个多智能体系统。在这里,我们将 16 个工具分解为三个专业智能体:一个用于库存和仓库管理,一个用于运输和物流,另一个用于供应商关系和合规。一个监督者(Supervisor)智能体将查询路由给合适的专家,体现了管理者协调模式(详见管理者协调一节)。这种设置通过缩小每个智能体的工具集和提示词范围来展示专业化,从而减少选择错误并提高可靠性。代码从导入和共享响应工具开始,确保所有专家可以统一地传达结果。这个共享工具最大限度地减少了重复,同时允许去中心化的执行:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import os import json import operator from typing import Annotated, Sequence, TypedDict, Optional from langchain_openai.chat_models import ChatOpenAI from langchain.schema import AIMessage, BaseMessage, HumanMessage, SystemMessage from langchain_core.messages.tool import ToolMessage from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler from langchain.tools import tool from langgraph.graph import StateGraph, END from traceloop.sdk import Traceloop from src.common.observability.loki_logger import log_to_loki os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317" os.environ["OTEL_EXPORTER_OTLP_INSECURE"] = "true" # 所有专家共享的工具 @tool def send_logistics_response(operation_id = None, message = None): """向利益相关者发送物流更新、建议或状态报告。""" print(f"[TOOL] send_logistics_response → {message}") log_to_loki("tool.send_logistics_response", f"operation_id={operation_id}, message={message}") return "logistics_response_sent" # 库存与仓库专家工具 @tool def manage_inventory(sku: str = None, **kwargs) -> str: """管理库存水平、库存补货、审计和优化策略。""" print(f"[TOOL] manage_inventory(sku={sku}, kwargs={kwargs})") log_to_loki("tool.manage_inventory", f"sku={sku}") return "inventory_management_initiated" @tool def optimize_warehouse(operation_type: str = None, **kwargs) -> str: """优化仓库运营、布局、容量和存储效率。""" print(f"[TOOL] optimize_warehouse(operation_type={operation_type}, kwargs={kwargs})") log_to_loki("tool.optimize_warehouse", f"operation_type={operation_type}") return "warehouse_optimization_initiated" @tool def forecast_demand(season: str = None, **kwargs) -> str: """分析需求模式、季节性趋势并创建预测模型。""" print(f"[TOOL] forecast_demand(season={season}, kwargs={kwargs})") log_to_loki("tool.forecast_demand", f"season={season}") return "demand_forecast_generated" @tool def manage_quality(supplier: str = None, **kwargs) -> str: """管理质量控制、缺陷跟踪和供应商质量标准。""" print(f"[TOOL] manage_quality(supplier={supplier}, kwargs={kwargs})") log_to_loki("tool.manage_quality", f"supplier={supplier}") return "quality_management_initiated" @tool def scale_operations(scaling_type: str = None, **kwargs) -> str: """针对旺季、容量规划和劳动力管理扩展运营。""" print(f"[TOOL] scale_operations(scaling_type={scaling_type}, kwargs={kwargs})") log_to_loki("tool.scale_operations", f"scaling_type={scaling_type}") return "operations_scaled" @tool def optimize_costs(cost_type: str = None, **kwargs) -> str: """分析并优化运输、仓储和运营成本。""" print(f"[TOOL] optimize_costs(cost_type={cost_type}, kwargs={kwargs})") log_to_loki("tool.optimize_costs", f"cost_type={cost_type}") return "cost_optimization_initiated" INVENTORY_TOOLS = [manage_inventory, optimize_warehouse, forecast_demand, manage_quality, scale_operations, optimize_costs, send_logistics_response] # 运输与物流专家工具 @tool def track_shipments(origin: str = None, **kwargs) -> str: """跟踪货物状态、延误并协调交付物流。""" print(f"[TOOL] track_shipments(origin={origin}, kwargs={kwargs})") log_to_loki("tool.track_shipments", f"origin={origin}") return "shipment_tracking_updated" @tool def arrange_shipping(shipping_type: str = None, **kwargs) -> str: """安排运输方式、加急交付和多式联运。""" print(f"[TOOL] arrange_shipping(shipping_type={shipping_type}, kwargs={kwargs})") log_to_loki("tool.arrange_shipping", f"shipping_type={shipping_type}") return "shipping_arranged" @tool def coordinate_operations(operation_type: str = None, **kwargs) -> str: """协调复杂操作,如越库配送、集货和转运。""" print(f"[TOOL] coordinate_operations(operation_type={operation_type}, kwargs={kwargs})") log_to_loki("tool.coordinate_operations", f"operation_type={operation_type}") return "operations_coordinated" @tool def manage_special_handling(product_type: str = None, **kwargs) -> str: """处理危险品、冷链和敏感产品的特殊要求。""" print(f"[TOOL] manage_special_handling(product_type={product_type}, kwargs={kwargs})") log_to_loki("tool.manage_special_handling", f"product_type={product_type}") return "special_handling_managed" @tool def process_returns(returned_quantity: str = None, **kwargs) -> str: """处理退货、逆向物流和产品处置。""" print(f"[TOOL] process_returns(returned_quantity={returned_quantity}, kwargs={kwargs})") log_to_loki("tool.process_returns", f"returned_quantity={returned_quantity}") return "returns_processed" @tool def optimize_delivery(delivery_type: str = None, **kwargs) -> str: """优化交付路线、最后一英里物流和可持续性计划。""" print(f"[TOOL] optimize_delivery(delivery_type={delivery_type}, kwargs={kwargs})") log_to_loki("tool.optimize_delivery", f"delivery_type={delivery_type}") return "delivery_optimization_complete" @tool def manage_disruption(disruption_type: str = None, **kwargs) -> str: """管理供应链中断、应急计划和风险缓解。""" print(f"[TOOL] manage_disruption(disruption_type={disruption_type}, kwargs={kwargs})") log_to_loki("tool.manage_disruption", f"disruption_type={disruption_type}") return "disruption_managed" TRANSPORTATION_TOOLS = [track_shipments, arrange_shipping, coordinate_operations, manage_special_handling, process_returns, optimize_delivery, manage_disruption, send_logistics_response] # 供应商与合规专家工具 @tool def evaluate_suppliers(supplier_name: str = None, **kwargs) -> str: """评估供应商绩效、进行审计并管理供应商关系。""" print(f"[TOOL] evaluate_suppliers(supplier_name={supplier_name}, kwargs={kwargs})") log_to_loki("tool.evaluate_suppliers", f"supplier_name={supplier_name}") return "supplier_evaluation_complete" @tool def handle_compliance(compliance_type: str = None, **kwargs) -> str: """管理监管合规、海关、文件和认证。""" print(f"[TOOL] handle_compliance(compliance_type={compliance_type}, kwargs={kwargs})") log_to_loki("tool.handle_compliance", f"compliance_type={compliance_type}") return "compliance_handled" SUPPLIER_TOOLS = [evaluate_suppliers, handle_compliance, send_logistics_response] Traceloop.init(disable_batch=True, app_name="supply_chain_logistics_agent") llm = ChatOpenAI(model="gpt-4o", temperature=0.0, callbacks=[StreamingStdOutCallbackHandler()], verbose=True) # 将工具绑定到专门的 LLM inventory_llm = llm.bind_tools(INVENTORY_TOOLS) transportation_llm = llm.bind_tools(TRANSPORTATION_TOOLS) supplier_llm = llm.bind_tools(SUPPLIER_TOOLS) |
通过将工具分组,我们将它们绑定到每个专家的单独语言模型实例。这允许定制提示词并减少每个智能体的上下文大小,从而提高专注度和效率。像这样的多智能体架构支持并行处理(例如,一个智能体优化交付,而另一个智能体评估供应商),从而减少大批量物流中的响应时间。共享状态确保了无缝交接。
监督者节点充当中央协调员,分析查询并路由给专家——体现了简化的决策制定,无需全员共识的开销。然后,专家节点独立处理,调用工具并响应。这种结构通过明确的角色边界减少了冲突,并在边扩展为并发调用时支持并行性:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
class AgentState(TypedDict): operation: Optional[dict] # 供应链运营信息 messages: Annotated[Sequence[BaseMessage], operator.add] # 监督者(管理者)节点:路由到适当的专家 def supervisor_node(state: AgentState): history = state["messages"] operation = state.get("operation", {}) operation_json = json.dumps(operation, ensure_ascii=False) supervisor_prompt = ( "你是一名协调供应链专家团队的监督者。\n" "团队成员:\n" "- inventory: 处理库存水平、预测、\n" "质量、仓库优化、扩展和成本。\n" "- transportation: 处理运输跟踪、\n" "安排、运营协调、\n" "特殊处理、退货、交付优化和中断。\n" "- supplier: 处理供应商评估和合规性。\n" "\n" "根据用户查询,选择一名团队成员来处理它。\n" "仅输出所选成员的名称\n" "(inventory, transportation, 或 supplier),不要输出其他内容。\n\n" f"当前运营数据: {operation_json}" ) full = [SystemMessage(content=supervisor_prompt)] + history response = llm.invoke(full) return {"messages": [response]} # 专家节点模板 def specialist_node(state: AgentState, specialist_llm, system_prompt: str): history = state["messages"] operation = state.get("operation", {}) if not operation: operation = {"operation_id": "UNKNOWN", "type": "general", "priority": "medium", "status": "active"} operation_json = json.dumps(operation, ensure_ascii=False) full_prompt = system_prompt + f"\n\nOPERATION: {operation_json}" full = [SystemMessage(content=full_prompt)] + history first: ToolMessage | BaseMessage = specialist_llm.invoke(full) messages = [first] if getattr(first, "tool_calls", None): for tc in first.tool_calls: print(first) print(tc['name']) # 查找工具(假设工具名称在所有工具中是唯一的) all_tools = INVENTORY_TOOLS + TRANSPORTATION_TOOLS + SUPPLIER_TOOLS fn = next(t for t in all_tools if t.name == tc['name']) out = fn.invoke(tc["args"]) messages.append(ToolMessage(content=str(out), tool_call_id=tc["id"])) second = specialist_llm.invoke(full + messages) messages.append(second) return {"messages": messages} # 库存专家节点 def inventory_node(state: AgentState): inventory_prompt = ( "你是一名库存和仓库管理专家。\n" "在管理时:\n" " 1) 分析库存/仓库挑战\n" " 2) 调用适当的工具\n" " 3) 跟进 send_logistics_response\n" "考虑成本、效率和可扩展性。" ) return specialist_node(state, inventory_llm, inventory_prompt) # 运输专家节点 def transportation_node(state: AgentState): transportation_prompt = ( "你是一名运输和物流专家。\n" "在管理时:\n" " 1) 分析运输/交付挑战\n" " 2) 调用适当的工具\n" " 3) 跟进 send_logistics_response\n" "考虑效率、可持续性和风险缓解。" ) return specialist_node(state, transportation_llm, transportation_prompt) # 供应商专家节点 def supplier_node(state: AgentState): supplier_prompt = ( "你是一名供应商关系和合规专家。\n" "在管理时:\n" " 1) 分析供应商/合规性问题\n" " 2) 调用适当的工具\n" " 3) 跟进 send_logistics_response\n" "考虑绩效、法规和关系。" ) return specialist_node(state, supplier_llm, supplier_prompt) |
最后,该图通过条件边(Conditional Edges)进行路由组装,实现了系统的适应性,因为监督者会根据查询内容动态选择。在执行中,这使得系统能够高效处理多样化任务,而不会出现单点过载。虽然协调增加了一些延迟,但对于复杂环境而言,可扩展性和可靠性方面的收益远大于此:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# 用于条件边的路由函数 def route_to_specialist(state: AgentState): last_message = state["messages"][-1] agent_name = last_message.content.strip().lower() if agent_name == "inventory": return "inventory" elif agent_name == "transportation": return "transportation" elif agent_name == "supplier": return "supplier" else: # 如果没有匹配则回退 return END def construct_graph(): g = StateGraph(AgentState) g.add_node("supervisor", supervisor_node) g.add_node("inventory", inventory_node) g.add_node("transportation", transportation_node) g.add_node("supplier", supplier_node) g.set_entry_point("supervisor") g.add_conditional_edges("supervisor", route_to_specialist, {"inventory": "inventory", "transportation": "transportation", "supplier": "supplier"}) g.add_edge("inventory", END) g.add_edge("transportation", END) g.add_edge("supplier", END) return g.compile() graph = construct_graph() if __name__ == "__main__": example = {"operation_id": "OP-12345", "type": "inventory_management", "priority": "high", "location": "Warehouse A"} convo = [HumanMessage(content='''We're running critically low on SKU-12345. Current stock is 50 units but we have 200 units on backorder. What's our reorder strategy?''')] result = graph.invoke({"operation": example, "messages": convo}) for m in result["messages"]: print(f"{m.type}: {m.content}") |
这个多智能体框架体现了适应性的力量。例如,如果查询涉及旺季期间的突然供应中断,监督者可以将其中断路由给运输专家进行立即控制,同时库存专家并发地扩展仓库运营。这种动态重新路由已变得司空见惯,使系统能够响应天气事件或市场变化等实时数据,从而最大限度地减少故障时间并优化资源分配。在我们的代码中,条件边增强了这种灵活性,因为监督者的输出决定了流程,使系统能够在没有严格预定义路径的情况下处理不断变化的情况。这不仅通过潜在的并行性(例如,如果扩展,可以分叉到多个专家)提高了吞吐量,而且还增强了弹性,因为一个智能体的故障(例如,由于 API 故障)不会导致整个流程停止。
适应性是另一个核心优势,因为多智能体系统可以动态响应不断变化的条件。通过协调它们的行动,智能体可以根据需要重新分配角色和职责,实时适应新信息或环境变化。这种适应性使系统能够在复杂和不可预测的场景中保持高效和有效,而静态的单智能体方法可能难以处理这种场景。
然而,多智能体系统并非没有挑战。随着多个智能体的交互,协调的复杂性增加,需要复杂的通信和同步机制,以确保智能体流畅工作。通信开销是另一个挑战,因为智能体必须频繁交换信息以保持一致并避免重复工作。这种通信需求可能会降低系统速度并引入额外的资源需求,尤其是在大规模应用中。此外,如果智能体带有重叠的目标或未能有效确定优先级,可能会发生冲突,因此需要冲突解决和资源分配的协议。
总之,虽然多智能体系统在处理复杂、多方面的任务时提供了强大的优势,但它们也需要仔细规划,以管理它们引入的额外复杂性和协调要求。通过为智能体分配不同的角色,启用并行处理,并结合适应性和冗余,多智能体系统可以实现高水平的性能、可靠性和灵活性,尤其是在单智能体方法不足的场景中。
蜂群智能体 (Swarms)
蜂群(Swarms)代表了智能体系统设计的一种独特方法,其灵感来自分布式自然系统——如鸟群、鱼群或蚁群。在基于蜂群的系统,大量简单的智能体以最小的个体智能运行,但通过局部交互和简单的规则,集体产生智能的、涌现的行为(Emergent Behavior)。
与通常依赖显式角色分配和集中协调的传统多智能体系统不同,蜂群系统强调去中心化和自组织。每个智能体遵循自己的一套局部策略或行为,通常没有系统的全局视图。然而,通过反复的局部交互——例如广播小的更新、对邻居做出反应或根据共享信号进行调整——蜂群智能体可以适应不断变化的条件,解决复杂问题,并表现出稳健的群体级行为。基于蜂群的系统的主要优势包括:
-
可扩展性
由于蜂群智能体是松散耦合和局部驱动的,系统可以扩展到数百或数千个智能体,且协调开销极小。
-
鲁棒性
没有单点故障。如果个别智能体失败,其他智能体可以继续运行,而性能不会显著下降。
-
灵活性
蜂群可以实时适应不断变化的目标或环境,使其非常适合动态或不可预测的场景。
-
分布式问题解决
探索、监控、共识形成或分布式搜索等任务可以通过蜂群动力学有效解决。
蜂群在集中控制不切实际或不可取的环境中特别有效。例如,它们在跨多个来源的大规模数据发现、研究或分布式决策中非常有用。在这些场景中,智能体可以半独立地运行,贡献小洞见或行动,并让全局行为从局部行动的积累中涌现出来。
然而,设计蜂群系统面临独特的挑战,特别是在可预测性、可观测性和效率方面。尽管有这些限制,基于蜂群的系统为可受益于去中心化、并行性和弹性的问题提供了一个强大而优雅的解决方案。虽然并非适用于每个问题领域,但蜂群在分布式环境中表现出色,并且在边缘计算、传感器网络和实时协作系统等领域越来越重要——尤其是在灵活性和鲁棒性比精度或中央控制更重要的地方。
添加智能体的原则
在通过添加更多智能体来扩展系统时,必须采取战略性方法,以确保系统保持高效、可管理和有效。以下原则可作为优化基于智能体的设计和功能的指南:
-
任务分解 (Task Decomposition)
任务分解是一个基本原则,强调将复杂任务分解为更小、可管理的子任务的重要性。通过分解任务,每个智能体可以专注于工作负载的特定方面,简化其职责并提高效率。清晰的任务边界减少了重叠和冗余,确保每个智能体的贡献都是有价值的,并且没有精力被浪费。这种分解不仅提高了单个智能体的性能,还使系统更易于协调和扩展。
-
专业化 (Specialization)
专业化使智能体能够被分配与其优势相匹配的角色,从而最大限度地提高系统的集体能力。当每个智能体负责与其特定功能一致的活动时,系统的运行将更加精确和有效。专业化的智能体更善于处理特定类型的工作,这转化为整体性能的提高和更快的任务执行。通过设计具有不同职责的智能体,系统可以利用多样化的专业知识来处理复杂或多学科的任务。
-
简约原则 (Parsimony)
简约原则是一个指导原则,鼓励仅添加实现所需功能和性能所需的最小数量的智能体。这一原则强调简单性和效率,提醒开发人员,系统中添加的每个智能体都会引入额外的通信开销、协调复杂性和资源需求。通过坚持简约原则,开发人员可以避免不必要的智能体激增,这种激增可能导致维护压力上升和潜在的性能瓶颈。简约原则要求仔细评估每个智能体的角色,并采取严格的智能体分配方法,确保每次添加都能为系统提供明确的价值。在添加智能体之前,开发人员应考虑其职责是否可以通过现有智能体或增强当前能力来履行。这种对简单性的关注产生了一个精简、更易于管理的系统,该系统有效运行而没有过多的冗余。最终,简约原则促进了一个高效、精益的多智能体系统,该系统在最大化功能的同时最小化与复杂性相关的风险和成本。
-
协调 (Coordination)
协调对于多智能体系统的和谐运行至关重要。为了保持智能体之间的一致性,必须建立稳健的通信协议,促进高效的信息共享并降低冲突风险。协调机制还应包括冲突解决协议,特别是在智能体有重叠的任务或资源需求时。在智能体能够无缝交换信息并自主解决问题时,系统就更具弹性和适应性,能够有效地应对动态场景。
-
鲁棒性 (Robustness)
鲁棒性对于增强容错能力和弹性至关重要。冗余涉及添加可以在其他智能体失败时接管的智能体,提供确保不间断运行的备份支持。在高风险环境中,冗余对于保持系统稳定性和可靠性是非常宝贵的。鲁棒性还包括设计能够承受意外中断(如网络故障或智能体宕机)的智能体和工作流。通过将冗余和鲁棒性嵌入系统,开发人员可以确保即使在不利条件下也能保持可运行。
-
效率 (Efficiency)
效率有助于评估添加智能体与随之而来的潜在复杂性或资源需求之间的权衡。每个额外的智能体都会增加计算需求和协调开销,因此权衡扩展功能的优势与这些成本至关重要。通过仔细评估每个智能体添加的成本和收益,开发人员可以做出明智的决定,平衡系统性能、资源效率和可扩展性。
通过遵循这些原则,开发人员可以确定实现所需的性能、效率和复杂性平衡所需的最佳智能体数量和配置。这种深思熟虑的方法使得创建既有能力又可持续的多智能体系统成为可能,最大化新增智能体的好处,同时最小化潜在的缺点。
多智能体协调
智能体之间的有效协调对于多智能体系统的成功至关重要。可以采用各种协调策略,每种策略都有其优点和挑战。本节探讨了几种主要的协调策略,但我们也可能会看到新方法的出现。
民主式协调 (Democratic Coordination)
在民主式协调中,系统内的每个智能体都被赋予平等的决策权,目标是就行动和解决方案达成共识。这种方法的特点是去中心化控制,没有任何单个智能体被指定为领导者。相反,智能体平等地协作和共享信息,贡献他们独特的观点以集体达成决定。民主式协调的关键优势是其鲁棒性;因为没有智能体占据主导地位,系统没有单点故障。这意味着即使一个或多个智能体发生故障,整个系统仍能继续有效运行。另一个优势是灵活性:当智能体公开协作时,他们可以通过更新集体输入来快速适应环境的变化。这种适应性在对新信息做出响应至关重要的动态环境中是必不可少的。
此外,民主式协调促进了智能体之间的公平,确保所有参与者都有平等的发言权,这可以带来更公平的结果。
然而,民主式协调也有其自身的挑战。达成共识的过程通常需要智能体之间进行广泛的通信,导致显着的通信开销。由于每个智能体必须贡献和协商他们的观点,决策过程也可能很慢,可能会在需要快速响应的环境中产生延迟。此外,实施民主协调协议通常很复杂,因为它需要定义明确的通信和冲突解决机制以促进共识建立。尽管存在这些挑战,民主式协调特别适合优先考虑公平性和鲁棒性的应用,例如分布式传感器网络或协作机器人,其中每个智能体的贡献都是有价值的,共识对于系统的成功至关重要。
管理者协调 (Manager Coordination)
管理者协调采用更加集中的方法,其中一个或多个智能体被指定为管理者,负责监督和指导下属智能体的行动。在这个模型中,管理者承担监督角色,制定决策,分配任务,并解决其指导下的智能体之间的冲突。管理者协调的主要优势之一是其简化的决策制定。因为管理者有权代表团队做出决定,系统可以更有效地运行,绕过民主系统中所需的漫长谈判过程。这种集中化还使管理者能够清晰地分配任务和职责,确保智能体专注于特定目标,而不会重复工作或引起冲突。此外,管理者协调简化了通信路径,因为下属智能体主要与指定的管理者通信,而不是与其他每个智能体通信,从而降低了协调复杂性。
然而,对管理者的依赖引入了某些脆弱性。存在单点故障,因为如果管理者智能体失败或受到损害,整个系统可能会中断。此外,随着系统的增长,可扩展性成为一个问题;如果在更大的网络中任务或交互量增加,管理者可能会成为瓶颈。最后,管理者协调中决策制定的集中性质可能会降低适应性,因为管理者可能并不总是能够根据每个所辖环境中的实时变化做出最明智的决定。这种类型的协调在结构化、分层的设置中特别有效,如制造系统或客户支持中心,其中集中控制允许优化的工作流和更快的冲突解决。
层级式协调 (Hierarchical Coordination)
层级式协调采用多层组织方法,通过结构化的层级结合了集中式和去中心化控制的要素。在这个系统中,智能体被组织成多个级别,更高级别的智能体监督和指导下级智能体,同时给予下属智能体一定程度的自主权。这种方法提供了显着的可扩展性优势,因为层级结构使协调职责能够分布在多个级别上。通过这样做,系统可以比完全集中的模型更有效地管理大量智能体。分层设计还引入了冗余,因为任务可以在不同级别进行管理,从而提高了容错能力。层级内清晰的权力线简化了操作,更高级别的智能体处理战略决策,更低级别的智能体专注于战术执行。
尽管有这些优势,层级式协调也存在自身的挑战。设计分层系统的复杂性可能很大,因为必须仔细构建每个级别以确保层与层之间的平滑协调。由于信息需要在到达所有智能体之前通过多个级别传播,可能会出现通信延迟,这可能会减慢对紧急变化的响应速度。此外,更高级别的决策可能会引入延迟,因为更低级别的智能体可能需要在采取行动之前等待指令。尽管存在这些挑战,层级式协调非常适合大型、复杂的系统,如供应链管理或军事行动,其中不同级别的协调可以处理高级规划和实地执行。
Actor-Critic 方法
智能体系统中的 Actor-Critic 模式是一种轻量级的评估驱动迭代形式。在这种设置中,Actor(执行者)负责生成候选输出——如答案、计划或行动——而 Critic(评论者/批评者)充当质量把关人,根据预定义的标准接受或拒绝输出。
过程很简单:Actor 不断产生候选,直到 Critic 确定输出满足所需的质量阈值。这可以被视为一种测试时计算(Test-time Compute)形式,其中额外的推理周期用于提高可靠性和性能。权衡增加了计算成本,但通常能带来显著更好的结果。这种方法在以下情况下特别有效:
-
有明确的评估标准或清单(例如,正确性、完整性、语气)。
-
生成额外输出的成本相对于更高质量带来的收益是可以接受的。
-
任务本质上是模糊的或生成的,单次尝试的表现通常不如重新排序或过滤的方法。
在供应链示例中,一个“Actor”智能体生成再订购计划,一个“Critic”评估其可行性(例如,成本、风险),重复该过程直到获得批准。随后的代码在监督者之后添加了一个 Actor-Critic 循环:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# Actor 节点:生成候选计划 def actor_node(state: AgentState): history = state["messages"] actor_prompt = '''生成 3 个候选供应链计划, 格式为 JSON 列表: [{'plan': 'description', 'tools': [...]}]''' response = llm.invoke([SystemMessage(content=actor_prompt)] + history) state["candidates"] = json.loads(response.content) return state # Critic 节点:评估并选择/迭代 def critic_node(state: AgentState): candidates = state["candidates"] history = state["messages"] critic_prompt = f'''以 1-10 分的量表对候选 {candidates} 的可行性、成本、风险进行评分。如果最高分大于 8 则选择最佳, 否则要求重新生成。''' response = llm.invoke([SystemMessage(content=critic_prompt)] + history) eval = json.loads(response.content) if eval['best_score'] > 8: winning_plan = eval['selected'] # 执行获胜计划的工具(类似于专家执行) messages = [] for tool_info in winning_plan['tools']: tc = {'name': tool_info['tool'], 'args': tool_info['args'], 'id': 'dummy'} fn = next(t for t in all_tools if t.name == tc['name']) out = fn.invoke(tc["args"]) messages.append(ToolMessage(content=str(out), tool_call_id=tc["id"])) # 发送响应 send_fn.invoke({"message": winning_plan['plan']}) return {"messages": history + messages} else: # 迭代:将反馈添加到历史记录以供 Actor 使用 return {"messages": history + [AIMessage(content="根据反馈进行改进并重新生成: " + eval['feedback'])]} def construct_actor_critic_graph(): g = StateGraph(AgentState) g.add_node("actor", actor_node) g.add_node("critic", critic_node) g.set_entry_point("actor") g.add_edge("actor", "critic") # 如果未获批准则循环回退(条件边) g.add_conditional_edges("critic", lambda s: "actor" if "regenerate" in s["messages"][-1].content.lower() else END) return g.compile() |
当评估比生成更容易时,Actor-Critic 设置特别有用。如果你能可靠地说“这是一个好的输出”,但不能轻易地在第一次尝试中产生它,那么一个简单的 Actor-Critic 循环就是一个强大的工具——不需要学习。作为一种易于实施的策略,当性能提升值得额外的计算成本时,通常值得一试。
智能体系统的自动化设计
智能体系统自动化设计(ADAS)代表了智能体开发的一种变革性方法,它摆脱了手工构建的架构,转向能够自我设计、评估和迭代改进的系统。正如 Shengran Hu、Cong Lu 和 Jeff Clune 在他们 2024 年的原始论文[1]中所阐述的,ADAS 的核心思想是,与其手动构建智能体的每个组件,不如启用更高级别的元智能体搜索(Meta Agent Search, MAS)算法来自动创建、评估和完善智能体系统。这种方法开辟了一个新的研究前沿——它可以使智能体适应复杂、不断变化的环境,并在没有人为直接干预的情况下不断提高自身能力。如图 8-1 所示,ADAS 建立在这样一个理念之上:历史上,机器学习(ML)中的手工设计解决方案通常被学习或自动化的替代方案所取代,这表明智能体系统也可能从这种转变中受益。
在 ADAS 中,基础模型充当智能体架构中灵活的通用模块。这些模型已经支持思维链(Chain-of-Thought)推理、自我反思和基于 Toolformer 的智能体等策略,它们构成了可以分层更专业或特定任务能力的基础。然而,ADAS 试图超越这些传统方法,使智能体能够自主发明全新的结构和模块。基础模型的多功能性提供了一个理想的起点,但 ADAS 利用自动化流程超越预定义的能力,使智能体能够演变出新颖的提示词、控制流和工具使用。这些构建块不是静态的;相反,它们是由元智能体动态生成的,元智能体可以根据不断变化的需求或改进机会不断尝试新的设计。
图 8-1. ADAS 框架的核心组件:搜索空间概述了可表示的智能体架构的范围。搜索算法规定了该空间内的探索策略。评估函数量化候选智能体针对目标(如性能、鲁棒性和效率)的有效性。来自原始论文。
ADAS 的基石是通过代码定义智能体的概念。利用图灵完备的编程语言,该框架理论上允许智能体发明任何可构想的结构或行为。这包括复杂的工作流、创造性的工具集成和人类设计师可能未预见到的创新决策过程。ADAS 的力量在于这种基于代码的方法,它将智能体视为灵活的构造,而不是静态的实体,可以随着时间的推移重新定义、修改和优化。这种方法的潜力是巨大的:原则上,元智能体可以开发出种类繁多的智能体,不断完善和组合元素,以追求在各种任务中获得更高的性能。
ADAS 的核心是 MAS 算法,这是一种具体的方法,展示了元智能体如何自主生成和完善智能体系统。在 MAS 中,元智能体充当设计师,编写代码定义新智能体,并在任务中测试这些智能体。每一个成功的设计都会被归档,形成一个不断增长的知识库,为未来智能体的创建提供信息。MAS 通过迭代循环运行:元智能体以先前智能体的存档为条件,生成高级设计描述,用代码实现它(为智能体定义“forward”函数),并通过两个自我反思步骤来完善新颖性和正确性。新智能体在验证数据上进行评估;错误会触发多达五次的调试改进。成功的智能体将与其性能指标(例如准确性或 F1 分数)一起归档,为未来的迭代提供信息。这一过程模仿了进化过程,在探索新设计与利用高性能者之间取得平衡。元智能体既是创造者又是策展人,平衡了对新设计的探索和对成功模式的利用。这个过程反映了生物系统的进化,其中成功的特征被保留并迭代修改以适应新的挑战。
为了说明 MAS 如何实施这些想法,考虑一个受开源 ADAS 启发的通用 Python 实现。该框架使用基础模型(例如 GPT-5)作为元智能体来生成和完善智能体代码。关键组件包括用于提示的基础模型智能体基类、用于迭代进化的搜索循环以及用于适应度评分的评估函数。这些元素使元智能体能够动态发明用于网格谜题(ARC [抽象和推理语料库])或多项选择推理(MMLU)等任务的智能体,归档高性能者以供将来使用:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
class LLMAgentBase: def __init__(self, output_fields: list, agent_name: str, role='helpful assistant', model='gpt-4o-2024-05-13', temperature=0.5): self.output_fields = output_fields self.agent_name = agent_name self.role = role self.model = model self.temperature = temperature self.id = random_id() # 智能体实例的唯一 ID def generate_prompt(self, input_infos, instruction, output_description): # 构建带有角色和 JSON 格式指令的系统提示词 system_prompt = f"You are a {self.role}.\n\n" + FORMAT_INST(output_description) # 从输入和指令构建用户提示词 prompt = '' # (从 infos 构建输入文本) + instruction return system_prompt, prompt def query(self, input_infos: list, instruction, output_description, iteration_idx=-1): system_prompt, prompt = self.generate_prompt(input_infos, instruction, output_description) response_json = get_json_response_from_gpt(prompt, self.model, system_prompt, self.temperature) # 处理错误,解析 JSON output_infos = [Info(key, self.__repr__(), value, iteration_idx) for key, value in response_json.items()] return output_infos |
LLMAgentBase 类构成了元智能体的核心,包装了与基础模型的交互以生成结构化响应(例如,想法、代码)。它强制执行 JSON 输出以实现可解析性,并优雅地处理错误,允许元智能体根据归档的先验查询新的智能体设计。这种模块化设计确保了灵活性:角色(例如,“乐于助人的助手”)和温度(用于创造力)可以调整,而输出描述指导特定于任务的行为,例如为 MMLU 仅返回单字母答案。
MAS 的核心是搜索函数,它在各存档版本中迭代以进化智能体。从初始档案(例如,基本的基于提示词的智能体)开始,它以过去的成功为条件,生成新代码,应用 Reflexion 进行完善,在验证数据上进行评估,并归档适应度评分的解决方案。这个循环平衡了探索(新颖设计)和利用(建立在高性能者之上),通常运行 25-30 版:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
def search(args, task): archive = task.get_init_archive() # 或加载现有的 for n in range(args.n_generation): # 从档案生成提示词 msg_list = [{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}] next_solution = get_json_response_from_gpt_reflect( msg_list, args.model) # 初始生成 # Reflexion: 两个步骤来完善 next_solution = reflect_and_refine(msg_list, task.get_reflexion_prompt()) # 用于反思的伪代码 # 评估和调试 acc_list = evaluate_forward_fn(args, next_solution["code"], task) next_solution['fitness'] = bootstrap_confidence_interval(acc_list) archive.append(next_solution) def evaluate_forward_fn(args, forward_str, task): # 动态加载智能体代码作为函数 exec(forward_str, globals(), namespace) func = namespace['forward'] # 假设单个函数 data = task.load_data(SEARCHING_MODE) # 验证集或测试集 task_queue = task.prepare_task_queue(data) # 并行评估 with ThreadPoolExecutor() as executor: acc_list = list(executor.map(process_item, task_queue)) # process_item: 运行 func, 与真实值比较得分 return acc_list |
exec)作为可调用的 forward 函数,将其并行应用于任务数据(为了效率使用多线程),并通过特定于任务的评分计算准确性。这种模块化设置通过子类化 BaseTask 抽象类,可以轻松适应新问题,该类定义了数据加载、格式化和预测解析的方法。例如,在 MMLU 中,它将字母选项(A–D)映射到索引以进行精确匹配评分,而在 ARC 中,它评估网格变换的像素级精度。此类实现证明了 ADAS 的通用性,从而产生了观察到的强大实证结果。MAS 的结果揭示了通过 ADAS 设计的智能体的一个有趣特性:即使应用在新的领域和模型上,它们也往往能保持高水平的性能。例如,在 ARC 挑战赛(网格转换谜题)中,MAS 发现的智能体优于手工设计的基线,如思维链(CoT)、Self-Refine 和 LLM-Debate。在推理基准测试中,MAS 在 DROP(阅读理解)上获得了 79.4 ± 0.8 的 F1 分数(比 Role Assignment 基线高 +13.6),在 MGSM(数学)上获得了 53.4% ± 3.5 的准确率(比 LLM-Debate 高 +14.4%),在 MMLU(多任务)上获得了 69.6% ± 3.2(比 OPRO 提示优化高 +2%),在 GPQA(科学)上获得了 34.6% ± 3.2(比 OPRO 高 +1.7%)。跨域迁移表现稳健(例如,ARC 智能体应用于 MMLU),并且在切换模型时性能保持不变(例如,从 GPT-3.5 到 GPT-4)。
这种跨领域的稳健性表明,通过 MAS 创建的智能体不仅仅是针对一次性任务进行了优化;相反,它们体现了更普遍的原则和适应性结构,使它们即使在环境细节发生变化时也能表现出色。这种跨域可转移性反映了自动化设计的一个根本优势:通过生成本质上灵活的智能体,MAS 产生的解决方案可以比那些为狭窄、专业背景设计的解决方案更有效地泛化。
ADAS 拥有巨大的前景,但其发展需要仔细考虑道德和技术层面。自动化设计更强大智能体的潜力引发了关于安全性、可靠性以及与人类价值观一致性的问题。虽然 MAS 提供了一种结构化和探索性的方法,但至关重要的是要确保不断发展的智能体遵守道德标准,并且不会发展出可能与人类意图不一致的不可预见的行为。确保这些系统是有益性要求平衡自主性和约束,给予智能体创新的自由,同时指导它们在安全和可预测的范围内运行。
ADAS 的发展轨迹表明,未来智能体系统可以自主适应、改进并以极少的人为干预处理不断扩大的任务范围。随着 ADAS 的进步,智能体开发更复杂设计的能力可能会成为 AI 研究的基石,提供可以应对日益复杂、不断变化的挑战的工具。通过这种方式,ADAS 让我们瞥见了一个具有自我完善和创新能力的智能系统的未来,体现了从静态、预先设计的智能体向适应性、自主系统的转变,这些系统随着我们不断扩大的需求而成长。
通信技术
随着智能体系统从单智能体原型成长为多智能体、分布式系统,通信架构的选择变得越来越关键。起初简单的内存消息传递或函数调用,随着系统在范围、智能体数量、地理分布或部署复杂性方面的增长,很快就会变得难以为继。本节探讨了可用于管理智能体之间的通信、协调和任务流的核心技巧和技术——尤其是当系统从单设备实验过渡到生产级分布式部署时。读者会注意到有许多有效的方法,它们在开发工作量、延迟、可扩展性、可靠性和成本方面都有不同的权衡。
本地与分布式通信
在小规模——例如单设备或单进程设置——智能体通常通过直接函数调用、共享内存或内存消息队列进行通信。虽然简单高效,但这些方法无法很好地扩展。一旦智能体分布在服务、容器或节点之间,通信就必须是显式的、异步的和容错的。
在本地部署中,像 AutoGen 这样的框架通常使用内存路由器来编排智能体消息传递和工具调用。这些设置对于研究和原型设计非常有效,尤其是对于单线程或单智能体配置。但对于生产使用,通信和状态管理必须进化。
智能体对智能体协议 (Agent-to-Agent Protocol)
由 Google 推出的智能体对智能体(A2A)协议是朝着使自主智能体能够协作实现更复杂目标迈出的雄心勃勃且充满希望的一步。它为智能体相互发现、协商协作和交换结构化请求提供了一种标准化的跨平台机制——而无需透露内部逻辑或实施细节。通过使异构智能体能够通过基于 HTTP 的传输进行互操作,A2A 创建了一种通用语言,假以时日,它可以使多智能体协调像微服务之间的 API 调用一样常规。
A2A 的核心是 Agent Card(智能体卡片),这是一个机器可读的 JSON 描述符,每个智能体发布它来表明其身份、能力(Capabilities)、端点和支持的身份验证方法。这些卡片使智能体能够找到同伴,评估其功能,并协商安全的通信渠道。能力被明确定义——例如 generateReport、summarizeLegalDocument——以及输入和输出的模式(Schemas),从而实现智能体工作流的结构化组合。端点信息和支持的身份验证方法(如 OAuth 2、API 密钥)确保可以安全地通过编程方式建立通信。版本控制和媒体支持等可选元数据进一步丰富了智能体的发现和兼容性。为了说明这一点,这里有一个简单的 Python 字典,表示一个摘要智能体的 Agent Card:
|
1 2 3 4 5 6 7 8 9 10 11 12 |
agent_card = { "identity": "SummarizerAgent", "capabilities": ["summarizeText"], "schemas": { "summarizeText": { "input": {"text": "string"}, "output": {"summary": "string"} } }, "endpoint": "http://localhost:8000/api", "auth_methods": ["none"], # 在生产环境中:OAuth2, API keys 等 "version": "1.0"} |
此 JSON 可以在大家熟知的端点(如 /.well-known/agent.json)上提供以供发现。A2A 使用基于 HTTPS 的 JSON-RPC 2.0 作为其参考实现,但该协议被设计为与传输无关。这为通过 gRPC、WebSocket 或其他流和多路复用协议进行集成打开了大门,随着基础设施需求的发展。JSON-RPC 确保了对请求、响应和错误的一致处理,即使在以不同语言或框架构建的智能体之间也能创建共享的语义模型。
在实际使用中,智能体通过存储 Agent Card 的注册表(集中式或分布式)相互定位。一旦识别出同伴,发起智能体将执行握手,交换 Agent Card 并协商会话参数,如协议版本、超时预期或有效参数限制。例如,客户端智能体可能会发现并协商兼容性,如下所示(使用 Python 的 requests 库):
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import requests import json # 发现 Agent Card(模拟为直接访问;在生产中,查询注册表) card_url = 'http://localhost:8000/.well-known/agent.json' response = requests.get(card_url) if response.status_code != 200: raise ValueError("Failed to retrieve Agent Card") agent_card = response.json() print("Discovered Agent Card:", json.dumps(agent_card, indent=2)) # 握手:检查兼容性 if agent_card['version'] != '1.0': raise ValueError("Incompatible protocol version") if "summarizeText" not in agent_card['capabilities']: raise ValueError("Required capability not supported") print("Handshake successful: Agent is compatible.") |
一旦验证通过,智能体就可以开始协调工作:智能体 A 可以向智能体 B 发出 requestSummarize 调用,智能体 B 随后处理请求并视情况返回结构化响应或错误。继续该示例,以下是客户端如何发出 JSON-RPC 请求:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
# 发出 JSON-RPC 请求 rpc_url = agent_card['endpoint'] rpc_request = { "jsonrpc": "2.0", "method": "summarizeText", "params": {"text": '''This is a long example text that needs summarization. It discusses multiagent systems and communication protocols.'''}, "id": 123 # 唯一的请求 ID } response = requests.post(rpc_url, json=rpc_request) if response.status_code == 200: rpc_response = response.json() print("RPC Response:", json.dumps(rpc_response, indent=2)) else: print("Error:", response.status_code, response.text) |
在服务器端,处理此请求可能如下所示(为简单起见使用 Python 的 http.server):
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# 服务器处理程序的摘录(在 do_POST 方法中) import os from openai import OpenAI content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) rpc_request = json.loads(post_data) # 处理 JSON-RPC 请求(A2A 的核心) if rpc_request.get('jsonrpc') == '2.0' \ and rpc_request['method'] == 'summarizeText': text = rpc_request['params']['text'] # 使用 OpenAI API 进行真正的 LLM 摘要 client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) try: llm_response = client.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": '''You are a helpful assistant that provides concise summaries.'''}, {"role": "user", "content": f"""Summarize the following text: {text}"""} ], max_tokens=150, temperature=0.7 ) summary = llm_response.choices[0].message.content.strip() except Exception as e: summary = f"Error in summarization: {str(e)}" # 错误的后备方案 response = { "jsonrpc": "2.0", "result": {"summary": summary}, "id": rpc_request['id'] } # 发送响应 self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(json.dumps(response).encode()) else: # 错误响应 error_response = { "jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": rpc_request.get('id') } self.send_response(400) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(json.dumps(error_response).encode()) |
虽然 A2A 为多智能体系统提供了一个令人兴奋的方向——提供了一种模块化、运行时无关的委托和协调方法——但它仍处于起步阶段。仍然存在重大的开放性问题,特别是在安全性方面。目前通过可插拔机制支持身份验证,但健壮的授权、速率限制、信任建立和反滥用远未解决。与任何早期协议一样,应以既热情又谨慎的态度对待它。早期采用者应预料到漏洞、实施差距和不断发展的规范。
尽管如此,A2A 指向了一个未来,在这个未来中,智能体不是孤立运行,而是作为动态的、松散耦合的生态系统的一部分,能够解决更广泛、更复杂的问题。就像 HTTP 实现了网络的组合性一样,A2A 渴望为 AI 智能体做同样的事情。现在说它是否会成为标准还为时过早——但这是在追求使智能体合作无缝、可扩展且安全的道路上一个充满希望的开端。
消息代理与事件总线 (Message Brokers and Event Buses)
随着基于智能体的系统扩展,点对点通信变得脆弱且不灵活。一种常见的替代方案是采用消息代理或事件总线,它们将发送者与接收者解耦,并使智能体能够通过共享的通信结构进行异步交互。这种模式建立了可扩展、容错和可观察的工作流,尤其是在松散耦合的多智能体架构中。
要了解这种方法的实用性,可以考虑将消息代理集成到本章前面的供应链多智能体系统中。在最初的同步设置中,监督者直接通过图的边路由到专家,造成紧密耦合。通过使用代理,监督者可以将任务发布到共享topic(例如,“supply-chain-tasks”),专家异步订阅——仅处理相关消息。这解耦了智能体,实现了独立扩展(例如,重放库存实例)、容错(例如,重放丢失的消息)以及在不重写图的情况下更轻松地添加新智能体。主要选项包括:
-
Apache Kafka
这是一个高吞吐量、分布式的事件流平台,非常适合智能体需要发布和消费结构化事件的系统。Kafka 支持强大的持久性、用于并行处理的主题分区以及用于协调的消费者组。对于构建基于日志的通信架构特别有效,其中每个交互都被保留且可重放。
-
Redis Stream 和 RabbitMQ
这些是用于较低吞吐量或更简单用例的轻量级替代方案,具有更低的延迟和更容易的部署。特别是 Redis Stream 提供快速、基于内存的通信,尽管持久性较为有限。
-
Neural Autonomic Transport System (NATS)
专为低延迟、高吞吐量通信而设计的轻量级、云原生消息传递系统。NATS 是微服务或边缘环境中实时智能体协调的理想选择。它支持发布/订阅、请求/回复,并且——通过 JetStream——支持持久消息流和重放。NATS 强调简单性、速度和可扩展性,使其非常适合需要快速、弹性通信且开销最小的分布式智能体系统。
对于供应链智能体系统,Redis Stream 提供了非常适合原型设计的快速、低延迟解耦。监督者将任务添加到流中,专家在单独的进程中读取/消费它们。假设 Redis 正在运行(例如,通过 Docker: docker run -p 6379:6379 redis)并使用 redis-py (pip install redis)。监督者确定专家并发布任务:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
import redis import json import uuid # 序列化消息的辅助函数 def serialize_messages(messages): return [m.dict() for m in messages] def supervisor_publish(operation: dict, messages): # ... (现有的监督者提示词和 LLM 逻辑以获取 agent_name) r = redis.Redis(host='localhost', port=6379) task_id = str(uuid.uuid4()) task_message = { 'task_id': task_id, 'agent': agent_name, 'operation': operation, 'messages': serialize_messages(messages) } r.xadd('supply-chain-tasks', {'data': json.dumps(task_message)}) return task_id |
专家(例如,库存)在一个循环中消费,使用其节点逻辑进行处理,并发布响应:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
import redis import json # 反序列化消息的辅助函数 def deserialize_messages(serialized): # 根据类型重新水化 (HumanMessage, AIMessage 等) return [...] # 实现同完整代码 def inventory_consumer(): r = redis.Redis(host='localhost', port=6379) last_id = '0' # ... (inventory_prompt) while True: msgs = r.xread({'supply-chain-tasks': last_id}, count=1, block=5000) if msgs: stream, entries = msgs[0] for entry_id, entry_data in entries: task = json.loads(entry_data[b'data']) if task['agent'] == 'inventory': state = { 'operation': task['operation'], 'messages': deserialize_messages(task['messages']) } result = specialist_node(state, inventory_llm, inventory_prompt) response = { 'task_id': task['task_id'], 'from': 'inventory', 'result': {'messages': serialize_messages( result['messages'])} } r.xadd('supply-chain-responses', {'data': json.dumps(response)}) last_id = entry_id |
然后,我们设置类似的消费者循环来为运输和供应商专家运行。要等待响应:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import time def wait_for_response(task_id, timeout=60): r = redis.Redis(host='localhost', port=6379) last_id = '0' start = time.time() while time.time() - start < timeout: msgs = r.xread({'supply-chain-responses': last_id}, count=1, block=5000) if msgs: stream, entries = msgs[0] for entry_id, entry_data in entries: resp = json.loads(entry_data[b'data']) if resp['task_id'] == task_id: return resp last_id = entry_id raise TimeoutError("No response") |
通常,明智的做法是在单独的进程中运行专家(例如,通过多处理)。这使得快速异步协调成为可能——例如,供应商智能体可以处理合规任务而不会阻塞其他智能体——同时保持较低规模系统的简单设置。
消息总线支持智能体之间的松散耦合,允许灵活扩展、通过日志管道实现可观测性,以及重放失败或丢失的消息。然而,这也引入了围绕最终一致性的挑战以及对更复杂错误处理的需求。
Actor 框架:Ray, Orleans, 和 Akka
虽然消息总线主要通过在组件之间异步路由事件来解耦通信——专注于数据流而不规定执行——但 Actor 框架将消息传递和计算集成到一个统一模型中。在这里,Actor(代表智能体)不仅交换消息,还封装了自己的状态和行为,确保顺序处理以消除竞态条件和传统线程系统中常见的共享状态错误。这与许多开发人员最初采用的标准单体方法形成了鲜明对比:部署一个处理所有逻辑的单容器智能体服务,通常依赖同步的基础模型调用和内存编排。虽然对于原型来说很简单,但这样的设置在规模上会成为瓶颈——容易出现单点故障,空闲期间资源利用效率低,以及在没有自定义并发黑客手段的情况下难以并行化不同的智能体角色。
Actor 框架在需要细粒度分布、弹性和动态扩展的场景中大放异彩,例如具有持久记忆的各个智能体的多智能体模拟(例如,跟踪对话历史或学到的行为)、实时竞价或 IoT 协调等高并发环境,或跨集群集成异构智能体的系统。它们实现了“位置透明”的调用——Actor 可以在不更改代码的情况下迁移或复制——以及内置的监督以从故障中自动恢复,与手动管理队列或容器相比,减少了运营开销。
当系统超过几个智能体或处理可变工作负载时,对基础设施的投资(例如,设置集群、监控 Actor 生命周期)就会得到回报:例如,在停机成本很高的生产智能体蜂群中,或者当从本地原型演变为云原生部署时。对于较小的、低流量的设置,增加的复杂性可能不值得——保持使用总线或单体服务——但随着智能体数量增长超过 10-20 或延迟要求收紧,Actor 提供了无与伦比的弹性和容错能力。该领域的三个主要框架是 Ray、Orleans 和 Akka,每个框架都根据环境和语言生态系统提供独特的优势:
-
Ray
Ray 是一个 Python 原生的分布式计算框架,支持用于有状态、可扩展计算的 Actor 模型。Ray 中的 Actor 使用
@ray.remote装饰器定义,支持异步方法调用,处理消息的同时在调用之间保留内部状态。Ray 自动管理分布,具有资源感知调度、通过可选的重启和重试实现的容错能力,以及支持集群以处理大规模部署。它与 AutoGen 或 LangGraph 等工具自然兼容用于智能体系统,在优先考虑易用性和快速原型设计而非 JVM 特定(Java 虚拟机)性能调整的 Python 环境中提供了一种轻量级的替代方案。 -
Orleans
Orleans 提供了一个虚拟 Actor 模型,其中 Actor(或智能体)是逻辑上可寻址的,并根据需求自动实例化、暂停或恢复。Orleans 处理状态持久性、并发和生命周期管理,样板代码极少。它抽象了分布式系统的许多复杂性,同时使开发人员能够自然地在集群中扩展类智能体组件。当与 AutoGen 配对时,Orleans 可以为将每个智能体视为服务的智能体系统提供动力,随着系统需求动态扩展,同时保留内部状态和身份。
-
Akka
Akka 是 JVM 生态系统中一个完善的 Actor 框架,支持 Java 和 Scala。Akka 的经典 Actor 模型性能极高,适合构建容错、分布式系统,并对 Actor 行为进行细粒度控制。通过 Akka Cluster,Actor 可以分布在多个节点上,支持分片、持久性、监督和自适应负载平衡等高级功能。Akka 特别适合需要严格控制并发的高吞吐量、低延迟应用程序,并已被用于从电信系统到交易平台的各种生产环境中。
这种 Actor 风格的设计与多智能体协调自然契合,其中每个智能体保持自己的身份、角色和内部状态。Actor 系统使这些智能体能够被动态调用,对消息或事件做出反应,并通过消息传递而非共享状态或全局控制来管理复杂的工作流。
由于本书强调基于 Python 的多智能体系统实现(例如,使用 LangChain 和相关库),我们将用集成到供应链系统中的 Ray 示例来说明 Actor 模型。类似的原则适用于 Orleans(主要基于 .NET,适合 Windows 生态系统或企业集成)和 Akka(专注于 JVM,适合高性能 Java/Scala 应用程序),但它们的代码需要特定于语言的调整,超出了我们以 Python 为中心的范围。
在供应链多智能体系统的背景下,专家智能体(例如,库存、运输)被实现为具有每个会话隔离的 Ray Actor。每个会话(由 operation_id 标识)每种专家类型都有自己的 Actor 实例,确保干净的状态管理——每个会话隔离的历史记录或缓存——同时保证该会话中任务在该 Actor 内的顺序执行。这避免了跨会话污染,并在集群中的会话之间实现了并行处理。会话管理器 Actor 负责按需跟踪和创建这些 Actor。以下是专家的核心 Ray Actor 类,它按顺序处理任务并维护隔离的会话状态:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
@ray.remote class SpecialistActor: def __init__(self, name: str, specialist_llm, tools: list, system_prompt: str): self.name = name self.llm = specialist_llm self.tools = {t.name: t for t in tools} self.prompt = system_prompt self.internal_state = {} def process_task(self, operation: dict, messages: Sequence[BaseMessage]): if not operation: operation = {"operation_id": "UNKNOWN", "type": "general", "priority": "medium", "status": "active"} operation_json = json.dumps(operation, ensure_ascii=False) full_prompt = self.prompt + f"\n\nOPERATION: {operation_json}" full = [SystemMessage(content=full_prompt)] + messages first = self.llm.invoke(full) result_messages = [first] if hasattr(first, "tool_calls"): for tc in first.tool_calls: print(first) print(tc['name']) fn = self.tools.get(tc['name']) if fn: out = fn.invoke(tc["args"]) result_messages.append(ToolMessage(content=str(out), tool_call_id=tc["id"])) second = self.llm.invoke(full + result_messages) result_messages.append(second) # 更新内部状态(示例:跟踪会话内已处理的步骤) step_key = str(len(self.internal_state) + 1) # 或使用更具体的键 self.internal_state[step_key] = {"status": "processed", "timestamp": time.time()} return {"messages": result_messages} def get_state(self): return self.internal_state # 返回整个会话状态 |
该 Actor 封装了基础模型和工具逻辑,通过 process_task 串行处理消息(任务)——Ray 对同一个 Actor 的并发调用进行排队并逐个执行,保持顺序和状态完整性。internal_state 字典是会话隔离的,因为每个 Actor 都是按会话创建的,无需共享内存风险即可实现每个会话的持久性(例如,步骤跟踪)。会话管理器 Actor 处理动态创建以实现隔离:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
@ray.remote class SessionManager: def __init__(self): self.sessions: Dict[str, Dict[str, ray.actor.ActorHandle]] = {} def get_or_create_actor(self, session_id: str, agent_name: str, llm, tools: list, prompt: str): if session_id not in self.sessions: self.sessions[session_id] = {} if agent_name not in self.sessions[session_id]: actor = SpecialistActor.remote(agent_name, llm, tools, prompt) self.sessions[session_id][agent_name] = actor return self.sessions[session_id][agent_name] def get_session_state(self, session_id: str, agent_name: str): if session_id in self.sessions and \ agent_name in self.sessions[session_id]: actor = self.sessions[session_id][agent_name] return actor.get_state.remote() # 返回 future return None |
管理器使用字典按 session_id 和 agent_name 跟踪 Actor,延迟创建。这实现了可扩展性:Ray 将 Actor 分布在集群节点上,查询状态(例如,ray.get(manager.get_session_state.remote(session_id, agent_name)))检索特定于会话的数据,而无需全局共享。
对于构建智能体系统的开发人员来说,像 Orleans 和 Akka 这样的 Actor 框架为将每个智能体表示为自主的、独立的单元提供了一个经过验证的、可扩展的基础——能够处理异步工作流、维护持久记忆并干净地集成到分布式基础设施中。
编排与工作流引擎
即使有稳健的消息传递和智能体执行模型,现实世界的系统也需要编排——对任务进行排序、处理重试、跟踪依赖关系以及管理跨智能体故障的逻辑。这对于跨越时间和组件的长时间运行或多步骤交互尤为重要。工作流编排工具提供了更高级别的抽象,确保复杂智能体系统的持久性和可恢复性。
工作流编排工具在流程涉及不可靠的外部依赖关系(例如 API、基础模型或人工批准)、潜在故障或延长时间(例如由于异步智能体行动或现实世界延迟而可能需要数天的供应链工作流)时特别有用。通过持久化状态和自动恢复,这些引擎防止数据丢失和重复工作,使其对于生产级可靠性至关重要,而简单的内存协调在这方面往往不足。当从原型扩展到弹性部署时,请使用编排工具,尤其是在金融交易、合规性繁重的操作或分布式 AI 智能体等高风险场景中;对于快速、低风险的实验,基本的脚本可能就足够了。
Temporal 提供持久的、有状态的工作流,支持长时间运行的任务、重试和故障恢复。它是管理多智能体系统的理想选择,其中每个智能体可能执行异步、多步骤的动作。Temporal 工作流提供了一个清晰的抽象,用于封装跨越多个服务或智能体并持续较长时间的业务逻辑。
为了说明 Temporal 在供应链多智能体系统中的持久执行,考虑一个对智能体步骤进行排序的工作流(例如,库存管理,然后是运输安排,接着是供应商合规)——具有自动重试和用于恢复的持久状态。Temporal 确保即使在崩溃后,工作流也能从最后一个成功的步骤恢复,使其适合生产级智能体协调。假设已设置好 Temporal (例如,通过 pip install temporalio),并且为每个专家定义了活动(封装其基础模型/工具逻辑)。这是一个简化的工作流定义:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
from datetime import timedelta from temporalio import workflow from temporalio.common import RetryPolicy # 假设活动在其他地方定义,例如 inventory_activity, # transportation_activity, supplier_activity # 每个活动接受操作字典和消息,返回结果 @workflow.defn class SupplyChainWorkflow: @workflow.run async def run(self, operation: dict, initial_messages: list) -> dict: # 步骤 1: 带重试的库存管理 inventory_result = await workflow.execute_activity( "inventory_activity", {"operation": operation, "messages": initial_messages}, start_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=3) ) # 更新状态并继续进行运输 updated_messages = initial_messages + inventory_result["messages"] transportation_result = await workflow.execute_activity( "transportation_activity", {"operation": operation, "messages": updated_messages}, start_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=3) ) # 最后步骤: 供应商合规 final_messages = updated_messages + transportation_result["messages"] supplier_result = await workflow.execute_activity( "supplier_activity", {"operation": operation, "messages": final_messages}, start_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=3) ) # 编译并返回结果 return { "inventory": inventory_result, "transportation": transportation_result, "supplier": supplier_result } |
此工作流持久地对智能体进行排序。每个活动(智能体步骤)都带有重试运行,并且 Temporal 持久化进度——例如,如果运输失败,它会重试而无需重新运行库存。对于长时间运行的流程,添加用于用户输入或暂停的信号,类似于完整示例中的确认处理。
Apache Airflow 广泛用于数据管道,但也可以通过 DAG(有向无环图)协调智能体流。虽然功能强大,但 Airflow 最适合批处理或时间触发的工作流。Airflow 仍然是数据工程和业务运营中用于定时的、工具无关的编排的主要工具,例如 ETL(提取、转换、加载)作业或 ML 模型训练。在处理受益于其成熟生态系统和可视化工具的周期性、依赖性重的管道时,选择 Airflow,但不要用于实时或高度动态的智能体交互。
对于喜欢在扩展到分布式环境之前在本地进行原型设计和运行编排的开发人员来说,像 Dagger 这样的工具特别有用,它允许使用容器、基础模型和其他资源将工作流编写为代码,并具有自动缓存和类型安全性。这确保了本地开发、CI/CD 管道和生产之间的一致性,甚至支持智能体集成,例如由基础模型启用的自动化,使其成为根据不同技术栈灵活选择。工作流引擎提供了更高层的抽象——将协调逻辑与通信机制分开。它们有助于确保幂等性、可恢复性和持久状态——当智能体失败、停滞或必须响应不断变化的环境时,这些特性变得至关重要。
管理状态与持久化
仅有通信是不够的——多智能体系统还必须管理经常跨越多次执行、工作流或系统重启的共享状态、智能体记忆和任务元数据。这在数据持久性、一致性和访问模式方面引入了显着的复杂性,特别是随着系统的扩展。
正如表 8-1 所示,传统的解决方案依赖于像 PostgreSQL、Redis 或向量存储这样的有状态数据库来持久化任务结果、交互日志和智能体记忆。这些提供了细粒度的控制,可以根据每个智能体的需求进行定制,但它们也需要开发人员显式管理模式设计、读/写一致性、缓存和恢复逻辑——增加了工程开销和出现细微错误的机会。
对于非结构化或大规模输出(例如,计划、工具跟踪、JSON blobs),像 Amazon S3 或 Azure Blob Storage 这样的对象存储选项提供了具有高可用性的持久、低成本存储。这对于不可变工件(artifact)非常理想,但在访问延迟和需要单独的索引或跟踪系统以将工件与智能体任务或状态关联方面存在权衡。
表 8-1. 持久化存储选项概览
| 方法 | 优点 | 缺点 | 适用场景 |
| 关系型数据库 (如 PostgreSQL/Redis) | 灵活、可查询、成本效益高 | 手动管理、潜在的不一致性 | 自定义、高查询系统 |
| 向量存储 (如 Pinecone) | 语义搜索、可扩展嵌入 | 成本较高、专门设置 | 知识密集型智能体 |
| 对象存储 (如 S3) | 便宜、适合大数据持久化 | 访问慢、无原生索引 | 归档输出 |
| 有状态编排框架 | 自动恢复、低样板代码 | 框架锁定 | 弹性、长运行工作流 |
像 Temporal 和 Orleans 这样的框架提供了一种不同的方法:它们通过将状态管理紧密集成到智能体或工作流生命周期中,抽象了大部分持久性复杂性。Temporal 自动设置工作流进度检查点,支持确定性重放,并透明地处理故障。Orleans 使每个 Actor(智能体)能够以最少的样板代码维护持久的、事件驱动的状态。这些抽象减少了开发工作量并提高了弹性,但它们也施加了特定于框架的约束——例如序列化格式、执行模型或语言绑定——这可能并不适合每个架构。
正确的选择取决于所需的内存和协调的性质:
-
情景记忆(Episodic memory)(短寿命、特定于任务的状态)可能只需要内存或瞬态存储,持久性要求最低。
-
语义记忆(Semantic memory)(跨交互的长期知识)通常需要具有搜索或向量索引功能的持久存储。
-
工作流持久性(Workflow durability)(对中途故障的弹性)最能从 Temporal 或 Orleans 等集成引擎中受益,这些引擎会自动检查点进度和状态。
最终,持久性决策反映了开发人员工作量、性能、持久性和灵活性之间的权衡。具有严格服务级别协议(SLA)、跨智能体依赖关系或实时协调要求的系统通常会从工作流原生持久层中受益,而更多模块化或以研究为导向的系统可能更喜欢提供更多控制和可见性的显式、数据库驱动的状态管理。
结论
从单智能体向多智能体系统的过渡为解决复杂任务、增强适应性和提高效率提供了显著优势。然而,正如我们在本章中所探讨的那样,添加更多智能体带来的可扩展性也带来了需要仔细规划的挑战。决定智能体的最佳数量需要对任务复杂性、潜在的任务分解以及多智能体协作的成本效益平衡有细致的理解。
协调对于多智能体系统的成功至关重要,各种协调策略——如民主式、基于管理者的、层级式的、Actor-Critic 方法以及使用 ADAS 的自动化设计——在鲁棒性、效率和复杂性之间提供了不同的权衡。每种协调策略都提供了独特的优势和局限性,适用于特定场景,认真选择可以显著提高系统的有效性和可靠性。
同样关键的是通信基础设施的选择。随着系统的扩展,对智能体之间可靠、低延迟和持久消息传递的需求也在增加。虽然内存队列在简单的设置中可能就足够了,但生产级系统通常依赖消息代理(如 Kafka、NATS、RabbitMQ)、Actor 框架(如 Orleans、Akka)和工作流引擎(如 Temporal、Conductor)来管理通信以及状态、重试和执行持久性。设计有效的通信不仅仅是一个实现细节——它是塑造智能体如何在其环境中感知、响应和协作的首要关注点。为了辅助开发人员查看这些选项,表 8-2 总结了多智能体系统的关键通信和执行方法,比较了它们的概念、权衡以及在我们的供应链示例背景下的理想用例。
表 8-2. 智能体协调技术
| 方法 | 关键概念 | 优点 | 挑战 | 用例和示例 |
| 单容器部署 | 单容器中的单体智能体/服务;同步调用,内存状态/编排 | 设置简单,低延迟,易于原型设计 | 单点故障,可扩展性差,并发问题 | 原型中的基本供应链查询;具有有限智能体/工具的快速实验(例如,处理客户支持查询的单个智能体) |
| A2A 协议 | 通过 Agent Card 进行标准化发现、协商,用于结构化请求的 JSON-RPC;传输无关 (HTTP/gRPC) | 跨异构智能体互操作,模块化,安全通道 | 早期阶段(安全漏洞,不断发展的规范),发现开销 | 动态生态系统中的智能体协作(例如,供应链分析中的一个智能体请求另一个智能体进行摘要) |
| 消息代理 | 通过发布/订阅进行解耦异步消息传递(Kafka 用于持久性,Redis Stream 用于低延迟,NATS 用于实时) | 松散耦合,可扩展性,容错重放 | 最终一致性,复杂的错误处理,潜在延迟 | 供应链中的分布式任务路由(例如,监督者发布到流,专家订阅/处理/响应) |
| Actor 框架 | 有状态 Actor 按顺序处理消息(Ray 用于 Python/分布式,Orleans 用于虚拟 Actor,Akka 用于 JVM/性能) | 集成状态/行为,弹性(自动恢复),位置透明扩展 | 基础设施投资,框架锁定,每个 Actor 的顺序限制 | 供应链中的每会话隔离智能体(例如,为库存任务中的特定操作状态动态创建 Actor) |
通过理解这些因素并深思熟虑地进行应用,开发人员可以创建不仅稳健且有能力,而且准备好满足现实世界应用中日益复杂、动态任务需求的多智能体系统。这种战略方法使多智能体系统能够发展成为强大的解决方案,推动各个领域的有意义的进步。
-
Shengran Hu et al., “Automated Design of Agentic Systems”, paper presented at the International Conference on Learning Representations, Singapore, April 2025.
翻译整理自Building Applications with AI Agents一书,仅供学习交流使用





