Enterprise-Grade Distributed MCP Solution

06-01

[!TIP]

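Background: today an MCP Client and an MCP Server are connected one-to-one. If that MCP Server goes down, the MCP Client can no longer use the tools it provides, so tool availability cannot be guaranteed.

Solution: we explored a distributed client connection model in which one MCP Client connects to multiple MCP Server nodes (a distributed deployment). The current approach is:

  1. Create a class that holds a service name and the connections that belong to it.
  2. Implement a listener that reacts to MCP Server nodes going online or offline and adjusts mcpAsyncClientList accordingly.
  3. (Read operations) To fetch information from the MCP Server, such as the tool list, pick one client from mcpAsyncClientList and send the request through it. The implementation below selects the client by round-robin.
  4. (Write operations) Changes to MCP Server state are initiated by the MCP Client and must be applied to every MCP Server node.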

public class LoadbalancedMcpAsyncClient implements EventListener {

    private String serviceName;

    private List<McpAsyncClient> mcpAsyncClientList;

}
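
The read-one / write-all split described in steps 3 and 4 can be sketched without any MCP or Nacos dependency. The following is a minimal illustration, not the project's implementation: `ReplicaGroup`, `readOne`, and `writeAll` are hypothetical names, and the type parameter `T` stands in for a single client connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the load-balanced client: T plays the role of one MCP client connection.
class ReplicaGroup<T> {

    private final String serviceName;
    private final List<T> replicas;
    private final AtomicInteger cursor = new AtomicInteger();

    ReplicaGroup(String serviceName, List<T> replicas) {
        this.serviceName = serviceName;
        this.replicas = new ArrayList<>(replicas);
    }

    // Read path: any single replica can answer, so pick one and delegate to it.
    <R> R readOne(Function<T, R> call) {
        if (replicas.isEmpty()) {
            throw new IllegalStateException("No replica available for " + serviceName);
        }
        int index = Math.floorMod(cursor.getAndIncrement(), replicas.size());
        return call.apply(replicas.get(index));
    }

    // Write path: every replica must see the change, so apply it to all of them.
    void writeAll(Consumer<T> call) {
        replicas.forEach(call);
    }

    public static void main(String[] args) {
        // Each "server" is modelled as a plain list of root URIs.
        List<List<String>> servers = List.of(new ArrayList<>(), new ArrayList<>());
        ReplicaGroup<List<String>> group = new ReplicaGroup<>("mcp-server1", servers);
        group.writeAll(roots -> roots.add("file:///workspace"));
        System.out.println(group.readOne(List::size));
    }
}
```

Because the write reaches both lists, a read served by either replica sees the same state, which is the property the design relies on.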

Code contributed to the community: https://github.com/alibaba/spring-ai-alibaba/pull/755

Module Code Walkthrough

YAML file

spring:
  ai:
    mcp:
      client:
        enabled: true
        name: mcp-client-webflux
        version: 1.0.0
        type: SYNC
        nacos-enabled: true # enable the Nacos client configuration to turn on distributed mode

    alibaba:
      mcp:
        nacos: # basic Nacos settings
          enabled: true
          server-addr: 
          service-namespace: 
          service-group: 
        client:
          sse:
            connections: # MCP Server services registered in Nacos; mcp-server1 here stands for a multi-node service
              nacos-server1: mcp-server1
              nacos-server2: mcp-server2

Auto-configuration

NacosMcpSseClientProperties (configuration class)
@ConfigurationProperties("spring.ai.alibaba.mcp.client.sse")
public class NacosMcpSseClientProperties {
    public static final String CONFIG_PREFIX = "spring.ai.alibaba.mcp.client.sse";
    private final Map<String, String> connections = new HashMap<>();
    public Map<String, String> getConnections() {
       return connections;
    }
}
NacosMcpSseClientAutoConfiguration

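Provides a bean of type Map<String, List<NamedClientMcpTransport>>:

  • The key is the service name
  • The value is the list of WebFluxSseClientTransport-based transports used for the subsequent connections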
    @AutoConfiguration
    @EnableConfigurationProperties({ NacosMcpSseClientProperties.class, NacosMcpRegistryProperties.class })
    public class NacosMcpSseClientAutoConfiguration {
        private static final Logger logger = LoggerFactory.getLogger(NacosMcpSseClientAutoConfiguration.class);
        public NacosMcpSseClientAutoConfiguration() {
        }
        @Bean
        public NamingService nacosNamingService(NacosMcpRegistryProperties nacosMcpRegistryProperties) {
           Properties nacosProperties = nacosMcpRegistryProperties.getNacosProperties();
           try {
              return NamingFactory.createNamingService(nacosProperties);
           }
           catch (NacosException e) {
              throw new RuntimeException(e);
           }
        }
        @Bean(name = "server2NamedTransport")
        public Map<String, List<NamedClientMcpTransport>> server2NamedTransport(
              NacosMcpSseClientProperties nacosMcpSseClientProperties, NamingService namingService,
              ObjectProvider<WebClient.Builder> webClientBuilderProvider,
              ObjectProvider<ObjectMapper> objectMapperProvider) {
           Map<String, List<NamedClientMcpTransport>> server2NamedTransport = new HashMap<>();
           WebClient.Builder webClientBuilderTemplate = webClientBuilderProvider
              .getIfAvailable(WebClient::builder);
           ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);
           Map<String, String> connections = nacosMcpSseClientProperties.getConnections();
           connections.forEach((serviceKey, serviceName) -> {
              try {
                 List<Instance> instances = namingService.selectInstances(serviceName, true);
                 List<NamedClientMcpTransport> namedTransports = new ArrayList<>();
                 for (Instance instance : instances) {
                    String url = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":"
                          + instance.getPort();
                    WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(url);
                    WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper);
                    namedTransports
                       .add(new NamedClientMcpTransport(serviceName + "-" + instance.getInstanceId(), transport));
                 }
                 server2NamedTransport.put(serviceName, namedTransports);
              }
              catch (NacosException e) {
                 logger.error("nacos naming service: {} error", serviceName, e);
              }
           });
           return server2NamedTransport;
        }
    }
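
The per-instance URL and transport name built inside the loop above can be isolated into two small pure functions, which makes the default-scheme behaviour easy to see. This is only a sketch: `McpBaseUrls`, `baseUrl`, and `transportName` are hypothetical names, and the parameters stand in for the values returned by `instance.getMetadata()`, `instance.getIp()`, `instance.getPort()`, and `instance.getInstanceId()`.

```java
import java.util.Map;

// Hypothetical helper that mirrors the string building done per Nacos instance.
class McpBaseUrls {

    static String baseUrl(Map<String, String> metadata, String ip, int port) {
        // Fall back to plain HTTP when the instance metadata carries no "scheme" entry.
        String scheme = metadata.getOrDefault("scheme", "http");
        return scheme + "://" + ip + ":" + port;
    }

    static String transportName(String serviceName, String instanceId) {
        // One transport per instance, named after the service and the instance id.
        return serviceName + "-" + instanceId;
    }

    public static void main(String[] args) {
        System.out.println(baseUrl(Map.of(), "10.0.0.5", 8080));                  // http://10.0.0.5:8080
        System.out.println(baseUrl(Map.of("scheme", "https"), "10.0.0.6", 8443)); // https://10.0.0.6:8443
    }
}
```

A server that should be reached over TLS therefore only needs to register `scheme=https` in its instance metadata.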
    
    NacosMcpClientAutoConfiguration

    Provides the clients that interact with the MCP Server:

    • List<LoadbalancedMcpSyncClient>
    • List<LoadbalancedMcpAsyncClient>
      @AutoConfiguration(after = { NacosMcpSseClientAutoConfiguration.class, McpClientAutoConfiguration.class })
      @ConditionalOnClass({ McpSchema.class })
      @EnableConfigurationProperties({ McpClientCommonProperties.class })
      @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "nacos-enabled" }, havingValue = "true",
             matchIfMissing = false)
      public class NacosMcpClientAutoConfiguration {
          public NacosMcpClientAutoConfiguration() {
          }
          private String connectedClientName(String clientName, String serverConnectionName) {
             return clientName + " - " + serverConnectionName;
          }
          @Bean
          @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "SYNC",
                matchIfMissing = true)
          public List<LoadbalancedMcpSyncClient> loadbalancedMcpSyncClientList(
                ObjectProvider<McpSyncClientConfigurer> mcpSyncClientConfigurerProvider,
                McpClientCommonProperties commonProperties,
                @Qualifier("server2NamedTransport") ObjectProvider<Map<String, List<NamedClientMcpTransport>>> server2NamedTransportProvider,
                ObjectProvider<NamingService> namingServiceProvider) {
             NamingService namingService = namingServiceProvider.getObject();
             McpSyncClientConfigurer mcpSyncClientConfigurer = mcpSyncClientConfigurerProvider.getObject();
             List<LoadbalancedMcpSyncClient> loadbalancedMcpSyncClients = new ArrayList<>();
             Map<String, List<NamedClientMcpTransport>> server2NamedTransport = server2NamedTransportProvider.getObject();
             for (Map.Entry<String, List<NamedClientMcpTransport>> entry : server2NamedTransport.entrySet()) {
                String serviceName = entry.getKey();
                List<NamedClientMcpTransport> namedTransports = entry.getValue();
                List<McpSyncClient> mcpSyncClients = new ArrayList<>();
                McpSyncClient syncClient;
                for (NamedClientMcpTransport namedTransport : namedTransports) {
                   McpSchema.Implementation clientInfo = new McpSchema.Implementation(
                         this.connectedClientName(commonProperties.getName(), namedTransport.name()),
                         commonProperties.getVersion());
                   McpClient.SyncSpec syncSpec = McpClient.sync(namedTransport.transport())
                      .clientInfo(clientInfo)
                      .requestTimeout(commonProperties.getRequestTimeout());
                   syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec);
                   syncClient = syncSpec.build();
                   if (commonProperties.isInitialized()) {
                      syncClient.initialize();
                   }
                   mcpSyncClients.add(syncClient);
                }
                LoadbalancedMcpSyncClient loadbalancedMcpSyncClient = LoadbalancedMcpSyncClient.builder()
                   .serviceName(serviceName)
                   .mcpSyncClientList(mcpSyncClients)
                   .namingService(namingService)
                   .build();
                loadbalancedMcpSyncClient.subscribe();
                loadbalancedMcpSyncClients.add(loadbalancedMcpSyncClient);
             }
             return loadbalancedMcpSyncClients;
          }
          @Bean
          @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "ASYNC")
          public List<LoadbalancedMcpAsyncClient> loadbalancedMcpAsyncClientList(
                ObjectProvider<McpAsyncClientConfigurer> mcpAsyncClientConfigurerProvider,
                McpClientCommonProperties commonProperties,
                @Qualifier("server2NamedTransport") ObjectProvider<Map<String, List<NamedClientMcpTransport>>> server2NamedTransportProvider,
                ObjectProvider<NamingService> namingServiceProvider) {
             NamingService namingService = namingServiceProvider.getObject();
             McpAsyncClientConfigurer mcpAsyncClientConfigurer = mcpAsyncClientConfigurerProvider.getObject();
             List<LoadbalancedMcpAsyncClient> loadbalancedMcpAsyncClients = new ArrayList<>();
             Map<String, List<NamedClientMcpTransport>> server2NamedTransport = server2NamedTransportProvider.getObject();
             for (Map.Entry<String, List<NamedClientMcpTransport>> entry : server2NamedTransport.entrySet()) {
                String serviceName = entry.getKey();
                List<NamedClientMcpTransport> namedTransports = entry.getValue();
                List<McpAsyncClient> mcpAsyncClients = new ArrayList<>();
                McpAsyncClient asyncClient;
                for (NamedClientMcpTransport namedTransport : namedTransports) {
                   McpSchema.Implementation clientInfo = new McpSchema.Implementation(
                         this.connectedClientName(commonProperties.getName(), namedTransport.name()),
                         commonProperties.getVersion());
                   McpClient.AsyncSpec asyncSpec = McpClient.async(namedTransport.transport())
                      .clientInfo(clientInfo)
                      .requestTimeout(commonProperties.getRequestTimeout());
                   asyncSpec = mcpAsyncClientConfigurer.configure(namedTransport.name(), asyncSpec);
                   asyncClient = asyncSpec.build();
                   if (commonProperties.isInitialized()) {
                      asyncClient.initialize().block();
                   }
                   mcpAsyncClients.add(asyncClient);
                }
                LoadbalancedMcpAsyncClient loadbalancedMcpAsyncClient = LoadbalancedMcpAsyncClient.builder()
                   .serviceName(serviceName)
                   .mcpAsyncClientList(mcpAsyncClients)
                   .namingService(namingService)
                   .build();
                loadbalancedMcpAsyncClient.subscribe();
                loadbalancedMcpAsyncClients.add(loadbalancedMcpAsyncClient);
             }
             return loadbalancedMcpAsyncClients;
          }
      }
      

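      Client side

      LoadbalancedMcpAsyncClient

      Fields:

      • String serviceName: the service name under which the MCP Server is registered
      • List<McpAsyncClient> mcpAsyncClientList: the clients for the individual server nodes
      • NamingService namingService: the Nacos naming service
      • List<Instance> instances: the MCP Server instances registered in Nacos

        All other methods mirror McpAsyncClient and are fully wrapped.

        1. Read operations: getMcpAsyncClient() picks one McpAsyncClient from the list by round-robin
        2. Write operations: applied to every client in List<McpAsyncClient>

        The class implements the EventListener interface so that McpAsyncClient instances are added or removed dynamically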
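
One detail of round-robin selection with an `AtomicInteger` is worth a small sketch: the counter eventually wraps past `Integer.MAX_VALUE`, after which the plain `%` operator yields a negative index. The snippet below is illustrative only; `RoundRobinIndex` and its two methods are hypothetical names and not part of the project.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinIndex {

    // Plain remainder: the result turns negative once the counter has wrapped around.
    static int remainderIndex(AtomicInteger counter, int size) {
        return counter.getAndIncrement() % size;
    }

    // floorMod always lands in [0, size), even for negative counter values.
    static int floorModIndex(AtomicInteger counter, int size) {
        return Math.floorMod(counter.getAndIncrement(), size);
    }

    public static void main(String[] args) {
        AtomicInteger wrapped = new AtomicInteger(Integer.MAX_VALUE);
        remainderIndex(wrapped, 3);                       // consumes MAX_VALUE, counter wraps to MIN_VALUE
        System.out.println(remainderIndex(wrapped, 3));   // a negative value, unusable as a list index

        AtomicInteger safe = new AtomicInteger(Integer.MIN_VALUE);
        System.out.println(floorModIndex(safe, 3));       // a valid index between 0 and 2
    }
}
```

A long-running client that serves many requests could reach the wrap-around, so `Math.floorMod` is the safer choice for the index computation.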

        public class LoadbalancedMcpAsyncClient implements EventListener {
            private static final Logger logger = LoggerFactory.getLogger(LoadbalancedMcpAsyncClient.class);
            private final String serviceName;
            private final List<McpAsyncClient> mcpAsyncClientList;
            private final AtomicInteger currentIndex = new AtomicInteger(0);
            private final NamingService namingService;
            private List<Instance> instances;
            public LoadbalancedMcpAsyncClient(String serviceName, List<McpAsyncClient> mcpAsyncClientList,
                  NamingService namingService) {
               Assert.notNull(serviceName, "serviceName cannot be null");
               Assert.notNull(mcpAsyncClientList, "mcpAsyncClientList cannot be null");
               Assert.notNull(namingService, "namingService cannot be null");
               this.serviceName = serviceName;
               this.mcpAsyncClientList = mcpAsyncClientList;
               try {
                  this.namingService = namingService;
                  this.instances = namingService.selectInstances(serviceName, true);
               }
               catch (NacosException e) {
                  throw new RuntimeException(String.format("Failed to get instances for service: %s", serviceName), e);
               }
            }
            public void subscribe() {
               try {
                  this.namingService.subscribe(this.serviceName, this);
               }
               catch (NacosException e) {
                  throw new RuntimeException(String.format("Failed to subscribe to service: %s", this.serviceName), e);
               }
            }
            public String getServiceName() {
               return serviceName;
            }
            public List<McpAsyncClient> getMcpAsyncClientList() {
               return mcpAsyncClientList;
            }
            public NamingService getNamingService() {
               return this.namingService;
            }
            public List<Instance> getInstances() {
               return this.instances;
            }
            private McpAsyncClient getMcpAsyncClient() {
               if (mcpAsyncClientList.isEmpty()) {
                  throw new IllegalStateException("No McpAsyncClient available");
               }
               // floorMod keeps the index non-negative after the counter overflows
               int index = Math.floorMod(currentIndex.getAndIncrement(), mcpAsyncClientList.size());
               return mcpAsyncClientList.get(index);
            }
            // ------------------------------------------------------------------------------------------------------------------------------------------------
            public McpSchema.ServerCapabilities getServerCapabilities() {
               return getMcpAsyncClient().getServerCapabilities();
            }
            public McpSchema.Implementation getServerInfo() {
               return getMcpAsyncClient().getServerInfo();
            }
            public boolean isInitialized() {
               return getMcpAsyncClient().isInitialized();
            }
            public McpSchema.ClientCapabilities getClientCapabilities() {
               return getMcpAsyncClient().getClientCapabilities();
            }
            public McpSchema.Implementation getClientInfo() {
               return getMcpAsyncClient().getClientInfo();
            }
            public void close() {
               Iterator<McpAsyncClient> iterator = mcpAsyncClientList.iterator();
               while (iterator.hasNext()) {
                  McpAsyncClient mcpAsyncClient = iterator.next();
                  mcpAsyncClient.close();
                  iterator.remove();
                  logger.info("Closed and removed McpAsyncClient: {}", mcpAsyncClient.getClientInfo().name());
               }
            }
            public Mono<Void> closeGracefully() {
               // Iterate over a snapshot: the close callbacks run later, when an iterator would no longer be valid
               List<McpAsyncClient> clients = new ArrayList<>(mcpAsyncClientList);
               List<Mono<Void>> closeMonos = new ArrayList<>();
               for (McpAsyncClient mcpAsyncClient : clients) {
                  Mono<Void> voidMono = mcpAsyncClient.closeGracefully().doOnSuccess(v -> {
                     mcpAsyncClientList.remove(mcpAsyncClient);
                     logger.info("Closed and removed McpAsyncClient: {}", mcpAsyncClient.getClientInfo().name());
                  });
                  closeMonos.add(voidMono);
               }
               return Mono.when(closeMonos);
            }
            public Mono<Object> ping() {
               return getMcpAsyncClient().ping();
            }
            public Mono<Void> addRoot(McpSchema.Root root) {
               return Mono.when(mcpAsyncClientList.stream()
                  .map(mcpAsyncClient -> mcpAsyncClient.addRoot(root))
                  .collect(Collectors.toList()));
            }
            public Mono<Void> removeRoot(String rootUri) {
               return Mono.when(mcpAsyncClientList.stream()
                  .map(mcpAsyncClient -> mcpAsyncClient.removeRoot(rootUri))
                  .collect(Collectors.toList()));
            }
            public Mono<Void> rootsListChangedNotification() {
               return Mono.when(mcpAsyncClientList.stream()
                  .map(McpAsyncClient::rootsListChangedNotification)
                  .collect(Collectors.toList()));
            }
            public Mono<McpSchema.CallToolResult> callTool(McpSchema.CallToolRequest callToolRequest) {
               return getMcpAsyncClient().callTool(callToolRequest);
            }
            public Mono<McpSchema.ListToolsResult> listTools() {
               return getMcpAsyncClient().listTools();
            }
            public Mono<McpSchema.ListToolsResult> listTools(String cursor) {
               return getMcpAsyncClient().listTools(cursor);
            }
            public Mono<McpSchema.ListResourcesResult> listResources() {
               return getMcpAsyncClient().listResources();
            }
            public Mono<McpSchema.ListResourcesResult> listResources(String cursor) {
               return getMcpAsyncClient().listResources(cursor);
            }
            public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.Resource resource) {
               return getMcpAsyncClient().readResource(resource);
            }
            public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.ReadResourceRequest readResourceRequest) {
               return getMcpAsyncClient().readResource(readResourceRequest);
            }
            public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates() {
               return getMcpAsyncClient().listResourceTemplates();
            }
            public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String cursor) {
               return getMcpAsyncClient().listResourceTemplates(cursor);
            }
            public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
               return Mono.when(mcpAsyncClientList.stream()
                  .map(mcpAsyncClient -> mcpAsyncClient.subscribeResource(subscribeRequest))
                  .collect(Collectors.toList()));
            }
            public Mono<Void> unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
               return Mono.when(mcpAsyncClientList.stream()
                  .map(mcpAsyncClient -> mcpAsyncClient.unsubscribeResource(unsubscribeRequest))
                  .collect(Collectors.toList()));
            }
            public Mono<McpSchema.ListPromptsResult> listPrompts() {
               return getMcpAsyncClient().listPrompts();
            }
            public Mono<McpSchema.ListPromptsResult> listPrompts(String cursor) {
               return getMcpAsyncClient().listPrompts(cursor);
            }
            public Mono<McpSchema.GetPromptResult> getPrompt(McpSchema.GetPromptRequest getPromptRequest) {
               return getMcpAsyncClient().getPrompt(getPromptRequest);
            }
            public Mono<Void> setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
               return Mono.when(mcpAsyncClientList.stream()
                  .map(mcpAsyncClient -> mcpAsyncClient.setLoggingLevel(loggingLevel))
                  .collect(Collectors.toList()));
            }
            // ------------------------------------------------------------------------------------------------------------------------------------------------
            @Override
            public void onEvent(Event event) {
               if (event instanceof NamingEvent namingEvent) {
                  if (this.serviceName.equals(namingEvent.getServiceName())) {
                     logger.info("Received service instance change event for service: {}", namingEvent.getServiceName());
                     List<Instance> instances = namingEvent.getInstances();
                     logger.info("Updated instances count: {}", instances.size());
                     // Log the details of every instance
                     instances.forEach(instance -> {
                        logger.info("Instance: {}:{} (Healthy: {}, Enabled: {}, Metadata: {})", instance.getIp(),
                              instance.getPort(), instance.isHealthy(), instance.isEnabled(),
                              JacksonUtils.toJson(instance.getMetadata()));
                     });
                     updateClientList(instances);
                  }
               }
            }
            private void updateClientList(List<Instance> currentInstances) {
               McpClientCommonProperties commonProperties = ApplicationContextHolder.getBean(McpClientCommonProperties.class);
               McpAsyncClientConfigurer mcpAsyncClientConfigurer = ApplicationContextHolder
                  .getBean(McpAsyncClientConfigurer.class);
               ObjectMapper objectMapper = ApplicationContextHolder.getBean(ObjectMapper.class);
               WebClient.Builder webClientBuilderTemplate = ApplicationContextHolder.getBean(WebClient.Builder.class);
               // Instances that went offline
               List<Instance> removeInstances = instances.stream()
                  .filter(instance -> !currentInstances.contains(instance))
                  .collect(Collectors.toList());
               // Instances that came online
               List<Instance> addInstances = currentInstances.stream()
                  .filter(instance -> !instances.contains(instance))
                  .collect(Collectors.toList());
               // Remove the McpAsyncClient of every offline instance
               List<String> clientInfoNames = removeInstances.stream()
                  .map(instance -> connectedClientName(commonProperties.getName(),
                        this.serviceName + "-" + instance.getInstanceId()))
                  .toList();
               Iterator<McpAsyncClient> iterator = mcpAsyncClientList.iterator();
               while (iterator.hasNext()) {
                  McpAsyncClient mcpAsyncClient = iterator.next();
                  McpSchema.Implementation clientInfo = mcpAsyncClient.getClientInfo();
                  if (clientInfoNames.contains(clientInfo.name())) {
                     logger.info("Removing McpAsyncClient: {}", clientInfo.name());
                     // Remove synchronously; the iterator must not be used from the asynchronous callback
                     iterator.remove();
                     mcpAsyncClient.closeGracefully().subscribe(v -> {
                     }, e -> logger.error("Failed to close McpAsyncClient: {}", clientInfo.name(), e));
                  }
               }
               // Create a McpAsyncClient for every new instance
               McpAsyncClient asyncClient;
               for (Instance instance : addInstances) {
                  String baseUrl = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":"
                        + instance.getPort();
                  WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(baseUrl);
                  WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper);
                  NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(
                        serviceName + "-" + instance.getInstanceId(), transport);

                  McpSchema.Implementation clientInfo = new McpSchema.Implementation(
                        this.connectedClientName(commonProperties.getName(), namedTransport.name()),
                        commonProperties.getVersion());
                  McpClient.AsyncSpec asyncSpec = McpClient.async(namedTransport.transport())
                     .clientInfo(clientInfo)
                     .requestTimeout(commonProperties.getRequestTimeout());
                  asyncSpec = mcpAsyncClientConfigurer.configure(namedTransport.name(), asyncSpec);
                  asyncClient = asyncSpec.build();
                  if (commonProperties.isInitialized()) {
                     asyncClient.initialize().block();
                  }
                  logger.info("Added McpAsyncClient: {}", clientInfo.name());
                  mcpAsyncClientList.add(asyncClient);
               }
               // Keep the latest snapshot so the next event is diffed against it
               this.instances = currentInstances;
            }
            private String connectedClientName(String clientName, String serverConnectionName) {
               return clientName + " - " + serverConnectionName;
            }
            public static Builder builder() {
               return new Builder();
            }
            public static class Builder {
               private String serviceName;
               private List<McpAsyncClient> mcpAsyncClientList;
               private NamingService namingService;
               public Builder serviceName(String serviceName) {
                  this.serviceName = serviceName;
                  return this;
               }
               public Builder mcpAsyncClientList(List<McpAsyncClient> mcpAsyncClientList) {
                  this.mcpAsyncClientList = mcpAsyncClientList;
                  return this;
               }
               public Builder namingService(NamingService namingService) {
                  this.namingService = namingService;
                  return this;
               }
               public LoadbalancedMcpAsyncClient build() {
                  return new LoadbalancedMcpAsyncClient(this.serviceName, this.mcpAsyncClientList, this.namingService);
               }
            }
        }
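
The core of `updateClientList` is a two-way difference between the previously known instances and the instances carried by the latest event. The sketch below reduces that step to plain strings so it can be run in isolation; `InstanceDiff` is a hypothetical name, and identifying an instance by an `ip:port` string is a simplifying assumption (the class above compares Nacos `Instance` objects).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model: each instance is identified by an "ip:port" string.
class InstanceDiff {

    final List<String> removed;
    final List<String> added;

    InstanceDiff(List<String> known, List<String> current) {
        // Known before but missing from the latest event: their clients should be closed.
        removed = new ArrayList<>(known);
        removed.removeAll(current);
        // Present in the latest event but not known before: they need new clients.
        added = new ArrayList<>(current);
        added.removeAll(known);
    }

    public static void main(String[] args) {
        List<String> known = List.of("10.0.0.1:8080", "10.0.0.2:8080");
        List<String> current = List.of("10.0.0.2:8080", "10.0.0.3:8080");
        InstanceDiff diff = new InstanceDiff(known, current);
        System.out.println("removed=" + diff.removed + " added=" + diff.added);
    }
}
```

After a diff has been applied, the known list has to be replaced by the current one. Otherwise the next event is compared against a stale snapshot and the same instances would be reported as added again.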
        
        LoadbalancedMcpSyncClient

        Same as above.

        public class LoadbalancedMcpSyncClient implements EventListener {
            private static final Logger logger = LoggerFactory.getLogger(LoadbalancedMcpSyncClient.class);
            private final String serviceName;
            private final List<McpSyncClient> mcpSyncClientList;
            private final AtomicInteger currentIndex = new AtomicInteger(0);
            private final NamingService namingService;
            private List<Instance> instances;
            public LoadbalancedMcpSyncClient(String serviceName, List<McpSyncClient> mcpSyncClientList,
                  NamingService namingService) {
               Assert.notNull(serviceName, "Service name must not be null");
               Assert.notNull(mcpSyncClientList, "McpSyncClient list must not be null");
               Assert.notNull(namingService, "NamingService must not be null");
               this.serviceName = serviceName;
               this.mcpSyncClientList = mcpSyncClientList;
               try {
                  this.namingService = namingService;
                  this.instances = namingService.selectInstances(serviceName, true);
               }
               catch (NacosException e) {
                  throw new RuntimeException(String.format("Failed to get instances for service: %s", serviceName), e);
               }
            }
            public void subscribe() {
               try {
                  this.namingService.subscribe(this.serviceName, this);
               }
               catch (NacosException e) {
                  throw new RuntimeException(String.format("Failed to subscribe to service: %s", this.serviceName), e);
               }
            }
            public String getServiceName() {
               return this.serviceName;
            }
            public List<McpSyncClient> getMcpSyncClientList() {
               return this.mcpSyncClientList;
            }
            public NamingService getNamingService() {
               return this.namingService;
            }
            public List<Instance> getInstances() {
               return this.instances;
            }
            private McpSyncClient getMcpSyncClient() {
               if (mcpSyncClientList.isEmpty()) {
                  throw new IllegalStateException("No McpSyncClient available");
               }
               // floorMod keeps the index non-negative after the counter overflows
               int index = Math.floorMod(currentIndex.getAndIncrement(), mcpSyncClientList.size());
               return mcpSyncClientList.get(index);
            }
            // ------------------------------------------------------------------------------------------------------------------------------------------------
            public McpSchema.ServerCapabilities getServerCapabilities() {
               return getMcpSyncClient().getServerCapabilities();
            }
            public McpSchema.Implementation getServerInfo() {
               return getMcpSyncClient().getServerInfo();
            }
            public McpSchema.ClientCapabilities getClientCapabilities() {
               return getMcpSyncClient().getClientCapabilities();
            }
            public McpSchema.Implementation getClientInfo() {
               return getMcpSyncClient().getClientInfo();
            }
            public void close() {
               Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();
               while (iterator.hasNext()) {
                  McpSyncClient mcpSyncClient = iterator.next();
                  mcpSyncClient.close();
                  iterator.remove();
                  logger.info("Closed and removed McpSyncClient: {}", mcpSyncClient.getClientInfo().name());
               }
            }
            public boolean closeGracefully() {
               List<Boolean> flagList = new ArrayList<>();
               Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();
               while (iterator.hasNext()) {
                  McpSyncClient mcpSyncClient = iterator.next();
                  boolean flag = mcpSyncClient.closeGracefully();
                  flagList.add(flag);
                  if (flag) {
                     iterator.remove();
                     logger.info("Closed and removed McpSyncClient: {}", mcpSyncClient.getClientInfo().name());
                  }
               }
               // True only when every client closed successfully
               return flagList.stream().allMatch(flag -> flag);
            }
            public Object ping() {
               return getMcpSyncClient().ping();
            }
            public void addRoot(McpSchema.Root root) {
               for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
                  mcpSyncClient.addRoot(root);
               }
            }
            public void removeRoot(String rootUri) {
               for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
                  mcpSyncClient.removeRoot(rootUri);
               }
            }
            public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
               return getMcpSyncClient().callTool(callToolRequest);
            }
            public McpSchema.ListToolsResult listTools() {
               return getMcpSyncClient().listTools();
            }
            public McpSchema.ListToolsResult listTools(String cursor) {
               return getMcpSyncClient().listTools(cursor);
            }
            public McpSchema.ListResourcesResult listResources(String cursor) {
               return getMcpSyncClient().listResources(cursor);
            }
            public McpSchema.ListResourcesResult listResources() {
               return getMcpSyncClient().listResources();
            }
            public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
               return getMcpSyncClient().readResource(resource);
            }
            public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {
               return getMcpSyncClient().readResource(readResourceRequest);
            }
            public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {
               return getMcpSyncClient().listResourceTemplates(cursor);
            }
            public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
               return getMcpSyncClient().listResourceTemplates();
            }
            public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
               for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
                  mcpSyncClient.subscribeResource(subscribeRequest);
               }
            }
            public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
               for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
                  mcpSyncClient.unsubscribeResource(unsubscribeRequest);
               }
            }
            public McpSchema.ListPromptsResult listPrompts(String cursor) {
               return getMcpSyncClient().listPrompts(cursor);
            }
            public McpSchema.ListPromptsResult listPrompts() {
               return getMcpSyncClient().listPrompts();
            }
            public McpSchema.GetPromptResult getPrompt(McpSchema.GetPromptRequest getPromptRequest) {
               return getMcpSyncClient().getPrompt(getPromptRequest);
            }
            public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
               for (McpSyncClient mcpSyncClient : mcpSyncClientList) {
                  mcpSyncClient.setLoggingLevel(loggingLevel);
               }
            }
            // ------------------------------------------------------------------------------------------------------------------------------------------------
            @Override
            public void onEvent(Event event) {
               if (event instanceof NamingEvent namingEvent) {
                  if (this.serviceName.equals(namingEvent.getServiceName())) {
                     logger.info("Received service instance change event for service: {}", namingEvent.getServiceName());
                     List<Instance> instances = namingEvent.getInstances();
                     logger.info("Updated instances count: {}", instances.size());
                     // Log the details of each instance
                     instances.forEach(instance -> {
                        logger.info("Instance: {}:{} (Healthy: {}, Enabled: {}, Metadata: {})", instance.getIp(),
                              instance.getPort(), instance.isHealthy(), instance.isEnabled(),
                              JacksonUtils.toJson(instance.getMetadata()));
                     });
                     updateClientList(instances);
                  }
               }
            }
            private void updateClientList(List<Instance> currentInstances) {
               McpClientCommonProperties commonProperties = ApplicationContextHolder.getBean(McpClientCommonProperties.class);
               McpSyncClientConfigurer mcpSyncClientConfigurer = ApplicationContextHolder
                  .getBean(McpSyncClientConfigurer.class);
               ObjectMapper objectMapper = ApplicationContextHolder.getBean(ObjectMapper.class);
               WebClient.Builder webClientBuilderTemplate = ApplicationContextHolder.getBean(WebClient.Builder.class);
               // Instances that were known before but are missing from the new list
               List<Instance> removeInstances = instances.stream()
                  .filter(instance -> !currentInstances.contains(instance))
                  .collect(Collectors.toList());
               // Instances that are in the new list but were not known before
               List<Instance> addInstances = currentInstances.stream()
                  .filter(instance -> !instances.contains(instance))
                  .collect(Collectors.toList());
               // Close and remove the McpSyncClient of every removed instance
               List<String> clientInfoNames = removeInstances.stream()
                  .map(instance -> connectedClientName(commonProperties.getName(),
                        this.serviceName + "-" + instance.getInstanceId()))
                  .toList();
               Iterator<McpSyncClient> iterator = mcpSyncClientList.iterator();
               while (iterator.hasNext()) {
                  McpSyncClient mcpSyncClient = iterator.next();
                  McpSchema.Implementation clientInfo = mcpSyncClient.getClientInfo();
                  if (clientInfoNames.contains(clientInfo.name())) {
                     logger.info("Removing McpSyncClient: {}", clientInfo.name());
                     // The client is dropped from the list only if it closed cleanly
                     if (mcpSyncClient.closeGracefully()) {
                        iterator.remove();
                     }
                     else {
                        logger.warn("Failed to remove McpSyncClient: {}", clientInfo.name());
                     }
                  }
               }
               // Create a McpSyncClient for every added instance
               McpSyncClient syncClient;
               for (Instance instance : addInstances) {
                  // The scheme comes from the instance metadata and defaults to http
                  String baseUrl = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":"
                        + instance.getPort();
                  WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(baseUrl);
                  WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper);
                  NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(
                        serviceName + "-" + instance.getInstanceId(), transport);
                  McpSchema.Implementation clientInfo = new McpSchema.Implementation(
                        this.connectedClientName(commonProperties.getName(), namedTransport.name()),
                        commonProperties.getVersion());
                  McpClient.SyncSpec syncSpec = McpClient.sync(namedTransport.transport())
                     .clientInfo(clientInfo)
                     .requestTimeout(commonProperties.getRequestTimeout());
                  syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec);
                  syncClient = syncSpec.build();
                  if (commonProperties.isInitialized()) {
                     syncClient.initialize();
                  }
                  logger.info("Added McpSyncClient: {}", clientInfo.name());
                  mcpSyncClientList.add(syncClient);
               }
               // Remember the new list so the next event is diffed against it
               this.instances = currentInstances;
            }
            private String connectedClientName(String clientName, String serverConnectionName) {
               return clientName + " - " + serverConnectionName;
            }
            public static Builder builder() {
               return new Builder();
            }
            public static class Builder {
               private String serviceName;
               private List<McpSyncClient> mcpSyncClientList;
               private NamingService namingService;
               public Builder serviceName(String serviceName) {
                  this.serviceName = serviceName;
                  return this;
               }
               public Builder mcpSyncClientList(List<McpSyncClient> mcpSyncClientList) {
                  this.mcpSyncClientList = mcpSyncClientList;
                  return this;
               }
               public Builder namingService(NamingService namingService) {
                  this.namingService = namingService;
                  return this;
               }
               public LoadbalancedMcpSyncClient build() {
                  return new LoadbalancedMcpSyncClient(this.serviceName, this.mcpSyncClientList, this.namingService);
               }
            }
        }
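
The class above routes calls in two ways: read-style calls (listPrompts, readResource, and so on) go to a single client, while state-changing calls (subscribeResource, setLoggingLevel) are sent to every client. The following is a minimal sketch of that routing idea in plain Java. `ReadWriteRouter` and its methods are hypothetical names, the generic type `C` stands in for `McpSyncClient`, and the random choice follows the approach described in the introduction rather than the actual `getMcpSyncClient()` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for LoadbalancedMcpSyncClient; C plays the role of McpSyncClient.
final class ReadWriteRouter<C> {

    // Copy-on-write list so reads stay safe while instances go online or offline
    private final List<C> clients = new CopyOnWriteArrayList<>();

    public ReadWriteRouter(List<C> initialClients) {
        this.clients.addAll(initialClients);
    }

    // Read path: any single server can answer, so pick one client at random
    public <R> R read(Function<C, R> call) {
        List<C> snapshot = new ArrayList<>(clients);
        if (snapshot.isEmpty()) {
            throw new IllegalStateException("No MCP Server instance is available");
        }
        C chosen = snapshot.get(ThreadLocalRandom.current().nextInt(snapshot.size()));
        return call.apply(chosen);
    }

    // Write path: the change must reach every server, so call all clients
    public void broadcast(Consumer<C> call) {
        for (C client : clients) {
            call.accept(client);
        }
    }

    public static void main(String[] args) {
        // StringBuilder is used here only as a mutable fake "server"
        List<StringBuilder> servers = List.of(
                new StringBuilder("server-19000"), new StringBuilder("server-19001"));
        ReadWriteRouter<StringBuilder> router = new ReadWriteRouter<>(servers);
        router.broadcast(server -> server.append(" [level=DEBUG]"));
        System.out.println(router.read(StringBuilder::toString));
    }
}
```

Broadcasting keeps all nodes in the same state, so a later read returns a consistent answer whichever client is chosen.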
        

        Utility class

        ApplicationContextHolder
        @Component
        public class ApplicationContextHolder implements ApplicationContextAware {
            private static ApplicationContext applicationContext;
            @Override
            public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
               ApplicationContextHolder.applicationContext = applicationContext;
            }
            // Static lookup so that non-bean objects can fetch beans by type
            public static <T> T getBean(Class<T> clazz) {
               return applicationContext.getBean(clazz);
            }
        }
        

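        Demo

        I registered an MCP Server service in Nacos and deployed two nodes:

        • Two MCP Server processes run on the same machine on different ports, 19000 and 19001, and are registered in Nacos under the service name mcp-server-provider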

          The yml configuration is as follows:

          server:
            port: 8080
          spring:
            application:
              name: mcp-client-webflux
            ai:
              alibaba:
                mcp:
                  nacos:
                    enabled: true
                    server-addr: 127.0.0.1:8848
                    username: nacos
                    password: nacos
                    
                  client:
                    sse:
                      connections:
                        nacos-server1: mcp-server-provider
                        
              mcp:
                client:
                  enabled: true
                  name: mcp-client-webflux
                  version: 0.0.1
                  initialized: true
                  request-timeout: 600s
              
                  nacos-enabled: true
          

          We can see that LoadbalancedMcpSyncClient has been injected successfully. mcp-server-provider has two instances, each backed by its own McpSyncClient.

          Next we stop the MCP Server on port 19001. The removed instance is picked up through removeInstances, and the corresponding McpSyncClient is removed from mcpSyncClientList.

          We then restart the MCP Server on port 19001. The new instance is picked up through addInstances, and a corresponding McpSyncClient is added to mcpSyncClientList.
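
The offline and online cases in this demo both come down to comparing the previous instance list with the list carried by the latest event. The sketch below reproduces that comparison in isolation. `InstanceDiff`, `Endpoint`, and `Diff` are hypothetical names, and `Endpoint` is a simplified stand-in for the Nacos `Instance` class whose equality is assumed to depend only on IP and port, which may differ from how the real class compares instances.

```java
import java.util.List;

final class InstanceDiff {

    // Hypothetical stand-in for a Nacos Instance; records compare by ip and port
    record Endpoint(String ip, int port) {}

    // Result of comparing two snapshots of the instance list
    record Diff(List<Endpoint> removed, List<Endpoint> added) {}

    static Diff diff(List<Endpoint> previous, List<Endpoint> current) {
        // Known before, missing now: the matching clients should be closed
        List<Endpoint> removed = previous.stream()
                .filter(endpoint -> !current.contains(endpoint))
                .toList();
        // Present now, unknown before: new clients should be created
        List<Endpoint> added = current.stream()
                .filter(endpoint -> !previous.contains(endpoint))
                .toList();
        return new Diff(removed, added);
    }

    public static void main(String[] args) {
        Endpoint first = new Endpoint("127.0.0.1", 19000);
        Endpoint second = new Endpoint("127.0.0.1", 19001);

        // The node on port 19001 is stopped
        Diff offline = diff(List.of(first, second), List.of(first));
        System.out.println("removed=" + offline.removed() + ", added=" + offline.added());

        // The node on port 19001 is started again
        Diff online = diff(List.of(first), List.of(first, second));
        System.out.println("removed=" + online.removed() + ", added=" + online.added());
    }
}
```

With only a handful of instances per service, the quadratic `contains` checks are cheap. For large instance lists, converting each side to a `Set` first would be a reasonable alternative.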
