首页手机【Workflow】基于Ernie&A2A协议Workflow篇

【Workflow】基于Ernie&A2A协议Workflow篇

圆圆2025-07-11 17:00:56次浏览条评论
1. 引言:AI Agent 的新时代

欢迎来到ai agent的新时代!随着大型语言模型(llms)如ernie的飞速发展,我们不再仅仅满足于ai的问答能力,而是期望它们能像智能助手一样,主动理解任务、拆解问题、调用工具、并与其他ai协作,最终完成复杂的目标。这就是 ai agent 的核心思想。

1.1 什么是 AI Agent?

简单来说,AI Agent 是一个能够感知其环境、进行决策并采取行动以达成特定目标的智能实体。在当前大模型的背景下,AI Agent 通常具备以下特征:

自主性 (Autonomy) :能够在没有人为干预的情况下独立运作。反应性 (Reactivity) :能够感知环境变化并作出响应。主动性 (Pro-activeness) :能够主动发起行为以达成目标,而不仅仅是被动响应。社交性 (Social ability) :能够通过某种通信语言与其他Agent(包括人类或其他AI Agent)进行交互。1.2 多 Agent 协作的意义和挑战

现实世界中的复杂问题往往需要多个不同角色、不同能力的个体协同合作才能解决。AI领域也是如此。一个全能的、无所不知的单一Agent模型往往难以构建且维护成本高昂。

多Agent协作的意义:

模块化与专业化 :可以将复杂任务分解给具有特定专长的Agent处理,例如一个Agent负责数据分析,另一个负责文本生成,第三个负责用户交互。可扩展性与灵活性 :更容易根据需求增加、移除或替换某个Agent,而不需要改动整个系统。鲁棒性 :即使某个Agent出现故障,其他Agent仍可能继续工作或接管任务。效率提升 :多个Agent可以并行处理任务的不同部分。

挑战:

通信标准 :Agent之间如何理解对方的意图和信息?任务分配与协调 :谁来决定哪个Agent执行哪个任务?如何协调它们的行为?知识共享与一致性 :如何确保Agent之间的信息同步和知识一致?发现机制 :Agent如何找到其他可以协作的Agent?1.3 A2A 协议简介:为 Agent 互操作性而生

为了解决上述挑战中的通信和发现问题,Google提出了 Agent-to-Agent (A2A) 协议。A2A协议旨在为AI Agent之间的交互建立一套标准的通信格式和规范。它使得由不同开发者、不同技术栈构建的Agent能够"说同一种语言",从而实现互操作和协作。

核心理念包括:

Agent Card (代理名片) :描述一个Agent的身份、能力、如何联系它等元数据。Message (消息) :Agent之间交换信息的基本单元。Task (任务) :描述一个需要Agent完成的具体工作,包含状态追踪。

python-a2a 是A2A协议的一个官方Python实现,它极大地简化了遵循A2A协议的Agent的开发。

1.4 Ernie 大模型简介:强大的中文理解与生成能力

Ernie 是由百度研发的知识增强大语言模型。在中文的自然语言理解、生成、对话等方面表现出色。在本教程中,我们将探讨如何将Ernie的智能融入到A2A Agent中,使其具备更强的任务处理和交互能力。

我们将使用如下方式与Ernie API进行交互(请确保你已获得有效的API Key和正确的Base URL):

