【LLM】MCP(Python):实现 SSE 通信的 Server 和 Client

06-01 1577阅读

部分灵感来源:Se7en。

环境准备

在开始之前,请确保您已安装必要的 Python 库:

pip install openai mcp

此外,创建一个 .env 文件来存储您的配置:

MODEL_NAME=deepseek-chat
BASE_URL=https://api.deepseek.com/v1
API_KEY=your_api_key_here

将 your_api_key_here 替换为您的实际 API 密钥。


Server 实现

Server 的作用是提供工具(tools),这些工具可以被 Client 调用。以下是通用的 Server 结构:

Server 结构

import argparse
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from mcp.server.sse import SseServerTransport
from starlette.requests import Request
from starlette.routing import Mount, Route
from mcp.server import Server
import logging
import uvicorn
# 定义服务器名称
MCP_SERVER_NAME = "your-server-name"
# 配置日志
logging.basicConfig(
    level=logging.INFO, 
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)
# 定义工具
@mcp.tool()
def your_tool_name(param1: type, param2: type) -> return_type:
    """
    工具描述。
    参数:
    - param1 (type): 描述
    - param2 (type): 描述
    返回:
    - return_type: 描述
    """
    # 工具实现
    pass
# 创建 Starlette 应用
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
    sse = SseServerTransport("/messages/")
    async def handle_sse(request: Request) -> None:
        async with sse.connect_sse(
                request.scope,
                request.receive,
                request._send,
        ) as (read_stream, write_stream):
            await mcp_server.run(
                read_stream,
                write_stream,
                mcp_server.create_initialization_options(),
            )
    return Starlette(
        debug=debug,
        routes=[
            Route("/sse", endpoint=handle_sse),
            Mount("/messages/", app=sse.handle_post_message),
        ],
    )
# 主程序入口
if __name__ == "__main__":
    mcp_server = mcp._mcp_server
    # 解析命令行参数
    parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
    parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
    parser.add_argument('--port', type=int, default=18080, help='Port to listen on')
    args = parser.parse_args()
    # 创建并运行 Starlette 应用
    starlette_app = create_starlette_app(mcp_server, debug=True)
    uvicorn.run(starlette_app, host=args.host, port=args.port)

