【LLM】MCP(Python):实现 stdio 通信的Client与Server

06-02 1448阅读

本文将详细介绍如何使用 Model Context Protocol (MCP) 在 Python 中实现基于 STDIO 通信的 Client 与 Server。MCP 是一个开放协议,它使 LLM 应用与外部数据源和工具之间的无缝集成成为可能。无论你是构建 AI 驱动的 IDE、改善 chat 交互,还是构建自定义的 AI 工作流,MCP 提供了一种标准化的方式,将 LLM 与它们所需的上下文连接起来。部分灵感来源:Se7en。

提示

在开始前,请确保你已经安装了必要的依赖包:

pip install openai mcp

本文中,我们将介绍如何配置环境、编写 MCP Server 以及实现 MCP Client。


环境配置

在使用 MCP 之前,需要先配置相关环境变量,以便 Client 与 Server 都能正确加载所需的参数。你可以在项目根目录下创建一个 .env 文件,并写入以下内容:

此外,创建一个 .env 文件来存储您的配置:

MODEL_NAME=deepseek-chat
BASE_URL=https://api.deepseek.com/v1
API_KEY=your_api_key_here

上述配置中,MODEL_NAME 表示使用的 OpenAI 模型名称(例如 “deepseek-chat”),BASE_URL 指向 OpenAI API 的基础地址,而 API_KEY 则为访问 API 所需的密钥。


书写 Server 的规范

