企业级分布式 MCP 方案
原文档地址:企业级分布式 MCP 方案
企业级分布式 MCP 方案
[!TIP]
背景:现阶段 MCP Client 和 MCP Server 是一对一的连接方式,若当前 MCP Server 挂掉了,那么 MCP Client 便不能使用 MCP Server 提供的工具能力。工具稳定性的提供得不到保证
解决:做了一些分布式 Client 连接的探索,一个 MCP Client 端可以连接多个 MCP Server(分布式部署),目前采用的方案如下:
- 新建一个包含服务名和对应连接的类
- 另外实现监听机制,可以动态的应对 MCP Server 节点上下线,去动态调整 mcpAsyncClientList
- (读操作)获取 MCP Server 相关信息的,采用从 mcpAsyncClientList 列表中随机中获取一个去发起请求,比如获取工具列表信息
- (写操作)对应 MCP Server 需要更改的信息,由 MCP Client 端发起,需要修改所有的 MCP Server
public class LoadbalancedAsyncClient implements EventListener {
private String serviceName;
private List mcpAsyncClientList;
}
给社区贡献代码:https://github.com/alibaba/spring-ai-alibaba/pull/755
模块代码解析
yaml 文件
spring: ai: mcp: client: enabled: true name: mcp-client-webflux version: 1.0.0 type: SYNC nacos-enabled: true # 开启nacos-client配置,启动分布式 alibaba: mcp: nacos: ## nacos的基础配置信息 enabled: true server-addr: service-namespace: service-group: client: sse: connections: // 注册在nacos的MCP Server服务,这里mcp-server1代表多节点 nacos-server1: mcp-server1 nacos-server2: mcp-server2
自动注入部分
NacosMcpSseClientProperties(配置类)
@ConfigurationProperties("spring.ai.alibaba.mcp.client.sse") public class NacosMcpSseClientProperties { public static final String _CONFIG_PREFIX _= "spring.ai.alibaba.mcp.client.sse"; private final Map connections = new HashMap(); public Map getConnections() { return connections; } }
NacosMcpSseClientAutoConfiguration
提供 Map 的 bean
- 键代表服务名
- 值为对应的后续连接的 WebFluxSseClientTransport 列表
@AutoConfiguration @EnableConfigurationProperties({ NacosMcpSseClientProperties.class, NacosMcpRegistryProperties.class }) public class NacosMcpSseClientAutoConfiguration { private static final Logger _logger _= LoggerFactory._getLogger_(NacosMcpSseClientAutoConfiguration.class); public NacosMcpSseClientAutoConfiguration() { } @Bean public NamingService nacosNamingService(NacosMcpRegistryProperties nacosMcpRegistryProperties) { Properties nacosProperties = nacosMcpRegistryProperties.getNacosProperties(); try { return NamingFactory._createNamingService_(nacosProperties); } catch (NacosException e) { throw new RuntimeException(e); } } @Bean(name = "server2NamedTransport") public Map server2NamedTransport( NacosMcpSseClientProperties nacosMcpSseClientProperties, NamingService namingService, ObjectProvider webClientBuilderProvider, ObjectProvider objectMapperProvider) { Map server2NamedTransport = new HashMap(); WebClient.Builder webClientBuilderTemplate = (WebClient.Builder) webClientBuilderProvider .getIfAvailable(WebClient::_builder_); ObjectMapper objectMapper = (ObjectMapper) objectMapperProvider.getIfAvailable(ObjectMapper::new); Map connections = nacosMcpSseClientProperties.getConnections(); connections.forEach((serviceKey, serviceName) -> { try { List instances = namingService.selectInstances(serviceName, true); List namedTransports = new ArrayList(); for (Instance instance : instances) { String url = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":" + instance.getPort(); WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(url); WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper); namedTransports .add(new NamedClientMcpTransport(serviceName + "-" + instance.getInstanceId(), transport)); } server2NamedTransport.put(serviceName, namedTransports); } catch (NacosException e) { _logger_.error("nacos naming service: {} error", serviceName, e); } }); return server2NamedTransport; } }
NacosMcpClientAutoConfiguration
提供和 MCP Server 进行交互的客户端
- List
- List
@AutoConfiguration(after = { NacosMcpSseClientAutoConfiguration.class, McpClientAutoConfiguration.class }) @ConditionalOnClass({ McpSchema.class }) @EnableConfigurationProperties({ McpClientCommonProperties.class }) @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "nacos-enabled" }, havingValue = "true", matchIfMissing = false) public class NacosMcpClientAutoConfiguration { public NacosMcpClientAutoConfiguration() { } private String connectedClientName(String clientName, String serverConnectionName) { return clientName + " - " + serverConnectionName; } @Bean @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "SYNC", matchIfMissing = true) public List loadbalancedMcpSyncClientList( ObjectProvider mcpSyncClientConfigurerProvider, McpClientCommonProperties commonProperties, @Qualifier("server2NamedTransport") ObjectProvider server2NamedTransportProvider, ObjectProvider namingServiceProvider) { NamingService namingService = namingServiceProvider.getObject(); McpSyncClientConfigurer mcpSyncClientConfigurer = mcpSyncClientConfigurerProvider.getObject(); List loadbalancedMcpSyncClients = new ArrayList(); Map server2NamedTransport = server2NamedTransportProvider.getObject(); for (Map.Entry entry : server2NamedTransport.entrySet()) { String serviceName = entry.getKey(); List namedTransports = entry.getValue(); List mcpSyncClients = new ArrayList(); McpSyncClient syncClient; for (NamedClientMcpTransport namedTransport : namedTransports) { McpSchema.Implementation clientInfo = new McpSchema.Implementation( this.connectedClientName(commonProperties.getName(), namedTransport.name()), commonProperties.getVersion()); McpClient.SyncSpec syncSpec = McpClient._sync_(namedTransport.transport()) .clientInfo(clientInfo) .requestTimeout(commonProperties.getRequestTimeout()); syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec); syncClient = syncSpec.build(); if (commonProperties.isInitialized()) { syncClient.initialize(); } mcpSyncClients.add(syncClient); } LoadbalancedMcpSyncClient loadbalancedMcpSyncClient = LoadbalancedMcpSyncClient._builder_() .serviceName(serviceName) .mcpSyncClientList(mcpSyncClients) .namingService(namingService) .build(); loadbalancedMcpSyncClient.subscribe(); loadbalancedMcpSyncClients.add(loadbalancedMcpSyncClient); } return loadbalancedMcpSyncClients; } @Bean @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "ASYNC") public List loadbalancedMcpAsyncClientList( ObjectProvider mcpAsyncClientConfigurerProvider, McpClientCommonProperties commonProperties, @Qualifier("server2NamedTransport") ObjectProvider server2NamedTransportProvider, ObjectProvider namingServiceProvider) { NamingService namingService = namingServiceProvider.getObject(); McpAsyncClientConfigurer mcpAsyncClientConfigurer = mcpAsyncClientConfigurerProvider.getObject(); List loadbalancedMcpAsyncClients = new ArrayList(); Map server2NamedTransport = server2NamedTransportProvider.getObject(); for (Map.Entry entry : server2NamedTransport.entrySet()) { String serviceName = entry.getKey(); List namedTransports = entry.getValue(); List mcpAsyncClients = new ArrayList(); McpAsyncClient asyncClient; for (NamedClientMcpTransport namedTransport : namedTransports) { McpSchema.Implementation clientInfo = new McpSchema.Implementation( this.connectedClientName(commonProperties.getName(), namedTransport.name()), commonProperties.getVersion()); McpClient.AsyncSpec asyncSpec = McpClient._async_(namedTransport.transport()) .clientInfo(clientInfo) .requestTimeout(commonProperties.getRequestTimeout()); asyncSpec = mcpAsyncClientConfigurer.configure(namedTransport.name(), asyncSpec); asyncClient = asyncSpec.build(); if (commonProperties.isInitialized()) { asyncClient.initialize().block(); } mcpAsyncClients.add(asyncClient); } LoadbalancedMcpAsyncClient loadbalancedMcpAsyncClient = LoadbalancedMcpAsyncClient._builder_() .serviceName(serviceName) .mcpAsyncClientList(mcpAsyncClients) .namingService(namingService) .build(); loadbalancedMcpAsyncClient.subscribe(); loadbalancedMcpAsyncClients.add(loadbalancedMcpAsyncClient); } return loadbalancedMcpAsyncClients; } }
Client 端部分
LoadbalancedMcpAsyncClient
各字段含义:
- String serviceName:MCP Server 注册的服务名称
- List mcpAsyncClientList:对应的多节点客户端
- NamingService namingService:Nacos 服务
- List instances:Nacos 中 MCP Server 的实例列表
其余方法的使用和 McpAsyncClient 保持一致,已经全面封装好了
- 读操作:通过 getMcpAsyncClient()方法轮询得到 McpAsyncClient 列表
- 写操作:对所有 List 进行操作
通过实现 EventListener 接口,动态增加 or 减少 McpAsyncClient
public class LoadbalancedMcpAsyncClient implements EventListener { private static final Logger _logger _= LoggerFactory._getLogger_(LoadbalancedMcpAsyncClient.class); private final String serviceName; private final List mcpAsyncClientList; private final AtomicInteger currentIndex = new AtomicInteger(0); private final NamingService namingService; private List instances; public LoadbalancedMcpAsyncClient(String serviceName, List mcpAsyncClientList, NamingService namingService) { Assert._notNull_(serviceName, "serviceName cannot be null"); Assert._notNull_(mcpAsyncClientList, "mcpAsyncClientList cannot be null"); Assert._notNull_(namingService, "namingService cannot be null"); this.serviceName = serviceName; this.mcpAsyncClientList = mcpAsyncClientList; try { this.namingService = namingService; this.instances = namingService.selectInstances(serviceName, true); } catch (NacosException e) { throw new RuntimeException(String._format_("Failed to get instances for service: %s", serviceName)); } } public void subscribe() { try { this.namingService.subscribe(this.serviceName, this); } catch (NacosException e) { throw new RuntimeException(String._format_("Failed to subscribe to service: %s", this.serviceName)); } } public String getServiceName() { return serviceName; } public List getMcpAsyncClientList() { return mcpAsyncClientList; } public NamingService getNamingService() { return this.namingService; } public List getInstances() { return this.instances; } private McpAsyncClient getMcpAsyncClient() { if (mcpAsyncClientList.isEmpty()) { throw new IllegalStateException("No McpAsyncClient available"); } int index = currentIndex.getAndIncrement() % mcpAsyncClientList.size(); return mcpAsyncClientList.get(index); } // ------------------------------------------------------------------------------------------------------------------------------------------------ public McpSchema.ServerCapabilities getServerCapabilities() { return getMcpAsyncClient().getServerCapabilities(); } public McpSchema.Implementation getServerInfo() { return getMcpAsyncClient().getServerInfo(); } public boolean isInitialized() { return getMcpAsyncClient().isInitialized(); } public McpSchema.ClientCapabilities getClientCapabilities() { return getMcpAsyncClient().getClientCapabilities(); } public McpSchema.Implementation getClientInfo() { return getMcpAsyncClient().getClientInfo(); } public void close() { Iterator iterator = mcpAsyncClientList.iterator(); while (iterator.hasNext()) { McpAsyncClient mcpAsyncClient = iterator.next(); mcpAsyncClient.close(); iterator.remove(); _logger_.info("Closed and removed McpSyncClient: {}", mcpAsyncClient.getClientInfo().name()); } } public Mono closeGracefully() { Iterator iterator = mcpAsyncClientList.iterator(); List closeMonos = new ArrayList(); while (iterator.hasNext()) { McpAsyncClient mcpAsyncClient = iterator.next(); Mono voidMono = mcpAsyncClient.closeGracefully().doOnSuccess(v -> { iterator.remove(); _logger_.info("Closed and removed McpAsyncClient: {}", mcpAsyncClient.getClientInfo().name()); }); closeMonos.add(voidMono); } return Mono._when_(closeMonos); } public Mono ping() { return getMcpAsyncClient().ping(); } public Mono addRoot(McpSchema.Root root) { return Mono._when_(mcpAsyncClientList.stream() .map(mcpAsyncClient -> mcpAsyncClient.addRoot(root)) .collect(Collectors._toList_())); } public Mono removeRoot(String rootUri) { return Mono._when_(mcpAsyncClientList.stream() .map(mcpAsyncClient -> mcpAsyncClient.removeRoot(rootUri)) .collect(Collectors._toList_())); } public Mono rootsListChangedNotification() { return Mono._when_(mcpAsyncClientList.stream() .map(McpAsyncClient::rootsListChangedNotification) .collect(Collectors._toList_())); } public Mono callTool(McpSchema.CallToolRequest callToolRequest) { return getMcpAsyncClient().callTool(callToolRequest); } public Mono listTools() { return getMcpAsyncClient().listTools(); } public Mono listTools(String cursor) { return getMcpAsyncClient().listTools(cursor); } public Mono listResources() { return getMcpAsyncClient().listResources(); } public Mono listResources(String cursor) { return getMcpAsyncClient().listResources(cursor); } public Mono readResource(McpSchema.Resource resource) { return getMcpAsyncClient().readResource(resource); } public Mono readResource(McpSchema.ReadResourceRequest readResourceRequest) { return getMcpAsyncClient().readResource(readResourceRequest); } public Mono listResourceTemplates() { return getMcpAsyncClient().listResourceTemplates(); } public Mono listResourceTemplates(String cursor) { return getMcpAsyncClient().listResourceTemplates(cursor); } public Mono subscribeResource(McpSchema.SubscribeRequest subscribeRequest) { return Mono._when_(mcpAsyncClientList.stream() .map(mcpAsyncClient -> mcpAsyncClient.subscribeResource(subscribeRequest)) .collect(Collectors._toList_())); } public Mono unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) { return Mono._when_(mcpAsyncClientList.stream() .map(mcpAsyncClient -> mcpAsyncClient.unsubscribeResource(unsubscribeRequest)) .collect(Collectors._toList_())); } public Mono listPrompts() { return getMcpAsyncClient().listPrompts(); } public Mono listPrompts(String cursor) { return getMcpAsyncClient().listPrompts(cursor); } public Mono getPrompt(McpSchema.GetPromptRequest getPromptRequest) { return getMcpAsyncClient().getPrompt(getPromptRequest); } public Mono setLoggingLevel(McpSchema.LoggingLevel loggingLevel) { return Mono._when_(mcpAsyncClientList.stream() .map(mcpAsyncClient -> mcpAsyncClient.setLoggingLevel(loggingLevel)) .collect(Collectors._toList_())); } // ------------------------------------------------------------------------------------------------------------------------------------------------ @Override public void onEvent(Event event) { if (event instanceof NamingEvent namingEvent) { if (this.serviceName.equals(namingEvent.getServiceName())) { _logger_.info("Received service instance change event for service: {}", namingEvent.getServiceName()); List instances = namingEvent.getInstances(); _logger_.info("Updated instances count: {}", instances.size()); // 打印每个实例的详细信息 instances.forEach(instance -> { _logger_.info("Instance: {}:{} (Healthy: {}, Enabled: {}, Metadata: {})", instance.getIp(), instance.getPort(), instance.isHealthy(), instance.isEnabled(), JacksonUtils._toJson_(instance.getMetadata())); }); updateClientList(instances); } } } private void updateClientList(List currentInstances) { McpClientCommonProperties commonProperties = ApplicationContextHolder._getBean_(McpClientCommonProperties.class); McpAsyncClientConfigurer mcpSyncClientConfigurer = ApplicationContextHolder ._getBean_(McpAsyncClientConfigurer.class); ObjectMapper objectMapper = ApplicationContextHolder._getBean_(ObjectMapper.class); WebClient.Builder webClientBuilderTemplate = ApplicationContextHolder._getBean_(WebClient.Builder.class); // 移除的实例列表 List removeInstances = instances.stream() .filter(instance -> !currentInstances.contains(instance)) .collect(Collectors._toList_()); // 新增的实例列表 List addInstances = currentInstances.stream() .filter(instance -> !instances.contains(instance)) .collect(Collectors._toList_()); // 删除McpAsyncClient实例 List clientInfoNames = removeInstances.stream() .map(instance -> connectedClientName(commonProperties.getName(), this.serviceName + "-" + instance.getInstanceId())) .toList(); Iterator iterator = mcpAsyncClientList.iterator(); while (iterator.hasNext()) { McpAsyncClient mcpAsyncClient = iterator.next(); McpSchema.Implementation clientInfo = mcpAsyncClient.getClientInfo(); if (clientInfoNames.contains(clientInfo.name())) { _logger_.info("Removing McpAsyncClient: {}", clientInfo.name()); mcpAsyncClient.closeGracefully().subscribe(v -> { iterator.remove(); }, e -> _logger_.error("Failed to remove McpAsyncClient: {}", clientInfo.name(), e)); } } // 新增McpAsyncClient实例 McpAsyncClient asyncClient; for (Instance instance : addInstances) { String baseUrl = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":" + instance.getPort(); WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(baseUrl); WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper); NamedClientMcpTransport namedTransport = new NamedClientMcpTransport( serviceName + "-" + instance.getInstanceId(), transport); McpSchema.Implementation clientInfo = new McpSchema.Implementation( this.connectedClientName(commonProperties.getName(), namedTransport.name()), commonProperties.getVersion()); McpClient.AsyncSpec asyncSpec = McpClient._async_(namedTransport.transport()) .clientInfo(clientInfo) .requestTimeout(commonProperties.getRequestTimeout()); asyncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), asyncSpec); asyncClient = asyncSpec.build(); if (commonProperties.isInitialized()) { asyncClient.initialize().block(); } _logger_.info("Added McpAsyncClient: {}", clientInfo.name()); mcpAsyncClientList.add(asyncClient); } private String connectedClientName(String clientName, String serverConnectionName) { return clientName + " - " + serverConnectionName; } public static Builder builder() { return new Builder(); } public static class Builder { private String serviceName; private List mcpAsyncClientList; private NamingService namingService; public Builder serviceName(String serviceName) { this.serviceName = serviceName; return this; } public Builder mcpAsyncClientList(List mcpAsyncClientList) { this.mcpAsyncClientList = mcpAsyncClientList; return this; } public Builder namingService(NamingService namingService) { this.namingService = namingService; return this; } public LoadbalancedMcpAsyncClient build() { return new LoadbalancedMcpAsyncClient(this.serviceName, this.mcpAsyncClientList, this.namingService); } } }
LoadbalancedMcpSyncClient
同上
public class LoadbalancedMcpSyncClient implements EventListener { private static final Logger _logger _= LoggerFactory._getLogger_(LoadbalancedMcpAsyncClient.class); private final String serviceName; private final List mcpSyncClientList; private final AtomicInteger currentIndex = new AtomicInteger(0); private final NamingService namingService; private List instances; public LoadbalancedMcpSyncClient(String serviceName, List mcpSyncClientList, NamingService namingService) { Assert._notNull_(serviceName, "Service name must not be null"); Assert._notNull_(mcpSyncClientList, "McpSyncClient list must not be null"); Assert._notNull_(namingService, "NamingService must not be null"); this.serviceName = serviceName; this.mcpSyncClientList = mcpSyncClientList; try { this.namingService = namingService; this.instances = namingService.selectInstances(serviceName, true); } catch (NacosException e) { throw new RuntimeException(String._format_("Failed to get instances for service: %s", serviceName)); } } public void subscribe() { try { this.namingService.subscribe(this.serviceName, this); } catch (NacosException e) { throw new RuntimeException(String._format_("Failed to subscribe to service: %s", this.serviceName)); } } public String getServiceName() { return this.serviceName; } public List getMcpSyncClientList() { return this.mcpSyncClientList; } public NamingService getNamingService() { return this.namingService; } public List getInstances() { return this.instances; } private McpSyncClient getMcpSyncClient() { if (mcpSyncClientList.isEmpty()) { throw new IllegalStateException("No McpAsyncClient available"); } int index = currentIndex.getAndIncrement() % mcpSyncClientList.size(); return mcpSyncClientList.get(index); } // ------------------------------------------------------------------------------------------------------------------------------------------------ public McpSchema.ServerCapabilities getServerCapabilities() { return getMcpSyncClient().getServerCapabilities(); } public McpSchema.Implementation getServerInfo() { return getMcpSyncClient().getServerInfo(); } public McpSchema.ClientCapabilities getClientCapabilities() { return getMcpSyncClient().getClientCapabilities(); } public McpSchema.Implementation getClientInfo() { return getMcpSyncClient().getClientInfo(); } public void close() { Iterator iterator = mcpSyncClientList.iterator(); while (iterator.hasNext()) { McpSyncClient mcpSyncClient = iterator.next(); mcpSyncClient.close(); iterator.remove(); _logger_.info("Closed and removed McpSyncClient: {}", mcpSyncClient.getClientInfo().name()); } } public boolean closeGracefully() { List flagList = new ArrayList(); Iterator iterator = mcpSyncClientList.iterator(); while (iterator.hasNext()) { McpSyncClient mcpSyncClient = iterator.next(); boolean flag = mcpSyncClient.closeGracefully(); flagList.add(flag); if (flag) { iterator.remove(); _logger_.info("Closed and removed McpSyncClient: {}", mcpSyncClient.getClientInfo().name()); } } return !flagList.stream().allMatch(flag -> flag); } public Object ping() { return getMcpSyncClient().ping(); } public void addRoot(McpSchema.Root root) { for (McpSyncClient mcpSyncClient : mcpSyncClientList) { mcpSyncClient.addRoot(root); } } public void removeRoot(String rootUri) { for (McpSyncClient mcpSyncClient : mcpSyncClientList) { mcpSyncClient.removeRoot(rootUri); } } public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) { return getMcpSyncClient().callTool(callToolRequest); } public McpSchema.ListToolsResult listTools() { return getMcpSyncClient().listTools(); } public McpSchema.ListToolsResult listTools(String cursor) { return getMcpSyncClient().listTools(cursor); } public McpSchema.ListResourcesResult listResources(String cursor) { return getMcpSyncClient().listResources(cursor); } public McpSchema.ListResourcesResult listResources() { return getMcpSyncClient().listResources(); } public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) { return getMcpSyncClient().readResource(resource); } public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) { return getMcpSyncClient().readResource(readResourceRequest); } public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) { return getMcpSyncClient().listResourceTemplates(cursor); } public McpSchema.ListResourceTemplatesResult listResourceTemplates() { return getMcpSyncClient().listResourceTemplates(); } public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) { for (McpSyncClient mcpSyncClient : mcpSyncClientList) { mcpSyncClient.subscribeResource(subscribeRequest); } } public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) { for (McpSyncClient mcpSyncClient : mcpSyncClientList) { mcpSyncClient.unsubscribeResource(unsubscribeRequest); } } public McpSchema.ListPromptsResult listPrompts(String cursor) { return getMcpSyncClient().listPrompts(cursor); } public McpSchema.ListPromptsResult listPrompts() { return getMcpSyncClient().listPrompts(); } public McpSchema.GetPromptResult getPrompt(McpSchema.GetPromptRequest getPromptRequest) { return getMcpSyncClient().getPrompt(getPromptRequest); } public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) { for (McpSyncClient mcpSyncClient : mcpSyncClientList) { mcpSyncClient.setLoggingLevel(loggingLevel); } } // ------------------------------------------------------------------------------------------------------------------------------------------------ @Override public void onEvent(Event event) { if (event instanceof NamingEvent namingEvent) { if (this.serviceName.equals(namingEvent.getServiceName())) { _logger_.info("Received service instance change event for service: {}", namingEvent.getServiceName()); List instances = namingEvent.getInstances(); _logger_.info("Updated instances count: {}", instances.size()); // 打印每个实例的详细信息 instances.forEach(instance -> { _logger_.info("Instance: {}:{} (Healthy: {}, Enabled: {}, Metadata: {})", instance.getIp(), instance.getPort(), instance.isHealthy(), instance.isEnabled(), JacksonUtils._toJson_(instance.getMetadata())); }); updateClientList(instances); } } } private void updateClientList(List currentInstances) { McpClientCommonProperties commonProperties = ApplicationContextHolder._getBean_(McpClientCommonProperties.class); McpSyncClientConfigurer mcpSyncClientConfigurer = ApplicationContextHolder ._getBean_(McpSyncClientConfigurer.class); ObjectMapper objectMapper = ApplicationContextHolder._getBean_(ObjectMapper.class); WebClient.Builder webClientBuilderTemplate = ApplicationContextHolder._getBean_(WebClient.Builder.class); // 移除的实例列表 List removeInstances = instances.stream() .filter(instance -> !currentInstances.contains(instance)) .collect(Collectors._toList_()); // 新增的实例列表 List addInstances = currentInstances.stream() .filter(instance -> !instances.contains(instance)) .collect(Collectors._toList_()); // 删除McpSyncClient实例 List clientInfoNames = removeInstances.stream() .map(instance -> connectedClientName(commonProperties.getName(), this.serviceName + "-" + instance.getInstanceId())) .toList(); Iterator iterator = mcpSyncClientList.iterator(); while (iterator.hasNext()) { McpSyncClient mcpSyncClient = iterator.next(); McpSchema.Implementation clientInfo = mcpSyncClient.getClientInfo(); if (clientInfoNames.contains(clientInfo.name())) { _logger_.info("Removing McpsyncClient: {}", clientInfo.name()); if (mcpSyncClient.closeGracefully()) { iterator.remove(); } else { _logger_.warn("Failed to remove mcpSyncClient: {}", clientInfo.name()); } } } // 新增McpSyncClient实例 McpSyncClient syncClient; for (Instance instance : addInstances) { String baseUrl = instance.getMetadata().getOrDefault("scheme", "http") + "://" + instance.getIp() + ":" + instance.getPort(); WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(baseUrl); WebFluxSseClientTransport transport = new WebFluxSseClientTransport(webClientBuilder, objectMapper); NamedClientMcpTransport namedTransport = new NamedClientMcpTransport( serviceName + "-" + instance.getInstanceId(), transport); McpSchema.Implementation clientInfo = new McpSchema.Implementation( this.connectedClientName(commonProperties.getName(), namedTransport.name()), commonProperties.getVersion()); McpClient.SyncSpec syncSpec = McpClient._sync_(namedTransport.transport()) .clientInfo(clientInfo) .requestTimeout(commonProperties.getRequestTimeout()); syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec); syncClient = syncSpec.build(); if (commonProperties.isInitialized()) { syncClient.initialize(); } _logger_.info("Added McpAsyncClient: {}", clientInfo.name()); mcpSyncClientList.add(syncClient); } this.instances = currentInstances; } private String connectedClientName(String clientName, String serverConnectionName) { return clientName + " - " + serverConnectionName; } public static Builder builder() { return new Builder(); } public static class Builder { private String serviceName; private List mcpSyncClientList; private NamingService namingService; public Builder serviceName(String serviceName) { this.serviceName = serviceName; return this; } public Builder mcpSyncClientList(List mcpSyncClientList) { this.mcpSyncClientList = mcpSyncClientList; return this; } public Builder namingService(NamingService namingService) { this.namingService = namingService; return this; } public LoadbalancedMcpSyncClient build() { return new LoadbalancedMcpSyncClient(this.serviceName, this.mcpSyncClientList, this.namingService); } } }
工具类
ApplicationContextHolder
@Component public class ApplicationContextHolder implements ApplicationContextAware { private static ApplicationContext _applicationContext_; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { ApplicationContextHolder._applicationContext _= applicationContext; } public static T getBean(Class clazz) { return _applicationContext_.getBean(clazz); } }
效果演示
我在 nacos 中,注册了 MCP Server 服务,部署两个节点
- 同一台机器以不同端口号启动的 MCP Server 服务,分别是 19000、19001,注册在 Nacos 中以 mcp-server-provider 为服务名
yml 配置如下
server: port: 8080 spring: application: name: mcp-client-webflux _ _ai: alibaba: mcp: nacos: enabled: true server-addr: 127.0.0.1:8848 username: nacos password: nacos client: sse: connections: nacos-server1: mcp-server-provider mcp: client: enabled: true name: mcp-client-webflux version: 0.0.1 initialized: true request-timeout: 600s nacos-enabled: true
我们能发现已经成功注入 LoadbalancedMcpSyncClient 类,其中 mcp-server-provider 有两个实例,对应的两个 McpSyncClient
我们停掉其中的 MCP Server19001 端口的服务,通过 removeInstances 获取移除的实例列表,同步在 mcpSyncClientList 移除对应的 McpSyncClient
我们再新启动 MCP Server19001 端口的服务,通过 addInstances 获取新增的实例列表,同步在 mcpSyncClientList 新增对应的 McpSyncClient