实现工具

  • 选择工具:根据您的需求定义工具,例如数学运算、数据查询等。
  • 实现工具函数:使用 @mcp.tool() 装饰器,并确保有详细的文档字符串。
  • 配置服务器:设置服务器名称、日志、主机和端口。

    Client 实现

    Client 的作用是连接到 Server,获取可用的工具,并使用这些工具来处理用户的查询。以下是通用的 Client 结构:

    Client 结构

    import asyncio
    import os
    import sys
    from typing import List
    from mcp import ClientSession
    from mcp.client.sse import sse_client
    from openai import AsyncOpenAI
    class MCPClient:
        def __init__(self, model_name: str, base_url: str, api_key: str, server_urls: List[str]):
            # 初始化代码
            pass
        async def initialize_sessions(self):
            # 初始化会话代码
            pass
        async def cleanup(self):
            # 清理资源代码
            pass
        async def process_query(self, query: str) -> str:
            # 处理查询代码
            pass
        async def chat_loop(self):
            # 交互循环代码
            pass
    async def main():
        # 主函数代码
        pass
    if __name__ == "__main__":
        asyncio.run(main())
    

    实现 Client

    • 配置环境:从 .env 文件或环境变量中获取模型名称、基础 URL 和 API 密钥。
    • 定义服务器 URL 列表:指定要连接的 Server 地址。
    • 实现 MCPClient 类:
      • 初始化:设置模型和服务器信息。
      • 会话管理:连接到每个 Server 并获取工具列表。
      • 查询处理:收集工具,发送请求到 OpenAI API,处理工具调用。
      • 资源清理:确保所有连接正确关闭。
      • 交互循环:接受用户输入,显示回复,并处理退出命令。

        逻辑流程

        以下 Mermaid 图展示了 Client 和 Server 之间的交互逻辑:

        • Client 连接到多个 Server,获取工具列表。
        • Client 收集所有可用工具,并将用户查询发送到 OpenAI API。
        • OpenAI API 返回响应,Client 处理工具调用并获取结果。
        • 最终,Client 将回复显示给用户。

          源码运行测试

          本节将指导您如何运行和测试提供的三个源码文件:数学运算服务器(math-mcp-sse.py)、取模服务器(modulo-mcp-sse.py)和客户端(client.py)。通过以下步骤,您可以验证它们的功能并观察其交互效果。


          运行 Server

          我们将分别启动两个服务器,它们分别提供不同的工具并监听不同的端口。

          • 数学运算服务器:

            • 保存代码为:

              import argparse
              from mcp.server.fastmcp import FastMCP
              from starlette.applications import Starlette
              from mcp.server.sse import SseServerTransport
              from starlette.requests import Request
              from starlette.routing import Mount, Route
              from mcp.server import Server
              import logging
              import uvicorn
              # 定义服务器名称
              MCP_SERVER_NAME = "math-mcp-sse"
              # 配置日志
              logging.basicConfig(
                  level=logging.INFO, 
                  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
              )
              logger = logging.getLogger(MCP_SERVER_NAME)
              # 初始化 FastMCP 实例
              mcp = FastMCP(MCP_SERVER_NAME)
              @mcp.tool()
              def add(a: float, b: float) -> float:
                  """
                  Add two numbers.
                  Parameters:
                  - a (float): First number (required)
                  - b (float): Second number (required)
                  Returns:
                  - float: The result of a + b
                  """
                  return a + b
              @mcp.tool()
              def subtract(a: float, b: float) -> float:
                  """
                  Subtract two numbers.
                  Parameters:
                  - a (float): The number to subtract from (required)
                  - b (float): The number to subtract (required)
                  Returns:
                  - float: The result of a - b
                  """
                  return a - b
              @mcp.tool()
              def multiply(a: float, b: float) -> float:
                  """
                  Multiply two numbers.
                  Parameters:
                  - a (float): First number (required)
                  - b (float): Second number (required)
                  Returns:
                  - float: The result of a * b
                  """
                  return a * b
              @mcp.tool()
              def divide(a: float, b: float) -> float:
                  """
                  Divide two numbers.
                  Parameters:
                  - a (float): Numerator (required)
                  - b (float): Denominator (required, must not be zero)
                  Returns:
                  - float: The result of a / b
                  """
                  if b == 0:
                      raise ValueError("Division by zero is not allowed")
                  return a / b
              # 创建 Starlette 应用
              def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
                  """Create a Starlette application that can serve the provided mcp server with SSE."""
                  sse = SseServerTransport("/messages/")
                  async def handle_sse(request: Request) -> None:
                      async with sse.connect_sse(
                              request.scope,
                              request.receive,
                              request._send,
                      ) as (read_stream, write_stream):
                          await mcp_server.run(
                              read_stream,
                              write_stream,
                              mcp_server.create_initialization_options(),
                          )
                  return Starlette(
                      debug=debug,
                      routes=[
                          Route("/sse", endpoint=handle_sse),
                          Mount("/messages/", app=sse.handle_post_message),
                      ],
                  )
              # 主程序入口
              if __name__ == "__main__":
                  mcp_server = mcp._mcp_server
                  # 解析命令行参数
                  parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
                  parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
                  parser.add_argument('--port', type=int, default=18080, help='Port to listen on')
                  args = parser.parse_args()
                  # 创建并运行 Starlette 应用
                  starlette_app = create_starlette_app(mcp_server, debug=True)
                  uvicorn.run(starlette_app, host=args.host, port=args.port)
              
            • 运行命令:

              python math-mcp-sse.py --port 18080
              
            • 该服务器提供 add、subtract、multiply 和 divide 工具,监听在 0.0.0.0:18080。

            • 取模服务器:

              • 保存代码为:

                【LLM】MCP(Python):实现 SSE 通信的 Server 和 Client
                (图片来源网络,侵删)
                import argparse
                from mcp.server.fastmcp import FastMCP
                from starlette.applications import Starlette
                from mcp.server.sse import SseServerTransport
                from starlette.requests import Request
                from starlette.routing import Mount, Route
                from mcp.server import Server
                import logging
                import uvicorn
                # 定义服务器名称
                MCP_SERVER_NAME = "modulo-mcp-sse"
                # 配置日志
                logging.basicConfig(
                    level=logging.INFO, 
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
                )
                logger = logging.getLogger(MCP_SERVER_NAME)
                # 初始化 FastMCP 实例
                mcp = FastMCP(MCP_SERVER_NAME)
                # 定义取模工具
                @mcp.tool()
                def modulo(a: float, b: float) -> float:
                    """
                    计算两个数的取模结果。
                    参数:
                    - a (float):被除数(必填)
                    - b (float):除数(必填,不能为零)
                    返回:
                    - float:a % b 的结果
                    """
                    if b == 0:
                        raise ValueError("除数不能为零")
                    return a % b
                # 创建 Starlette 应用
                def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
                    """创建一个支持 SSE 的 Starlette 应用,用于运行 MCP 服务器。"""
                    sse = SseServerTransport("/messages/")
                    async def handle_sse(request: Request) -> None:
                        async with sse.connect_sse(
                                request.scope,
                                request.receive,
                                request._send,
                        ) as (read_stream, write_stream):
                            await mcp_server.run(
                                read_stream,
                                write_stream,
                                mcp_server.create_initialization_options(),
                            )
                    return Starlette(
                        debug=debug,
                        routes=[
                            Route("/sse", endpoint=handle_sse),
                            Mount("/messages/", app=sse.handle_post_message),
                        ],
                    )
                # 主程序入口
                if __name__ == "__main__":
                    mcp_server = mcp._mcp_server
                    # 解析命令行参数
                    parser = argparse.ArgumentParser(description='运行基于 SSE 的 MCP 取模服务器')
                    parser.add_argument('--host', default='0.0.0.0', help='绑定的主机地址')
                    parser.add_argument('--port', type=int, default=18081, help='监听端口')
                    args = parser.parse_args()
                    # 创建并运行 Starlette 应用
                    starlette_app = create_starlette_app(mcp_server, debug=True)
                    uvicorn.run(starlette_app, host=args.host, port=args.port)
                
              • 运行命令:

                python modulo-mcp-sse.py --port 18081
                
              • 该服务器提供 modulo 工具,监听在 0.0.0.0:18081。

                【LLM】MCP(Python):实现 SSE 通信的 Server 和 Client
                (图片来源网络,侵删)

                运行 Client

                客户端将连接到上述两个服务器,获取工具并处理用户查询。

                • 保存代码为:

                  【LLM】MCP(Python):实现 SSE 通信的 Server 和 Client
                  (图片来源网络,侵删)
                  import asyncio
                  import json
                  import os
                  import sys
                  from typing import List
                  from mcp import ClientSession
                  from mcp.client.sse import sse_client
                  from openai import AsyncOpenAI
                  class MCPClient:
                      def __init__(self, model_name: str, base_url: str, api_key: str, server_urls: List[str]):
                          """
                          初始化 MCP 客户端,连接 OpenAI 接口。
                          :param model_name: 使用的模型名称,例如 "deepseek-chat"。
                          :param base_url: OpenAI 接口的基础地址,例如 "https://api.deepseek.com/v1"。
                          :param api_key: OpenAI API 密钥,用于身份验证。
                          :param server_urls: SSE 服务地址列表,用于连接多个服务器。
                          """
                          self.model_name = model_name
                          self.server_urls = server_urls
                          self.sessions = {}  # 存储每个服务器的会话及其上下文:server_id -> (session, session_context, streams_context)
                          self.tool_mapping = {}  # 工具映射:prefixed_name -> (session, original_tool_name)
                          # 初始化 OpenAI 异步客户端
                          self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
                      async def initialize_sessions(self):
                          """
                          初始化与所有 SSE 服务器的连接,并获取可用工具列表。
                          """
                          for i, server_url in enumerate(self.server_urls):
                              server_id = f"server{i}"  # 为每个服务器生成唯一标识符
                              # 创建 SSE 客户端并进入上下文
                              streams_context = sse_client(url=server_url)
                              streams = await streams_context.__aenter__()
                              session_context = ClientSession(*streams)
                              session = await session_context.__aenter__()
                              await session.initialize()
                              # 存储会话及其上下文
                              self.sessions[server_id] = (session, session_context, streams_context)
                              # 获取工具列表并建立映射
                              response = await session.list_tools()
                              for tool in response.tools:
                                  prefixed_name = f"{server_id}_{tool.name}"  # 为工具名添加服务器前缀
                                  self.tool_mapping[prefixed_name] = (session, tool.name)
                              print(f"已连接到 {server_url},工具列表:{[tool.name for tool in response.tools]}")
                      async def cleanup(self):
                          """
                          清理所有会话和连接资源,确保无资源泄漏。
                          """
                          for server_id, (session, session_context, streams_context) in self.sessions.items():
                              await session_context.__aexit__(None, None, None)  # 退出会话上下文
                              await streams_context.__aexit__(None, None, None)  # 退出 SSE 流上下文
                          print("所有会话已清理。")
                      async def process_query(self, query: str) -> str:
                          """
                          处理用户的自然语言查询,通过工具调用完成任务并返回结果。
                          :param query: 用户输入的查询字符串。
                          :return: 处理后的回复文本。
                          """
                          messages = [{"role": "user", "content": query}]  # 初始化消息列表
                          # 收集所有可用工具
                          available_tools = []
                          for server_id, (session, _, _) in self.sessions.items():
                              response = await session.list_tools()
                              for tool in response.tools:
                                  prefixed_name = f"{server_id}_{tool.name}"
                                  available_tools.append({
                                      "type": "function",
                                      "function": {
                                          "name": prefixed_name,
                                          "description": tool.description,
                                          "parameters": tool.inputSchema,
                                      },
                                  })
                          # 向模型发送初始请求
                          response = await self.client.chat.completions.create(
                              model=self.model_name,
                              messages=messages,
                              tools=available_tools,
                          )
                          final_text = []  # 存储最终回复内容
                          message = response.choices[0].message
                          final_text.append(message.content or "")  # 添加模型的初始回复
                          # 处理工具调用
                          while message.tool_calls:
                              for tool_call in message.tool_calls:
                                  prefixed_name = tool_call.function.name
                                  if prefixed_name in self.tool_mapping:
                                      session, original_tool_name = self.tool_mapping[prefixed_name]
                                      tool_args = json.loads(tool_call.function.arguments)
                                      try:
                                          result = await session.call_tool(original_tool_name, tool_args)
                                      except Exception as e:
                                          result = {"content": f"调用工具 {original_tool_name} 出错:{str(e)}"}
                                          print(result["content"])
                                      final_text.append(f"[调用工具 {prefixed_name} 参数: {tool_args}]")
                                      final_text.append(f"工具结果: {result.content}")
                                      messages.extend([
                                          {
                                              "role": "assistant",
                                              "tool_calls": [{
                                                  "id": tool_call.id,
                                                  "type": "function",
                                                  "function": {"name": prefixed_name, "arguments": json.dumps(tool_args)},
                                              }],
                                          },
                                          {"role": "tool", "tool_call_id": tool_call.id, "content": str(result.content)},
                                      ])
                                  else:
                                      print(f"工具 {prefixed_name} 未找到")
                                      final_text.append(f"工具 {prefixed_name} 未找到")
                              # 获取工具调用后的后续回复
                              response = await self.client.chat.completions.create(
                                  model=self.model_name,
                                  messages=messages,
                                  tools=available_tools,
                              )
                              message = response.choices[0].message
                              if message.content:
                                  final_text.append(message.content)
                          return "\n".join(final_text)
                      async def chat_loop(self):
                          """
                          启动命令行交互式对话循环,接受用户输入并显示回复。
                          """
                          print("\nMCP 客户端已启动,输入你的问题,输入 'quit' 退出。")
                          while True:
                              try:
                                  query = input("\n问题: ").strip()
                                  if query.lower() == "quit":
                                      break
                                  response = await self.process_query(query)
                                  print("\n" + response)
                              except Exception as e:
                                  print(f"\n发生错误: {str(e)}")
                  async def main():
                      """
                      程序入口,设置配置并启动 MCP 客户端。
                      """
                      # 从环境变量获取配置
                      model_name = os.getenv("MODEL_NAME", "deepseek-chat")
                      base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1")
                      api_key = os.getenv("API_KEY")
                      if not api_key:
                          print("未设置 API_KEY 环境变量。")
                          sys.exit(1)
                      # 定义 SSE 服务器地址列表
                      server_urls = ["http://localhost:18080/sse", "http://localhost:18081/sse"]
                      # 创建并运行客户端
                      client = MCPClient(model_name=model_name, base_url=base_url, api_key=api_key, server_urls=server_urls)
                      try:
                          await client.initialize_sessions()
                          await client.chat_loop()
                      finally:
                          await client.cleanup()
                  if __name__ == "__main__":
                      asyncio.run(main())
                  
                • 确保 .env 文件已配置正确,包含以下内容:

                  MODEL_NAME=deepseek-chat
                  BASE_URL=https://api.deepseek.com/v1
                  API_KEY=your_api_key_here
                  

                  将 your_api_key_here 替换为您的实际 API 密钥。

                • 运行命令:

                  python client.py
                  
                • 客户端启动后,将连接到 http://localhost:18080/sse 和 http://localhost:18081/sse,并进入交互模式。

                  测试示例

                  以下是一些测试用例,用于验证服务器和客户端的协同工作:

                  • 测试加法运算:

                    • 在客户端命令行输入:计算 5 加 3。

                    • 预期输出类似:

                      [调用工具 server0_add 参数: {"a": 5, "b": 3}]
                      工具结果: 8
                      
                    • 说明:客户端调用 math-mcp-sse.py 提供的 add 工具。

                    • 测试取模运算:

                      • 在客户端命令行输入:计算 10 除以 3 的余数。

                      • 预期输出类似:

                        [调用工具 server1_modulo 参数: {"a": 10, "b": 3}]
                        工具结果: 1
                        
                      • 说明:客户端调用 modulo-mcp-sse.py 提供的 modulo 工具。

                      • 测试错误情况:

                        • 在客户端命令行输入:计算 7 除以 0。

                        • 预期输出类似:

                          调用工具 divide 出错:Division by zero is not allowed
                          [调用工具 server0_divide 参数: {"a": 7, "b": 0}]
                          工具结果: Division by zero is not allowed
                          
                        • 说明:客户端尝试调用 divide 工具,但因除数为零而报错。


                          通过以上步骤,您可以成功运行两个服务器和一个客户端,并通过交互测试验证它们的工具调用功能。确保在运行前安装必要的依赖(pip install openai mcp),并正确配置环境变量。


                          总结

                          通过本教程,您学习了如何使用 MCP 实现通用的 Server 和 Client。MCP 协议的灵活性使其适用于各种 AI 应用场景,您可以根据需求扩展工具和功能。希望本教程能帮助您快速上手 MCP,并构建出强大的 AI 驱动应用!

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码