构建 MCP Server(特别是基于 stdio 通信)时,推荐遵循统一规范,提升可读性、可维护性与复用性。

  • 服务命名统一

    使用 MCP_SERVER_NAME 作为唯一名称,贯穿日志、初始化等环节。

  • 日志配置清晰

    统一使用 logging 模块,推荐 INFO 级别,便于调试和追踪。

  • 工具注册规范

    通过 @mcp.tool() 装饰器注册工具函数,要求:

    • 命名清晰
    • 参数有类型注解
    • 注释说明参数与返回值(推荐中文)
    • 加入边界检查或异常处理
    • 使用标准 stdio 启动方式

      通过 async with stdio_server() 获取输入输出流,统一调用 _mcp_server.run(...) 启动服务。

    • 初始化选项规范

      使用 InitializationOptions 设置服务名、版本及能力声明(通常由 FastMCP 提供)。

      • 通用模板

        import asyncio
        import logging
        from mcp.server.fastmcp import FastMCP
        from mcp.server import InitializationOptions, NotificationOptions
        from mcp.server.stdio import stdio_server  # STDIO 通信方式
        # 定义唯一服务名称
        MCP_SERVER_NAME = "your-stdio-server-name"
        # 配置日志输出
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        logger = logging.getLogger(MCP_SERVER_NAME)
        # 创建 FastMCP 实例
        mcp = FastMCP(MCP_SERVER_NAME)
        # 定义工具
        @mcp.tool()
        def your_tool_name(param1: type, param2: type) -> return_type:
            """
            工具描述。
            参数:
            - param1 (type): 描述
            - param2 (type): 描述
            返回:
            - return_type: 描述
            """
            # 工具实现
            pass
        # 启动 MCP Server 主函数
        async def main():
            # 创建 stdio 通信通道
            async with stdio_server() as (read_stream, write_stream):
                # 构建初始化选项
                init_options = InitializationOptions(
                    server_name=MCP_SERVER_NAME,
                    server_version="1.0.0",
                    capabilities=mcp._mcp_server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={}
                    )
                )
                logger.info("MCP Server 以 STDIO 模式启动中...")
                # 启动 Server
                await mcp._mcp_server.run(read_stream, write_stream, init_options)
        # 主程序入口
        if __name__ == "__main__":
            asyncio.run(main())
        

        编写 MCP Server

        MCP Server 的实现主要基于标准输入输出(STDIO)进行通信。服务端通过注册工具,向外界提供如加法、减法、乘法以及除法等计算功能。下面简述服务端的主要实现步骤:

        1. 初始化 FastMCP 实例

          服务端首先创建一个 FastMCP 实例,并为其命名(例如 “math-stdio-server”)。

        2. 工具注册

          使用装饰器的方式注册加法、减法、乘法、除法等工具,每个工具均包含详细的参数说明和返回值说明。

        3. 日志配置

          通过 Python 标准日志模块对服务端进行日志配置,以便记录服务运行状态和错误信息。

        4. 建立 STDIO 通信

          使用 stdio_server() 函数建立基于 STDIO 的通信,并构造初始化选项,包含服务器名称、版本以及能力说明。随后,调用 MCP 内部的服务启动函数开始监听和处理来自 Client 的请求。

        import asyncio
        import logging
        from mcp.server.fastmcp import FastMCP
        from mcp.server import InitializationOptions, NotificationOptions
        from mcp.server.stdio import stdio_server  # 直接导入 stdio_server 函数
        # 定义服务器名称
        MCP_SERVER_NAME = "math-stdio-server"
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        logger = logging.getLogger(MCP_SERVER_NAME)
        # 初始化 FastMCP 实例
        mcp = FastMCP(MCP_SERVER_NAME)
        # 注册加法工具
        @mcp.tool()
        def add(a: float, b: float) -> float:
            """
            加法工具
            参数:
              - a (float): 第一个数字(必填)
              - b (float): 第二个数字(必填)
            返回:
              - float: a 与 b 的和
            """
            return a + b
        # 注册减法工具
        @mcp.tool()
        def subtract(a: float, b: float) -> float:
            """
            减法工具
            参数:
              - a (float): 被减数(必填)
              - b (float): 减数(必填)
            返回:
              - float: a 与 b 的差
            """
            return a - b
        # 注册乘法工具
        @mcp.tool()
        def multiply(a: float, b: float) -> float:
            """
            乘法工具
            参数:
              - a (float): 第一个数字(必填)
              - b (float): 第二个数字(必填)
            返回:
              - float: a 与 b 的积
            """
            return a * b
        # 注册除法工具
        @mcp.tool()
        def divide(a: float, b: float) -> float:
            """
            除法工具
            参数:
              - a (float): 分子(必填)
              - b (float): 分母(必填,且不能为零)
            返回:
              - float: a 与 b 的商
            """
            if b == 0:
                raise ValueError("除数不能为零")
            return a / b
        async def main():
            # 使用 stdio_server 建立 STDIO 通信
            async with stdio_server() as (read_stream, write_stream):
                # 构造初始化选项
                init_options = InitializationOptions(
                    server_name=MCP_SERVER_NAME,
                    server_version="1.0.0",
                    capabilities=mcp._mcp_server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={}
                    )
                )
                logger.info("通过 STDIO 模式启动 MCP Server ...")
                # 使用内部的 _mcp_server 运行服务
                await mcp._mcp_server.run(read_stream, write_stream, init_options)
        if __name__ == "__main__":
            asyncio.run(main())
        

        编写 MCP Client

        MCP Client 主要实现与多个基于 STDIO 的服务器建立连接,并通过 OpenAI API 对用户的自然语言查询进行处理,调用相应工具获得最终结果。客户端的主要逻辑可以分为以下部分:

        1. 初始化客户端

          在 MCPClient 类的构造函数中,传入所需的模型名称、OpenAI API 基础地址、API 密钥以及包含服务端脚本路径的列表。客户端将使用这些参数初始化 OpenAI 异步客户端,同时准备一个 AsyncExitStack 来管理所有异步上下文。

        2. 建立多个 STDIO 连接

          通过遍历服务器脚本列表,为每个脚本生成唯一标识符(如 server0、server1 等),然后依次调用 stdio_client 函数建立连接,并通过 ClientSession 完成初始化。在连接成功后,从每个服务器获取可用工具列表,并将工具名称加上前缀(例如 server0_add)保存到映射表中,避免工具名称冲突。

        3. 处理用户查询

          在 process_query 方法中,客户端首先根据用户的输入构造消息,然后汇总所有连接服务器提供的工具,传递给 OpenAI API 进行处理。当 API 返回调用工具的请求时,客户端根据工具名称找到对应服务器会话,并执行相应的工具调用,收集返回结果后再交由 API 生成后续回复,直至所有工具调用处理完成。

        4. 交互式对话循环

          客户端提供一个简单的命令行交互循环,用户输入查询后,调用 process_query 方法获取最终回复,并打印在终端上。如果用户输入 quit 或使用 Ctrl+C 中断,则客户端将平滑退出并释放所有资源。

        5. 资源清理

          最后,在退出前,通过 AsyncExitStack 统一关闭所有连接,确保资源不会泄露。

        import asyncio
        import json
        import os
        import sys
        from typing import List
        from contextlib import AsyncExitStack
        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client
        from openai import AsyncOpenAI
        class MCPClient:
            def __init__(self, model_name: str, base_url: str, api_key: str, server_scripts: List[str]):
                """
                初始化 MCP 客户端,支持多个 stdio 服务器。
                :param model_name: OpenAI 模型名称,例如 "deepseek-chat"。
                :param base_url: OpenAI API 基础地址,例如 "https://api.deepseek.com/v1"。
                :param api_key: OpenAI API 密钥。
                :param server_scripts: stdio 服务脚本路径列表。
                """
                self.model_name = model_name
                self.base_url = base_url
                self.api_key = api_key
                self.server_scripts = server_scripts
                self.sessions = {}         # server_id -> (session, session_ctx, stdio_ctx)
                self.tool_mapping = {}     # 带前缀的工具名 -> (session, 原始工具名)
                self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
                self.exit_stack = AsyncExitStack()
            async def initialize_sessions(self):
                """初始化所有 stdio 服务器连接,并收集工具映射。"""
                for i, script in enumerate(self.server_scripts):
                    if not (os.path.exists(script) and script.endswith(".py")):
                        print(f"脚本 {script} 不存在或不是 .py 文件,跳过。")
                        continue
                    server_id = f"server{i}"
                    params = StdioServerParameters(command="python", args=[script], env=None)
                    try:
                        stdio_ctx = stdio_client(params)
                        stdio = await self.exit_stack.enter_async_context(stdio_ctx)
                        session_ctx = ClientSession(*stdio)
                        session = await self.exit_stack.enter_async_context(session_ctx)
                        await session.initialize()
                        self.sessions[server_id] = (session, session_ctx, stdio_ctx)
                        response = await session.list_tools()
                        for tool in response.tools:
                            self.tool_mapping[f"{server_id}_{tool.name}"] = (session, tool.name)
                        print(f"已连接到 {script},工具:{[tool.name for tool in response.tools]}")
                    except Exception as e:
                        print(f"连接 {script} 失败:{e}")
            async def cleanup(self):
                """释放所有资源。"""
                try:
                    await self.exit_stack.aclose()
                    print("所有连接资源已释放")
                except asyncio.CancelledError:
                    pass
                except Exception as e:
                    print(f"清理资源时异常:{e}")
            async def _gather_available_tools(self):
                """汇总所有服务器的工具列表。"""
                tools = []
                for server_id, (session, _, _) in self.sessions.items():
                    response = await session.list_tools()
                    for tool in response.tools:
                        tools.append({
                            "type": "function",
                            "function": {
                                "name": f"{server_id}_{tool.name}",
                                "description": tool.description,
                                "parameters": tool.inputSchema,
                            }
                        })
                return tools
            async def process_query(self, query: str) -> str:
                """处理查询,调用 OpenAI API 和相应工具后返回结果。"""
                messages = [{"role": "user", "content": query}]
                available_tools = await self._gather_available_tools()
                try:
                    response = await self.client.chat.completions.create(
                        model=self.model_name, messages=messages, tools=available_tools
                    )
                except Exception as e:
                    return f"调用 OpenAI API 失败:{e}"
                final_text = [response.choices[0].message.content or ""]
                message = response.choices[0].message
                # 当有工具调用时循环处理
                while message.tool_calls:
                    for call in message.tool_calls:
                        tool_name = call.function.name
                        if tool_name not in self.tool_mapping:
                            final_text.append(f"未找到工具:{tool_name}")
                            continue
                        session, original_tool = self.tool_mapping[tool_name]
                        tool_args = json.loads(call.function.arguments)
                        try:
                            result = await session.call_tool(original_tool, tool_args)
                            final_text.append(f"[调用 {tool_name} 参数: {tool_args}]")
                            final_text.append(f"工具结果: {result.content}")
                        except Exception as e:
                            final_text.append(f"调用 {tool_name} 出错:{e}")
                            continue
                        messages += [
                            {"role": "assistant", "tool_calls": [{
                                "id": call.id,
                                "type": "function",
                                "function": {"name": tool_name, "arguments": json.dumps(tool_args)}
                            }]},
                            {"role": "tool", "tool_call_id": call.id, "content": str(result.content)}
                        ]
                    try:
                        response = await self.client.chat.completions.create(
                            model=self.model_name, messages=messages, tools=available_tools
                        )
                    except Exception as e:
                        final_text.append(f"调用 OpenAI API 失败:{e}")
                        break
                    message = response.choices[0].message
                    if message.content:
                        final_text.append(message.content)
                return "\n".join(final_text)
            async def chat_loop(self):
                """交互式对话循环,捕获中断平滑退出。"""
                print("MCP 客户端已启动,输入问题,输入 'quit' 退出。")
                while True:
                    try:
                        query = input("问题: ").strip()
                        if query.lower() == "quit":
                            break
                        result = await self.process_query(query)
                        print("\n" + result)
                    except KeyboardInterrupt:
                        print("\n检测到中断信号,退出。")
                        break
                    except Exception as e:
                        print(f"发生错误:{e}")
        async def main():
            model_name = os.getenv("MODEL_NAME", "deepseek-chat")
            base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1")
            api_key = os.getenv("API_KEY")
            if not api_key:
                print("未设置 API_KEY 环境变量")
                sys.exit(1)
            # 示例:使用两个 stdio 脚本
            server_scripts = ["server.py"]
            client = MCPClient(model_name, base_url, api_key, server_scripts)
            try:
                await client.initialize_sessions()
                await client.chat_loop()
            except KeyboardInterrupt:
                print("\n收到中断信号")
            finally:
                await client.cleanup()
        if __name__ == "__main__":
            try:
                asyncio.run(main())
            except KeyboardInterrupt:
                print("程序已终止。")
        

        总结

        通过本文的介绍,我们了解了如何使用 MCP 协议在 Python 中构建基于 STDIO 通信的 Client 与 Server。服务端通过注册多个工具为外部应用提供计算能力,而客户端则利用 OpenAI API 和工具调用的方式,将自然语言查询转化为对具体工具的调用,最终将结果反馈给用户。

        这种基于 STDIO 的通信方式不仅简化了服务端与客户端之间的连接,还能方便地支持多服务器同时运行,为构建灵活高效的 LLM 应用提供了坚实的基础。

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码