import osfrom openai import OpenAIclient = OpenAI(     api_key="Your-sdk",  # 替换为你的 AI Studio 访问令牌     base_url="https://aistudio.baidu.com/llm/lmapi/v3",  # AI Studio 大模型 API 服务域名)# 示例:简单对话# chat_completion = client.chat.completions.create(#     messages=[#         {'role': 'system', 'content': '你是 AI Studio 实训AI开发平台的开发者助理,你精通开发相关的知识,负责给开发者提供搜索帮助建议。'},#         {'role': 'user', 'content': '你好,请介绍一下AI Studio'}#     ],#     model="ernie-3.5-8k", # 或者 ernie-4.0-8k 等其他适用模型# )# print(chat_completion.choices[0].message.content)
登录后复制

注意:上述代码中的API Key是一个示例,请替换为您自己的有效密钥。在后续教程中,我们会将实际的API调用注释掉或用模拟数据替代,以确保教程的通用性,但会指明在何处可以集成真实的LLM调用。

1.5 本教程目标

本教程旨在引导您:

理解A2A协议和MCP(Model Context Protocol)的核心概念。学习如何使用 python-a2a 库构建符合A2A协议的Agent。通过一个"本地智能旅行助手"的实例,实践如何将多个Agent组织成一个工作流来解决问题。探讨如何集成Ernie大模型来增强Agent的智能。强调A2A和MCP如何协同工作,发挥更强大的能力。2. 核心概念解析

在深入实践之前,让我们先打下坚实的理论基础。理解A2A协议和MCP是构建高效协作Agent系统的关键。

2.1 A2A (Agent-to-Agent) 协议详解

A2A协议为AI Agent之间的异步通信和协作提供了一个标准化的框架。它的设计目标是简单、可扩展且独立于特定的AI技术。

核心组件:

Agent Card (代理名片):

定义:一个JSON对象,包含了Agent的元数据信息,如同一个Agent的"身份证"和"能力说明书"。主要字段:name: Agent的名称,例如 "Weather Agent"。description: Agent功能的简短描述。url: Agent的API端点地址,其他Agent通过这个地址与之通信。version: Agent的版本号。skills: 一个列表,描述Agent拥有的技能(AgentSkill),每个技能通常有名称、描述和标签。capabilities: (可选) 更细致地描述Agent的能力,例如是否兼容某个特定协议 (google_a2a_compatible: true)。作用:使得Agent可以被发现、理解其能力并与之建立连接。

Message (消息):

定义:Agent之间交换信息的基本载体,通常用于非任务驱动的简单查询或通知。主要字段:message_id: 消息的唯一标识符。conversation_id: (可选) 用于关联同一对话中的多条消息。parent_message_id: (可选) 回复消息时,指明父消息的ID。role: 发送者的角色,通常是 MessageRole.USER 或 MessageRole.AGENT。content: 消息内容,可以是文本 (TextContent)、图片或其他类型的数据。timestamp: 消息发送的时间。使用场景:简单的问答、状态更新等。

Task (任务):

定义:当一个Agent需要另一个Agent执行一个具有明确目标、可能需要持续一段时间或有中间状态的工作时,会创建一个Task。主要字段:id: 任务的唯一标识符。message: (可选) 触发或描述该任务的初始消息。status: 任务的当前状态 (TaskStatus),包含:state: 任务状态,例如 TaskState.PENDING, TaskState.IN_PROGRESS, TaskState.COMPLETED, TaskState.FAILED, TaskState.INPUT_REQUIRED。message: (可选) 描述当前状态的附加消息。artifacts: (可选) 任务执行过程中产生的结果或文件。created_at, updated_at: 时间戳。使用场景:需要明确追踪进度的复杂请求,例如"为我规划去北京的三天行程并预订机票酒店"。

通信流程(简化版):

发现 (Discovery):Agent A 通过某种机制(如Agent注册中心,或直接知道URL)找到 Agent B 的 Agent Card。请求 (Request):Agent A 向 Agent B 的 /tasks/send (推荐) 或根路径 / (兼容旧版,发送Message) 端点发送一个 Task 或 Message。处理 (Processing):Agent B 接收到请求,根据其内部逻辑处理该 Task 或 Message。响应 (Response):对于Task,Agent B 会更新Task的状态和结果,Agent A 可以通过轮询或订阅来获取更新。对于Message,Agent B 通常会直接返回一个响应Message。

python-a2a 库提供了这些组件的Python类实现(如 AgentCard, Message, Task, A2AServer, A2AClient),使得开发者可以方便地创建和管理这些交互。

2.2 MCP (Model Context Protocol) 简介

虽然A2A协议定义了Agent之间如何"对话",但它本身并不关心Agent内部如何完成任务,特别是当Agent需要使用外部工具或数据源(如API、数据库、代码执行器等)时。

Model Context Protocol (MCP) 应运而生,它是一种开放标准,旨在让语言模型(或基于语言模型的Agent)能够以标准化的方式访问外部工具和数据源,从而获取"上下文"知识并执行动作。

MCP 的作用:

工具化LLMs:使LLMs不再仅仅是文本生成器,而是能通过调用工具来与外部世界交互,执行更广泛的任务。标准化工具接口:为工具的定义、发现和调用提供了一套统一的规范,使得不同的LLM和工具可以更容易地集成。

核心概念:

MCP Server (Tool Provider):一个实现了MCP规范的服务,它暴露一个或多个"工具"给MCP Client调用。MCP Client (Tool User):通常是一个LLM或其代理,它能够发现并调用MCP Server提供的工具。Tool Definition:描述一个工具的功能、输入参数、输出格式等。2.3 A2A 与 MCP 的区别与协作:强强联合

理解A2A和MCP的区别与联系至关重要,它们共同构成了构建高级AI Agent系统的基石。

区别:

特性A2A (Agent-to-Agent) ProtocolMCP (Model Context Protocol)主要目标定义 Agent之间 的通信和协作标准。定义 Agent/LLM与外部工具/数据源之间 的交互标准。关注点Agent的发现、对话管理、任务流转、Agent间互操作性。工具的定义、发现、调用、参数传递、结果返回,增强Agent的执行能力。交互对象通常是其他自主的AI Agent。通常是API、数据库、代码解释器、或其他形式的"工具服务"。协议层级更侧重于Agent间的"社交"和"协作"框架。更侧重于Agent执行具体动作时的"能力扩展"机制。

协作与协同:

A2A和MCP并非互斥,而是可以完美协同工作的。

A2A Agent 使用 MCP 工具: 一个遵循A2A协议的Agent(我们称之为"主Agent")在执行其接收到的A2A任务时,可能会发现自己需要某些特定能力(如获取实时股票价格、翻译文本、执行一段Python代码)。这时,这个"主Agent"可以扮演MCP Client的角色,去调用一个或多个实现了MCP协议的"工具Agent"或"工具服务"。

示例:一个"旅行规划A2A Agent"接收到用户的旅行规划任务。它需要查询实时天气,于是它通过MCP调用了一个"天气API工具服务";它还需要预订酒店,于是通过MCP调用了一个"酒店预订API工具服务"。

MCP 工具本身也可以是 A2A Agent: 一个提供MCP工具的服务,其内部逻辑也可能非常复杂,甚至它本身就是一个小型的A2A Agent网络。但从外部调用者的角度看,它只是一个MCP Tool Provider。

增强Agent的自主性和能力边界:

A2A使得Agent可以相互委托任务,形成复杂的协作网络。MCP则赋予了每个Agent连接和使用无限外部工具的能力。两者结合,可以构建出既能自主协作、又能调用广泛工具的强大AI Agent系统。Agent可以专注于自身的核心逻辑,而将通用能力(如搜索、计算、知识查询)交给专门的MCP工具。

一个形象的比喻:

A2A协议 就像一个公司的组织架构和部门间的沟通流程。它定义了"销售部Agent"如何与"技术部Agent"沟通,如何传递"客户需求Task"。MCP协议 就像每个部门员工可以使用的工具箱。"技术部Agent"在处理"客户需求Task"时,可能会用到MCP工具箱里的"数据库查询工具"、"代码编译工具"或"Ernie API调用工具"。

通过这种方式,A2A负责宏观的Agent协作流程,而MCP则在微观层面增强了每个Agent的实际执行能力。Ernie大模型既可以直接作为A2A Agent的核心智能,也可以被封装成一个MCP工具供其他Agent调用。

2.4 python-a2a 库概览

python-a2a 库为开发者提供了在Python中轻松实现A2A和MCP兼容Agent所需的一切。

主要组件与功能:

A2AServer: 用于创建A2A Agent服务的基础类。你需要继承它并实现 handle_task (处理任务) 和/或 handle_message (处理消息) 方法。AgentCard, AgentSkill: 用于定义你的Agent的"名片"和能力。A2AClient: 用于与其他A2A Agent进行通信的客户端。AgentNetwork: 用于管理和发现多个Agent的实用工具,可以将多个Agent的URL注册到一个网络中,方便统一调用。Flow: 强大的工作流引擎,允许你用链式API定义多步骤、条件分支、并行执行的Agent交互流程。例如:flow.ask("agent1", "问题1").if_contains("条件").ask("agent2", "问题2").else_branch().ask("agent3", "问题3").end_if()MCP支持 (python_a2a.mcp): 提供了 FastMCP (用于快速创建MCP工具服务) 和相关工具函数,可以将普通Python函数轻松封装成MCP工具。LangChain集成 (python_a2a.langchain): 允许在A2A Agent和LangChain框架之间无缝转换工具和Agent,例如将LangChain的Tool转换为MCP Server,或将MCP Tool转换为LangChain Tool。LLM客户端集成: 内置了对OpenAI, Anthropic等LLM提供商的A2A客户端封装,方便将这些LLM直接作为A2A Agent使用。

在接下来的实战演练中,我们将大量使用这些组件。

3. 实战演练:构建一个本地智能旅行助手

理论学习完毕,让我们动手实践!我们将构建一个简单的"本地智能旅行助手"。用户输入一个城市名称,AI助手将查询该城市的天气,并根据天气情况推荐合适的室内或室外活动。这个例子将主要借鉴 官方的basic_workflow.py 的思路,并融入Ernie的思考。

3.1 场景设定与目标用户输入:一个城市名称 (例如: "北京")。Agent 1 (WeatherAgent):接收城市名称,返回该城市当前的天气状况(为简化,本教程使用模拟数据,但会指出如何接入真实API或Ernie)。Agent 2 (ActivityAgent):接收城市名称和天气状况,返回推荐的活动列表(同样,模拟数据为主,可扩展为Ernie生成)。工作流 (Flow):协调WeatherAgent和ActivityAgent,完成从获取天气到推荐活动的整个流程。目标:通过Jupyter Notebook展示如何定义、运行这些Agent和工作流,并得到最终的活动推荐。3.2 步骤 1:准备工作 - Ernie API 对接与环境配置

首先,确保你已经安装了 python-a2a 库。如果还没有,可以通过pip安装(建议安装all extras以便体验全部功能):

在你的终端或Jupyter Notebook的代码单元格中运行 (前面加上 !)

pip install "python-a2a[all]"

接下来,是Ernie API的对接代码。我们再次列出,并假设它保存在一个名为 ernie_client.py 的文件中,或者在Notebook的初始化代码块中定义。

# ernie_client.py (或Notebook初始化块)import osfrom openai import OpenAI# AI Studio Ernie API 配置# 请确保使用你自己的有效API Key和正确的Base URLASTUDIO_API_KEY = os.getenv("AISTUDIO_API_KEY", "") # 从环境变量读取或直接填写ASTUDIO_BASE_URL = os.getenv("AISTUDIO_BASE_URL", "https://aistudio.baidu.com/llm/lmapi/v3")client = OpenAI(    api_key=ASTUDIO_API_KEY,    base_url=ASTUDIO_BASE_URL,)def get_ernie_response(user_prompt, system_prompt="你是一个乐于助人的AI助手。", model="ernie-3.5-8k"):    """调用Ernie API获取回复"""    try:        chat_completion = client.chat.completions.create(            messages=[                {'role': 'system', 'content': system_prompt},                {'role': 'user', 'content': user_prompt}            ],            model=model,        )        return chat_completion.choices[0].message.content    except Exception as e:        print(f"调用Ernie API失败: {e}")        return f"无法获取Ernie的回应,错误: {e}" # 返回错误信息,而不是None# 测试一下 (可选)# if __name__ == "__main__":#     response = get_ernie_response("你好,请介绍一下北京的天气如何?")#     print(response)
登录后复制

重要提示:

为了教程的通用性和避免不必要的API调用开销,在后续Agent的实现中,我们将主要使用 模拟数据。但我们会清晰地标出哪些地方可以用 get_ernie_response 函数来替代或增强,从而让Agent变得更"智能"。实际项目中,API Key不应硬编码在代码中,建议使用环境变量或配置文件管理。3.3 步骤 2:创建基础 Agent

我们将创建两个Agent:WeatherAgent 和 ActivityAgent。每个Agent都会在一个独立的Python脚本中定义,并通过Flask启动为HTTP服务。这是A2A Agent的标准运行方式,使得它们可以被网络中的其他Agent或工作流发现和调用。

3.3.1 WeatherAgent

WeatherAgent 负责提供天气信息。创建一个名为 weather_agent_server.py 的文件。

# weather_agent_server.pyimport timefrom flask import Flask, request, jsonifyfrom python_a2a import A2AServer, AgentCard, AgentSkill, Task, TaskStatus, TaskState, Message, TextContent, MessageRoleclass WeatherAgent(A2AServer):    def __init__(self, port=5001):        self.port = port        agent_card = AgentCard(            name="Weather Agent",            description="提供指定城市的天气信息",            url=f"http://localhost:{self.port}",            version="1.0.0",            skills=[                AgentSkill(                    name="GetCityWeather",                    description="获取一个城市的当前天气",                    tags=["weather", "location", "forecast"]                )            ]        )        super().__init__(agent_card=agent_card)    def _get_simulated_weather(self, city):        print(f"[WeatherAgent] 正在为城市 '{city}' 查询模拟天气...")        time.sleep(1) # 模拟网络延迟        weather_conditions = {            "北京": "晴朗,25°C,微风。",            "上海": "多云转小雨,22°C,东南风3级。",            "广州": "雷阵雨,28°C,注意防范。",            "深圳": "晴间多云,29°C,空气质量良。",            "伦敦": "持续小雨,15°C,请带好雨具。",            "巴黎": "阳光明媚,23°C,适合户外活动。"        }        return weather_conditions.get(city, f"抱歉,暂时无法获取'{city}'的天气信息。也许可以问问Ernie?")    # 如何集成Ernie (示例性注释,实际使用时需要取消注释并确保 ernie_client.py 可导入)    # from ernie_client import get_ernie_response    # def _get_weather_from_ernie(self, city):    #     print(f"[WeatherAgent] 正在通过Ernie查询城市 '{city}' 的天气...\")    #     prompt = f"请告诉我{city}今天的天气怎么样?请简要描述。\"    #     weather_info = get_ernie_response(prompt, system_prompt="你是一个天气预报助手。")    #     return weather_info    def handle_task(self, task: Task) -> Task:        query_text = ""        if task.message and task.message.content and hasattr(task.message.content, 'text'):            query_text = task.message.content.text                city = query_text # 假设查询文本直接是城市名                # **核心逻辑:获取天气**        # 默认使用模拟数据        weather_data = self._get_simulated_weather(city)        # # 如果想使用Ernie获取天气 (取消下面一行的注释,并注释掉上面一行):        # weather_data = self._get_weather_from_ernie(city)        print(f"[WeatherAgent] 为 '{city}' 获取到的天气: {weather_data}")        task.artifacts = [{            "parts": [{"type": "text", "text": weather_data}]        }]        task.status = TaskStatus(state=TaskState.COMPLETED)        return task# ---- Flask App Setup ----def create_app(agent_instance):    app = Flask(__name__)    @app.route('/agent.json', methods=['GET'])    def get_agent_card():        return jsonify(agent_instance.agent_card.to_dict())    @app.route('/a2a/agent.json', methods=['GET'])    def get_a2a_agent_card(): # A2A标准兼容端点        return jsonify(agent_instance.agent_card.to_dict())    @app.route('/tasks/send', methods=['POST'])    def handle_send_task():        try:            task_data = request.json            # 兼容JSON-RPC风格的请求            if "jsonrpc" in task_data and "method" in task_data and task_data["method"] == "tasks/send":                task_dict = task_data.get("params", {})            else:                task_dict = task_data                        task = Task.from_dict(task_dict)            updated_task = agent_instance.handle_task(task)            response_data = updated_task.to_dict()            if "jsonrpc" in task_data:                 return jsonify({"jsonrpc": "2.0", "id": task_data.get("id"), "result": response_data})            return jsonify(response_data)        except Exception as e:            error_response = {"code": -32603, "message": str(e)}            if "jsonrpc" in request.json:                return jsonify({"jsonrpc": "2.0", "id": request.json.get("id"), "error": error_response}), 500            return jsonify({"error": error_response}), 500                # 兼容旧版Message端点 (可选,推荐使用Task)    @app.route('/', methods=['POST'])    def handle_root_message():        try:            data = request.json            # 尝试将整个请求体作为Task处理            if isinstance(data, dict) and "id" in data and "status" in data: # 看起来像Task                 return handle_send_task()            # 否则,尝试作为Message处理 (简化版)            text_content = data.get("content", {}).get("text", "") if isinstance(data.get("content"), dict) else str(data)            message = Message(content=TextContent(text=text_content), role=MessageRole.USER)            # 模拟任务转换            task = Task(id=f"task-{time.time_ns()}", message=message, status=TaskStatus(state=TaskState.PENDING))            updated_task = agent_instance.handle_task(task)            # 从Task结果中提取文本给Message响应            response_text = "处理完成。"            if updated_task.artifacts and updated_task.artifacts[0].get("parts"):                response_text = updated_task.artifacts[0]["parts"][0].get("text", response_text)                        response_message = Message(                content=TextContent(text=response_text),                role=MessageRole.AGENT,                message_id=f"response-{time.time_ns()}",                parent_message_id=data.get("message_id")            )            return jsonify(response_message.to_dict())        except Exception as e:            return jsonify({"error": str(e)}), 400                return appif __name__ == "__main__":    PORT = 5001    weather_agent = WeatherAgent(port=PORT)    app = create_app(weather_agent)    print(f"WeatherAgent 正在启动,监听端口: http://localhost:{PORT}")    app.run(host='0.0.0.0', port=PORT, debug=False)
登录后复制

终端运行 WeatherAgent:

打开一个新的终端窗口,进入该文件所在目录,然后运行:

python weather_agent_server.py
登录后复制

你应该能看到类似 "WeatherAgent 正在启动,监听端口: http://localhost:5001" 的输出。这个Agent现在就在后台运行了,等待请求。

3.3.2 ActivityAgent

ActivityAgent 负责根据天气推荐活动。创建一个名为 activity_agent_server.py 的文件。

# activity_agent_server.pyimport timeimport re # 用于从查询中提取天气信息from flask import Flask, request, jsonifyfrom python_a2a import A2AServer, AgentCard, AgentSkill, Task, TaskStatus, TaskState, Message, TextContent, MessageRoleclass ActivityAgent(A2AServer):    def __init__(self, port=5002):        self.port = port        agent_card = AgentCard(            name="Activity Agent",            description="根据城市和天气推荐活动",            url=f"http://localhost:{self.port}",            version="1.0.0",            skills=[                AgentSkill(                    name="GetActivityRecommendations",                    description="获取基于天气和地点的活动推荐",                    tags=["activity", "recommendation", "travel", "leisure"]                )            ]        )        super().__init__(agent_card=agent_card)    def _get_simulated_activities(self, city, weather_condition):        print(f"[ActivityAgent] 正在为 '{city}' (天气: {weather_condition}) 生成模拟活动推荐...")        time.sleep(1.5) # 模拟处理时间        is_rainy_or_bad_weather = any(kw in weather_condition.lower() for kw in ["雨", "雷", "雪", "糟", "霾"])                recommendations = {            "北京": {                "indoor": "故宫博物院深度游、国家博物馆看展、798艺术区感受艺术氛围。",                "outdoor": "颐和园漫步、长城徒步(天气好时)、什刹海胡同骑行。"            },            "上海": {                "indoor": "上海博物馆探索历史、中华艺术宫赏析艺术、体验密室逃脱。",                "outdoor": "外滩漫步欣赏万 国建筑群、南京路步行街购物、豫园游览。"            },            "广州": {                "indoor": "广东省博物馆、广州塔室内观光层、正佳广场极地海洋世界。",                "outdoor": "白云山登高望远、越秀公园五羊石像、珠江夜游。"            },            "深圳": {                "indoor": "深圳博物馆、当代艺术与城市规划馆、万象城购物体验。",                "outdoor": "世界之窗(天气好时)、莲花山公园放风筝、深圳湾公园骑行。"            },            "伦敦": {                "indoor": "大英博物馆、国家美术馆、自然历史博物馆。",                "outdoor": "海德公园野餐、伦敦眼俯瞰城市、泰晤士河畔散步。"            },            "巴黎": {                "indoor": "卢浮宫、奥赛博物馆、蓬皮杜艺术中心。",                "outdoor": "埃菲尔铁塔下草坪休闲、塞纳河畔漫步、蒙马特高地探索。"            }        }                city_activities = recommendations.get(city, {            "indoor": f"在{city}可以逛逛当地的博物馆或购物中心。",            "outdoor": f"在{city}可以探索当地的公园或著名地标。"        })                if is_rainy_or_bad_weather:            return f"由于{city}的天气是'{weather_condition}',推荐室内活动:{city_activities['indoor']}"        else:            return f"{city}天气'{weather_condition}',很棒!推荐户外活动:{city_activities['outdoor']}"    # 如何集成Ernie (示例性注释)    # from ernie_client import get_ernie_response    # def _get_activities_from_ernie(self, city, weather_condition):    #     print(f"[ActivityAgent] 正在通过Ernie为 '{city}' (天气: {weather_condition}) 生成活动推荐...\")    #     prompt = f"我现在在{city},天气是{weather_condition}。请为我推荐一些合适的活动,户外和室内都可以考虑,并简要说明理由。\"    #     activities = get_ernie_response(prompt, system_prompt="你是一个旅行活动推荐助手。")    #     return activities    def _extract_info_from_query(self, query_text):        # 这是一个非常简化的提取逻辑,实际应用中可能需要更复杂的NLP        # 假设查询格式是 "为[城市]在[天气状况]下推荐活动"        # 或者直接从上下文中获取城市和天气        city = "未知城市"        weather = "未知天气"        # 尝试从文本中提取城市,例如北京、上海等        known_cities = ["北京", "上海", "广州", "深圳", "伦敦", "巴黎"]        for c in known_cities:            if c in query_text:                city = c                break                # 尝试提取天气信息,例如包含"晴朗"、"雨"等关键词        # 这里我们假设天气信息在查询文本的后半部分,或者作为独立参数传入        # 在我们的工作流中,天气信息会由WeatherAgent提供,并作为输入的一部分        # 所以这里的提取逻辑更多是针对直接调用此Agent的情况        weather_keywords = {            "晴朗": ["晴", "太阳", "sunny"],            "多云": ["多云", "cloudy"],            "阴": ["阴天", "overcast"],            "雨": ["雨", "rainy", "雷阵雨"],            "雪": ["雪", "snowy"]        }        for condition, kws in weather_keywords.items():            if any(kw in query_text for kw in kws):                weather = condition                break        # 如果没有通过关键词匹配到,则尝试取查询文本中关于天气的描述        # 比如,如果query_text是 "北京的天气是晴朗,25°C,微风。" 中的后半段        # 这里的逻辑需要根据实际的输入格式来完善        if weather == "未知天气": #如果上面没匹配到,尝试更通用的提取            match = re.search(r"天气是([^,。]+)|状况是([^,。]+)", query_text)            if match:                weather = match.group(1) or match.group(2) or "宜人"            elif "天气" in query_text: # 退化情况,如果只说了天气,但没描述                 weather = "宜人(具体情况未知)"        return city, weather    def handle_task(self, task: Task) -> Task:        query_text = ""        if task.message and task.message.content and hasattr(task.message.content, 'text'):            query_text = task.message.content.text # 例如 "为北京在晴朗,25°C,微风。下推荐活动"                # 从查询文本中解析出城市和天气状况        # 在我们的工作流中,城市是初始输入,天气是上一个Agent的输出        # query_text的格式会是 Flow 构建的,例如 "Recommend activities in Beijing given weather: 晴朗,25°C,微风。"                city_match = re.search(r"in (\w+)", query_text, re.IGNORECASE)        city = city_match.group(1) if city_match else "北京" # 默认北京                weather_match = re.search(r"weather: (.+)", query_text, re.IGNORECASE)        weather_condition = weather_match.group(1) if weather_match else "天气不错" # 默认天气不错        if not weather_match: # 尝试从更简单的格式提取天气,如果上一个Agent直接返回天气描述            if "推荐活动" in query_text: # 假设我们构建的查询是 "为[城市]在[天气描述]下推荐活动"                 parts = query_text.split("在")                 if len(parts) > 1:                    potential_weather = parts[-1].replace("下推荐活动","").strip()                    if potential_weather:                        weather_condition = potential_weather        # **核心逻辑:获取活动推荐**        # 默认使用模拟数据        activity_data = self._get_simulated_activities(city, weather_condition)        # # 如果想使用Ernie获取活动 (取消下面一行的注释,并注释掉上面一行):        # activity_data = self._get_activities_from_ernie(city, weather_condition)                print(f"[ActivityAgent] 为 '{city}' (天气: {weather_condition}) 推荐的活动: {activity_data}")        task.artifacts = [{            "parts": [{"type": "text", "text": activity_data}]        }]        task.status = TaskStatus(state=TaskState.COMPLETED)        return task# ---- Flask App Setup (与WeatherAgent类似) ----def create_app(agent_instance):    app = Flask(__name__)    @app.route('/agent.json', methods=['GET'])    def get_agent_card():        return jsonify(agent_instance.agent_card.to_dict())    @app.route('/a2a/agent.json', methods=['GET'])    def get_a2a_agent_card():        return jsonify(agent_instance.agent_card.to_dict())    @app.route('/tasks/send', methods=['POST'])    def handle_send_task():        try:            task_data = request.json            if "jsonrpc" in task_data and "method" in task_data and task_data["method"] == "tasks/send":                task_dict = task_data.get("params", {})            else:                task_dict = task_data                        task = Task.from_dict(task_dict)            updated_task = agent_instance.handle_task(task)            response_data = updated_task.to_dict()            if "jsonrpc" in task_data:                 return jsonify({"jsonrpc": "2.0", "id": task_data.get("id"), "result": response_data})            return jsonify(response_data)        except Exception as e:            error_response = {"code": -32603, "message": str(e)}            if "jsonrpc" in request.json:                return jsonify({"jsonrpc": "2.0", "id": request.json.get("id"), "error": error_response}), 500            return jsonify({"error": error_response}), 500                @app.route('/', methods=['POST'])    def handle_root_message(): # 兼容旧版        try:            data = request.json            if isinstance(data, dict) and "id" in data and "status" in data:                 return handle_send_task()            text_content = data.get("content", {}).get("text", "") if isinstance(data.get("content"), dict) else str(data)            message = Message(content=TextContent(text=text_content), role=MessageRole.USER)            task = Task(id=f"task-{time.time_ns()}", message=message, status=TaskStatus(state=TaskState.PENDING))            updated_task = agent_instance.handle_task(task)            response_text = "处理完成。"            if updated_task.artifacts and updated_task.artifacts[0].get("parts"):                response_text = updated_task.artifacts[0]["parts"][0].get("text", response_text)            response_message = Message(                content=TextContent(text=response_text),                role=MessageRole.AGENT,                message_id=f"response-{time.time_ns()}",                parent_message_id=data.get("message_id")            )            return jsonify(response_message.to_dict())        except Exception as e:            return jsonify({"error": str(e)}), 400                return appif __name__ == "__main__":    PORT = 5002    activity_agent = ActivityAgent(port=PORT)    app = create_app(activity_agent)    print(f"ActivityAgent 正在启动,监听端口: http://localhost:{PORT}")    app.run(host='0.0.0.0', port=PORT, debug=False)
登录后复制

终端运行 ActivityAgent:

打开 另一个新的 终端窗口(保持WeatherAgent的终端仍在运行),进入该文件所在目录,然后运行:

python activity_agent_server.py
登录后复制

你应该能看到类似 "ActivityAgent 正在启动,监听端口: http://localhost:5002" 的输出。现在,我们有了两个独立的Agent服务在后台运行了。

注意:上述Agent代码中的Flask部分是为了能独立运行和被调用,python-a2a也支持在其他Web框架中集成,或者不通过HTTP直接在代码中实例化和调用Agent Client(例如 agents_workflow.py 示例中的LLM Client)。为了清晰展示A2A的分布式特性,我们这里采用了独立服务的方式。

3.4 步骤 3:定义工作流 (Flow) 并执行

现在两个Agent都已就绪,我们可以在Jupyter Notebook(或一个新的Python脚本 run_workflow.py)中定义和执行工作流了。这部分代码不需要在终端预先运行,而是直接在Notebook中执行。

# 在Jupyter Notebook单元格中或 run_workflow.py 中执行import timefrom python_a2a import AgentNetwork, Flow, Message, TextContent, MessageRole,A2AClient# 0. (可选) 导入Ernie客户端,如果要在工作流本身或结果处理中使用from ernie_client import get_ernie_response# 1. 定义Agent网络# 这里的URL应与我们启动Agent服务时指定的端口一致WEATHER_AGENT_URL = "http://localhost:5001"ACTIVITY_AGENT_URL = "http://localhost:5002"# 确保WeatherAgent和ActivityAgent服务已在终端启动并正在运行!# 可以简单测试一下Agent是否可达 (可选)import requeststry:    print(f"尝试连接WeatherAgent: {WEATHER_AGENT_URL}/agent.json")    weather_card = requests.get(f"{WEATHER_AGENT_URL}/agent.json", timeout=3).json()    print(f"WeatherAgent连接成功: {weather_card.get('name')}")    print(f"尝试连接ActivityAgent: {ACTIVITY_AGENT_URL}/agent.json")    activity_card = requests.get(f"{ACTIVITY_AGENT_URL}/agent.json", timeout=3).json()    print(f"ActivityAgent连接成功: {activity_card.get('name')}")except requests.exceptions.ConnectionError as e:    print(f"连接Agent失败! 请确保 weather_agent_server.py 和 activity_agent_server.py 正在运行。错误: {e}")    # 如果连接失败,后续工作流会出错,可以根据情况决定是否中止    # raise SystemExit("Agent服务未运行,工作流无法执行。") from eagent_network = AgentNetwork(name="Local Travel Assistant Network")# 直接使用URL添加Agent,AgentNetwork.add内部会创建A2AClient# 第二个参数名为 agent_or_url,可以直接传递URL字符串agent_network.add(name="weather_service", agent_or_url=WEATHER_AGENT_URL)agent_network.add(name="activity_service", agent_or_url=ACTIVITY_AGENT_URL)print("\nAgent网络已配置:")for agent_info in agent_network.list_agents():    print(f"- {agent_info['name']}: {agent_info['url']}")# 2. 定义工作流# 用户希望查询的城市user_city_input = "北京"# user_city_input = "伦敦"# user_city_input = "火星" # 测试一下Agent的边界情况print(f"\n准备为城市 '{user_city_input}'规划行程。")# 使用Flow API定义工作流程# Flow的每一步都像是在构建一个请求链travel_flow = Flow(agent_network=agent_network, name="Smart Travel Planning Flow")# 第一步:调用 WeatherAgent 获取天气# .ask(agent_name_in_network, query_string_or_Task_Message_or_lambda)# 如果是字符串,则会被自动包装travel_flow.ask(    agent_name="weather_service",     query=user_city_input) # 第二步:条件分支,根据天气推荐活动travel_flow.if_contains("雨") # 如果天气结果包含"雨"# 如果包含"雨" (IF分支):# 2a. 使用 FunctionStep 构造特定于此分支的查询字符串travel_flow.execute_function(    lambda weather_report_from_step1: f"Recommend indoor activities in {user_city_input} given weather: {weather_report_from_step1}",    "{1}" # 参数:传递天气服务步骤的结果给 lambda)# 2b. 调用 ActivityAgent,使用 FunctionStep 生成的查询 (现在在 latest_result 中)travel_flow.ask(    agent_name="activity_service",    query="{latest_result}" # QueryStep 将从 context.data['latest_result'] 获取查询字符串)# 如果不包含"雨" (ELSE分支):travel_flow.else_branch()# 2c. 使用 FunctionStep 构造特定于此分支的查询字符串travel_flow.execute_function(    lambda weather_report_from_step1: f"Recommend outdoor activities in {user_city_input} given weather: {weather_report_from_step1}",    "{1}" # 参数:传递天气服务步骤的结果给 lambda)# 2d. 调用 ActivityAgent,使用 FunctionStep 生成的查询 (现在在 latest_result 中)travel_flow.ask(    agent_name="activity_service",    query="{latest_result}" # QueryStep 将从 context.data['latest_result'] 获取查询字符串)# 结束条件分支travel_flow.end_if()# 3. 执行工作流print("\n---- 开始执行工作流 ----")start_time = time.time()# flow.run_sync() 会同步执行整个流程并返回最终步骤的结果# 对于更复杂的场景,可以使用 flow.run() (异步) 或 flow.stream() (流式处理)# 在Jupyter Notebook等已运行asyncio事件循环的环境中,应使用 await 调用异步方法try:    final_result = await travel_flow.run() # context可以传递初始数据,这里我们简单处理        print("---- 工作流执行完毕 ----")    print(f"总耗时: {time.time() - start_time:.2f} 秒")        print("\n========= 最终行程推荐 ==========")    if final_result:        print(final_result)        # 你也可以在这里用Ernie对结果进行润色或总结        ernie_summary = get_ernie_response(f"请帮我总结以下旅行建议,使其更吸引人:\n{final_result}")        print("\n========= Ernie优化后的建议 ==========")        print(ernie_summary)    else:        print("未能获取到最终结果。")        print("\n工作流各步骤结果:")        for step_num, step_result in travel_flow.results.items():            print(f"步骤 {step_num}: {step_result}")except Exception as e:    print(f"工作流执行失败: {e}")    print("详细错误信息:")    import traceback    traceback.print_exc()    print("\n工作流部分结果(如果存在):")    if hasattr(travel_flow, 'results'):        for step_num, step_result in travel_flow.results.items():            print(f"步骤 {step_num}: {step_result}")
登录后复制
INFO:python_a2a.client.network:Added agent 'weather_service' from URL: http://localhost:5001INFO:python_a2a.client.network:Added agent 'activity_service' from URL: http://localhost:5002
登录后复制
尝试连接WeatherAgent: http://localhost:5001/agent.jsonWeatherAgent连接成功: Weather Agent尝试连接ActivityAgent: http://localhost:5002/agent.jsonActivityAgent连接成功: Activity AgentAgent网络已配置:- weather_service: http://localhost:5001- activity_service: http://localhost:5002准备为城市 '北京'规划行程。---- 开始执行工作流 -------- 工作流执行完毕 ----总耗时: 2.57 秒========= 最终行程推荐 =========={"artifacts":[{"parts":[{"text":"\u5317\u4eac\u5929\u6c14'\u5929\u6c14\u4e0d\u9519'\uff0c\u5f88\u68d2\uff01\u63a8\u8350\u6237\u5916\u6d3b\u52a8\uff1a\u9890\u548c\u56ed\u6f2b\u6b65\u3001\u957f\u57ce\u5f92\u6b65\uff08\u5929\u6c14\u597d\u65f6\uff09\u3001\u4ec0\u5239\u6d77\u80e1\u540c\u9a91\u884c\u3002","type":"text"}]}],"id":"9bf71300-715e-4f76-9890-28a92c928a6d","metadata":{"message_id":"c8fdf577-28ba-4d79-b7d9-8099d7cb8315"},"sessionId":"76cef800-1e60-42f9-afb3-19d1f82c27b1","status":{"state":"completed","timestamp":"2025-05-16T15:07:10.761995"}}
登录后复制
INFO:httpx:HTTP Request: POST https://aistudio.baidu.com/llm/lmapi/v3/chat/completions "HTTP/1.1 200 OK"
登录后复制
========= Ernie优化后的建议 ==========探索北京,尽享绝美天气!北京的天蓝云白,简直棒极了!强烈推荐你体验户外活动的乐趣:- **饕餮和园漫步**:在颐和园的美丽风光中悠闲散步,感受皇家园林的宏伟与细腻。- **长城徒步**(天气晴好时):踏上长城,挑战自我,俯瞰壮丽山河,领略历史的厚重。- **什刹海胡同骑行**:穿梭在北京的老胡同中,体验地道的京味儿文化,享受骑行的乐趣。快来北京,让每一次呼吸都成为享受,让每一步行走都充满惊喜!
登录后复制3.5 步骤 4:(可选进阶) 引入 Ernie 增强 Agent 智能

在上面的 WeatherAgent 和 ActivityAgent 的代码中,我们已经用注释标出了可以集成 get_ernie_response 函数的地方。

如何操作:

修改Agent代码:在 weather_agent_server.py 中,取消 _get_weather_from_ernie 方法的注释,并修改 handle_task 让其调用此方法。在 activity_agent_server.py 中,取消 _get_activities_from_ernie 方法的注释,并修改 handle_task 让其调用此方法。确保这些Agent脚本可以正确导入 ernie_client.py 中的 get_ernie_response (如果 ernie_client.py 和Agent脚本在同一目录,直接 from ernie_client import get_ernie_response 即可;否则需要配置Python的搜索路径 sys.path)。重启Agent服务:回到运行 weather_agent_server.py 和 activity_agent_server.py 的终端,按 Ctrl+C 停止它们。然后重新运行 python weather_agent_server.py 和 python activity_agent_server.py 以加载修改后的代码。重新执行工作流:再次运行Jupyter Notebook中定义和执行工作流的代码单元格。现在,Agent的响应将由Ernie生成(或者至少尝试由Ernie生成,如果API调用失败会有提示)。

思考与挑战:

提示工程 (Prompt Engineering):如何设计好的prompt让Ernie返回我们期望格式和内容的答复?例如,要求Ernie只返回天气描述,不要有额外寒暄。错误处理:如果Ernie API调用失败或返回不符合预期的内容,Agent应如何处理?(我们的示例中简单打印了错误并返回了错误信息字符串,实际应用中可能需要更完善的fallback机制)。成本与延迟:调用LLM API会带来额外的成本和时间延迟。需要权衡哪些步骤适合用LLM增强,哪些用本地逻辑或简单模型即可。上下文传递:在多轮对话或复杂工作流中,如何有效地将历史信息和中间结果作为上下文传递给Ernie?

这个实战演练展示了A2A Agent和工作流的基本构建方法。通过将特定功能封装到独立的Agent中,并通过工作流将它们串联起来,我们可以构建出模块化、可扩展的AI应用。而Ernie的集成则为这些Agent注入了更高级的智能。


4. 深入探讨:A2A 与 MCP 的协同威力

通过前面的实战,我们已经体验了如何构建基础的A2A Agent并用工作流将它们组织起来。现在,让我们深入探讨A2A与MCP如何协同工作,以及如何借鉴 python-a2a 库中 agents_workflow.py 和 parallel_workflow.py 的思想,来构建更复杂、更智能、更高效的系统。

4.1 场景扩展 1:智能路由与多LLM协作 (借鉴 python-a2a官方agents_workflow.py)

在 agents_workflow.py 示例中,展示了如何根据任务类型(如创作、技术、分析)将用户查询路由到不同的LLM Agent(如GPT-4, Claude等)。这种模式对于充分发挥不同模型的优势、控制成本以及处理特定领域的任务非常有用。

如何应用于Ernie生态?

百度AI Studio平台通常会提供不同版本、不同能力的Ernie模型(例如 ernie-speed, ernie-lite, ernie-3.5-8k, ernie-x1 等)。它们在性能、成本、上下文长度、特定任务的擅长程度上可能有所不同。

我们可以构建一个类似的智能路由系统:

创建多个Ernie Agent: 将不同版本的Ernie模型封装成独立的A2A Agent。每个Agent的 AgentCard 可以描述其特点(如"快速响应型Ernie"、"长文本处理型Ernie"、"高级创作型Ernie")。

例如,ErnieLiteAgent 使用 ernie-lite 模型,适合快速、简单的问答。ErnieProAgent 使用 ernie-4.0-8k 模型,适合复杂推理和高质量内容生成。

终端运行 :这些Agent同样可以作为独立服务运行,每个监听不同端口。

# 假设你有 ernie_lite_agent_server.py 和 ernie_pro_agent_server.pypython ernie_lite_agent_server.py # 例如运行在 5003 端口python ernie_pro_agent_server.py  # 例如运行在 5004 端口
登录后复制

创建"任务分类与路由Agent" (RouterAgent) : 这个Agent的核心职责是接收用户原始查询,然后决定哪个Ernie Agent最适合处理这个查询。

实现方式1 (基于规则或简单模型):可以像 agents_workflow.py 中的 determine_task_type 函数一样,通过关键词匹配、查询长度等简单规则进行判断。实现方式2 (基于LLM判断):RouterAgent本身也可以调用一个轻量级的Ernie模型(比如 ernie-lite)来分析用户查询的意图和复杂度,然后输出一个目标Agent的名称。
# RouterAgent 的 handle_task 逻辑片段# from ernie_client import get_ernie_response # 假设已配置user_query = task.message.content.text# prompt_for_routing = f"用户的问题是:'{user_query}'。请判断这个问题应该由 轻量级问答模型 还是 高级分析模型 处理?请只回答 '轻量级' 或 '高级'。"# routing_decision = get_ernie_response(prompt_for_routing, model="ernie-lite") # # 根据 routing_decision 选择目标Agent的名称或标识# if "高级" in routing_decision:#     target_agent_name = "ErnieProAgent"# else:#     target_agent_name = "ErnieLiteAgent"# task.artifacts = [{"parts": [{"type": "text", "text": target_agent_name}]}] # 返回目标Agent的名字
登录后复制终端运行:RouterAgent也需要作为服务运行,例如在5005端口。

定义路由工作流 (Flow) :

# 在Jupyter Notebook或主控脚本中# agent_network.add("router", "http://localhost:5005")# agent_network.add("ernie_lite", "http://localhost:5003")# agent_network.add("ernie_pro", "http://localhost:5004")# routing_flow = Flow(agent_network)# routing_flow.ask("router", user_initial_query) # 第一步,让RouterAgent决定用哪个Ernie# # 第二步,根据RouterAgent的结果,动态调用相应的Ernie Agent# # 使用 .route_to_agent() 或更通用的 .execute_function() 结合 .ask()# def decide_next_agent(results, context):#     chosen_agent_name = results['1'] # RouterAgent返回的目标Agent名#     if chosen_agent_name == "ErnieProAgent":#         return {"agent": "ernie_pro", "query": context["original_query"]}#     else:#         return {"agent": "ernie_lite", "query": context["original_query"]}# routing_flow.execute_function(decide_next_agent, context={"original_query": user_initial_query})# routing_flow.ask(lambda res, ctx: res['2']['agent'], lambda res, ctx: res['2']['query'])# final_answer = routing_flow.run_sync()
登录后复制

python-a2a 的 AIAgentRouter 类提供了更成熟的LLM驱动路由方案,它能根据Agent的描述自动选择最合适的Agent。这里我们用 Flow 的基本功能来示意这个概念。

意义:

成本效益:简单任务用低成本模型,复杂任务用高成本模型。性能优化:针对性使用最擅长该任务的模型。系统专业化:每个Ernie Agent可以有不同的system prompt和微调(如果支持),使其更专注于特定领域。4.2 场景扩展 2:并行处理与效率提升 (借鉴 python-a2a官方parallel_workflow.py)

parallel_workflow.py 示例通过模拟延迟展示了并行执行多个独立Agent任务的优势,可以显著减少总等待时间。

如何应用?

假设我们需要为一个复杂的报告搜集多方面信息:

Task 1: 调用 ErnieKnowledgeAgent (基于Ernie,负责通用知识问答) 获取背景资料。Task 2: 调用 ErnieAnalysisAgent (基于Ernie,负责数据分析和洞察) 对一组数据进行分析。Task 3: 调用 ErnieTranslationAgent (基于Ernie,负责翻译) 将一段英文材料翻译成中文。

这三个任务是相对独立的,可以并行执行。

# 在Jupyter Notebook或主控脚本中# 假设 agent_network 中已注册了 ErnieKnowledgeAgent, ErnieAnalysisAgent, ErnieTranslationAgent# 它们都在各自的端口(如5006, 5007, 5008)上独立运行# parallel_flow = Flow(agent_network)# (parallel_flow.parallel() # 开始并行块#     .ask("ErnieKnowledgeAgent", "关于A2A协议的背景资料")#     .branch() # 新建一个并行分支#     .ask("ErnieAnalysisAgent", "分析这份销售数据:[数据...],总结趋势。")#     .branch() # 再新建一个并行分支#     .ask("ErnieTranslationAgent", "请将这段英文翻译成中文:'The A2A protocol facilitates interoperability between AI agents.'")# .end_parallel()) # 结束并行块,等待所有分支完成# # 并行块的结果是一个字典,key是分支的序号(默认从'1'开始)或自定义名称,value是该分支最后一个操作的结果# # 例如 results_from_parallel['1'] 是知识问答结果, results_from_parallel['2'] 是数据分析结果等# # 接下来可以有一个串行步骤,汇总并行结果# def combine_results(results, context):#     knowledge_info = results['1'] # 上一步并行块整体是第1步#     analysis_report = knowledge_info["2"] # 获取并行块中第二个分支的结果#     translation = knowledge_info["3"]     # 第三个分支的结果#     # ... 结合这些信息生成最终报告#     # final_report = f"背景资料:{knowledge_info["1"]}\n数据分析:{analysis_report}\n翻译内容:{translation}"#     # return final_report#     # 注意:这里获取并行结果的方式需要根据实际 end_parallel() 返回的嵌套结构来调整#     # 通常 results['step_number_of_parallel_block'] 会是一个字典,包含各分支的结果#     # 例如: results_from_parallel = results['1'] # 假设并行块是第一步#     # background = results_from_parallel['1'] # 并行块的第一个分支#     # analysis = results_from_parallel['2'] # 并行块的第二个分支#     # translation_text = results_from_parallel['3'] # 并行块的第三个分支#     # return f"... {background} ... {analysis} ... {translation_text} ..."#     pass # 实际的组合逻辑# parallel_flow.execute_function(combine_results)# report = parallel_flow.run_sync()
登录后复制

注意:上述并行Flow的写法是一个示意。 python-a2a 的 Flow.parallel() 和 Flow.branch() 提供了强大的并行控制能力。并行执行Agent调用时,每个Agent服务必须能够处理并发请求(Flask默认是线程安全的,可以处理一定程度的并发)。

终端运行:确保所有参与并行的Agent (ErnieKnowledgeAgent, ErnieAnalysisAgent, ErnieTranslationAgent) 都在各自的服务器上运行。

意义:

时间效率:如果每个任务平均耗时5秒,串行执行总共需要15秒。并行执行理想情况下只需要约5秒(取决于最慢的那个任务)。资源利用:当Agent依赖外部I/O(如调用Ernie API、读写文件、访问数据库)时,等待时间较长。并行化可以让CPU在等待一个Agent响应时处理其他Agent的请求或响应。4.3 A2A 与 MCP 的实际结合点:打造全能 Agent

现在,让我们聚焦于A2A和MCP如何真正地融合,以打造能力更全面的Agent。

想象一个"超级研究助理Agent (SuperResearchAssistant)"。它是一个A2A Agent,负责接收用户的复杂研究任务。

A2A层面:

SuperResearchAssistant 通过A2A协议与其他Agent(如 QueryClarificationAgent, ReportFormattingAgent)协作。它接收一个A2A Task,例如:"研究一下近期关于'量子计算在药物研发中的应用'的最新进展,并生成一份摘要报告。"

MCP层面: 为了完成这个任务,SuperResearchAssistant (作为MCP Client) 需要调用一系列MCP工具:

ErnieSearchTool (MCP Tool) :调用封装了Ernie或搜索引擎API的MCP工具,进行网络文献检索。这个工具本身可能是一个独立的MCP Server。输入:关键词 "量子计算 药物研发 最新进展"输出:相关文献链接列表、摘要片段。PDFTextExtractorTool (MCP Tool) :如果检索到的文献是PDF格式,调用此MCP工具提取文本内容。输入:PDF文件URL或内容。输出:纯文本。ErnieSummarizationTool (MCP Tool) :调用封装了Ernie总结能力的MCP工具,对提取的文本进行摘要。输入:长文本。输出:精炼摘要。CodeExecutionTool (MCP Tool) :如果研究中需要运行某些模拟代码或数据分析脚本,调用此MCP工具(例如,一个Jupyter Kernel Gateway封装成的MCP服务)。输入:Python代码片段。输出:代码执行结果、图表等。

实现这个MCP Tool的终端运行方式:

每个MCP工具都可以是一个独立的Python服务。例如,ErnieSearchTool 可能是一个Flask应用,它接收MCP格式的请求,内部调用百度搜索API或Ernie自身的搜索增强能力,然后返回MCP格式的响应。

# 假设你有 ernie_search_mcp_server.pypython ernie_search_mcp_server.py # 例如运行在 6001 端口# 假设你有 pdf_extractor_mcp_server.pypython pdf_extractor_mcp_server.py # 例如运行在 6002 端口
登录后复制

python-a2a 提供了 FastMCP 类来帮助快速创建这样的MCP工具服务器。

# 简化版 ernie_search_mcp_server.py 示例# from python_a2a.mcp import FastMCP, text_response, error_response# from ernie_client import get_ernie_response # 假设你想用Ernie来增强搜索# mcp_search_server = FastMCP(name="Ernie Search Tool", description="Performs web searches using Ernie")# @mcp_search_server.tool(#     name="web_search",#     description="Searches the web for a given query and returns top results."# )# def search_online(query: str, num_results: int = 3):#     """MCP tool function"""#     try:#         # 实际的搜索逻辑,例如调用搜索引擎API或 get_ernie_response
登录后复制

SuperResearchAssistant的内部工作流(伪代码思路):

# SuperResearchAssistant 内部逻辑 - 简化示意# class SuperResearchAssistant(A2AServer):#     # ... 初始化时传入 mcp_client ...#     def handle_task(self, task: Task) -> Task:#         user_query = task.message.content.text#         keywords = self._extract_keywords(user_query) # 可能用Ernie#         # 1. MCP调用:搜索工具 (如 ErnieSearchTool on port 6001)#         try:#             raw_search_results = self.mcp_client.call("http://localhost:6001", "web_search", {"query": keywords})#             # search_items = parse_results(raw_search_results) #             search_items = [{"type":"pdf", "url":"url1", "title":"PDF1"}, {"type":"text", "snippet":"text1"}] # 模拟#         except Exception as e:#             return self._fail_task(task, f"搜索失败: {e}")#         # 2. MCP调用:内容提取 (如 PDFExtractor on port 6002 for PDFs)#         processed_content = []#         for item in search_items:#             if item["type"] == "pdf":#                 try:#                     # pdf_text = self.mcp_client.call("http://localhost:6002", "extract_pdf", {"url": item["url"]})#                     pdf_text = "模拟PDF文本 for " + item["title"]#                     processed_content.append(pdf_text)#                 except Exception:#                     processed_content.append(f"(无法提取PDF: {item['title']})")#             else:#                 processed_content.append(item["snippet"])#         full_text = "\n".join(processed_content)#         # 3. MCP调用:总结工具 (如 ErnieSummarizer on port 6003)#         try:#             # summary = self.mcp_client.call("http://localhost:6003", "summarize", {"text": full_text})#             # 为了演示,直接调用本地Ernie函数 (实际应通过MCP)#             from ernie_client import get_ernie_response#             summary = get_ernie_response(f"总结以下内容:{full_text[:1000]}", model="ernie-lite")#         except Exception as e:#             return self._fail_task(task, f"总结失败: {e}")#         task.artifacts = [{"parts": [{"type": "text", "text": summary}]}]#         task.status = TaskStatus(state=TaskState.COMPLETED)#         return task#     def _fail_task(self, task, error_message):#         task.status = TaskStatus(state=TaskState.FAILED, message=error_message)#         return task
登录后复制

python-a2a 的 A2AClient 可以配置 mcp_servers 参数,使其能够直接调用MCP工具。或者,你也可以使用 python_a2a.mcp.MCPClient 来独立调用MCP工具。

协同的威力:

A2A的"总指挥"角色:SuperResearchAssistant 作为A2A Agent,负责理解复杂任务、拆解任务、编排子任务(哪些串行,哪些并行),并与其他A2A Agent协作(如让人类审批中间结果)。MCP的"具体执行者"角色:各种MCP工具(搜索、提取、总结、计算等)提供了具体的"手臂"和"腿脚",让 SuperResearchAssistant 能够实际操作数据和与外部世界交互。Ernie的多重角色:Ernie可以作为SuperResearchAssistant的核心智能,帮助理解和规划任务。Ernie也可以被封装成多个不同的MCP工具(如ErnieSearchTool, ErnieSummarizationTool, ErnieCodeGenerationTool),提供专门的AI能力。

通过这种A2A指挥、MCP执行的模式,我们可以构建出既有宏观协作能力,又有微观执行能力的强大Agent系统。这使得Agent开发更加模块化:A2A层面关注流程和协作逻辑,MCP层面关注具体能力的实现和工具化封装。学生们可以基于此理念,为自己的Ernie应用设计更富有想象力的Agent架构!


5. 总结与展望

恭喜你完成了这篇"基于Ernie&A2A协议:构建智能协作的AI应用新范式"教程!我们一起探索了AI Agent的世界,从基本概念到实战演练,再到A2A与MCP协同的深入探讨。

5.1 回顾核心价值

A2A协议:为AI Agent之间的互操作性提供了坚实的基础。它使得我们可以构建模块化的、可独立开发和部署的Agent,并通过标准化的接口(Agent Card, Message, Task)进行通信和协作。python-a2a库的Flow引擎更是简化了复杂Agent交互流程的编排。

MCP协议:极大地扩展了Agent的能力边界。通过将外部API、数据源、代码执行环境等封装为MCP工具,Agent(尤其是基于LLM的Agent)可以超越单纯的文本生成,与真实世界进行交互,获取实时信息,执行具体动作。

A2A与MCP的协同:这是构建高级AI应用的关键。A2A负责Agent间的"社交"与任务委派,形成宏观的协作网络;MCP则赋予每个Agent调用工具的"超能力",解决微观的执行问题。这种分层架构使得系统设计更加清晰、灵活和强大。

Ernie的角色:作为先进的中文大语言模型,Ernie在A2A/MCP生态中可以扮演多种角色:

Agent的核心智能:驱动Agent进行理解、推理、决策和生成。专业的MCP工具:封装Ernie的特定能力(如搜索、翻译、摘要、代码生成、情感分析等)作为一个个MCP服务,供其他Agent按需调用。任务路由器/协调器:利用Ernie的理解能力来分析用户意图,并将任务智能地分配给最合适的下游Agent或工具。5.2 Ernie + A2A/MCP 的无限潜力

将Ernie的强大语言能力与A2A/MCP的标准化协作框架相结合,为我们打开了构建下一代智能应用的大门。想象一下:

企业级智能助理集群 :由多个专精不同业务领域(如销售、客服、HR、财务、法务)的Ernie Agent组成,它们通过A2A协议协同工作,并通过MCP调用企业内部的各种API和数据库工具,为员工提供一站式的智能支持。个性化教育辅导系统 :一个主辅导Ernie Agent通过A2A与学生互动,根据学生的学习进度和理解程度,动态调用不同的教学内容生成Agent、练习题生成Agent、知识图谱查询MCP工具等,实现千人千面的自适应学习体验。复杂的科研自动化流程 :研究员可以通过一个顶层Ernie Agent描述实验目标,该Agent随后通过A2A协调数据收集Agent、文献分析Agent、模拟计算Agent(其内部可能通过MCP调用高性能计算集群或专业软件工具),自动完成大部分研究流程并生成报告。更智能的物联网(IoT)中枢 :家庭或工业环境中的各种设备可以被封装为MCP工具。一个中央Ernie Agent通过A2A接收用户指令(自然语言),理解后通过MCP控制相应的设备工具,实现更自然、更智能的人机交互和场景自动化。5.3 给学生的进一步探索建议

本教程仅仅是一个开始。python-a2a库还有许多高级功能值得探索:

流式处理 (Streaming):对于需要实时反馈或处理长序列数据的Agent(如实时语音识别Agent、代码生成Agent),A2A的流式处理能力至关重要。可以研究 StreamingClient 和Agent端的SSE (Server-Sent Events) 支持。Agent发现与注册表 (Discovery & Registry):在大型Agent网络中,如何让Agent动态地发现彼此?python-a2a提供了AgentRegistry等组件来构建Agent发现服务。更高级的LangChain集成:深入了解如何将LangChain的Agent、Chain、Tool与A2A/MCP生态无缝对接,利用LangChain庞大的预构建组件库。异步Agent与工作流:对于I/O密集型的Agent和工作流,充分利用Python的async/await机制,结合python-a2a的异步支持,可以显著提升系统吞吐量和响应速度。错误处理与韧性设计:在真实的分布式Agent系统中,网络分区、Agent宕机、API调用失败等都是常见问题。思考如何在Agent和工作流层面设计更健壮的错误处理、重试、熔断和回退机制。安全与认证:当Agent处理敏感信息或执行关键操作时,如何在A2A通信和MCP工具调用中确保安全性和身份认证?

动手实践是最好的学习方式!

尝试基于本教程的示例进行扩展:

替换模拟数据:将WeatherAgent中的模拟天气替换为真实的在线天气API调用(可以将其封装为一个简单的MCP工具,然后让WeatherAgent通过MCP调用它)。增强Ernie应用:在ActivityAgent中,不仅仅是用Ernie生成推荐文本,而是让Ernie根据更丰富的上下文(如用户偏好、预算、历史行为——这些可以作为Task的输入message传递)来动态生成个性化推荐。构建新的Agent:构思一个新的Agent角色(例如,"餐厅预订Agent"、"交通查询Agent"),实现它,并将其加入到我们的旅行助手中,形成更完整的工作流。探索MCP工具封装:选择一个你常用的Python库或外部API,尝试将其封装成一个MCP工具服务,并让一个简单的A2A Agent来调用它。

我们正处在AI Agent技术爆发的前夜。掌握A2A和MCP这样的标准化协议,结合Ernie这样强大的大模型能力,将使你具备构建未来复杂智能系统的核心竞争力。希望本教程能为你点燃探索的热情,祝你在AI Agent的开发之路上不断进步,创造出令人惊叹的应用!

以上就是【Workflow】基于Ernie&A2A协议Workflow篇的详细内容,更多请关注乐哥常识网其它相关文章!

【Workflow】
Claude生成内容是否可追踪来源 引用来源显示与核验机制说明
相关内容
发表评论

游客 回复需填写必要信息