zhou zhou
15 小时以前 3d81df739dc45599c257d8cdefe0996f66ccdeae
rsf-server/src/main/java/com/vincent/rsf/server/ai/service/impl/AiMcpMountServiceImpl.java
@@ -1,10 +1,12 @@
package com.vincent.rsf.server.ai.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.framework.exception.CoolException;
import com.vincent.rsf.server.ai.config.AiDefaults;
import com.vincent.rsf.server.ai.dto.AiMcpConnectivityTestDto;
import com.vincent.rsf.server.ai.dto.AiMcpToolPreviewDto;
import com.vincent.rsf.server.ai.dto.AiMcpToolTestDto;
import com.vincent.rsf.server.ai.dto.AiMcpToolTestRequest;
@@ -22,6 +24,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -34,45 +37,83 @@
    private final ObjectMapper objectMapper;
    @Override
    public List<AiMcpMount> listActiveMounts() {
    public List<AiMcpMount> listActiveMounts(Long tenantId) {
        ensureTenantId(tenantId);
        return this.list(new LambdaQueryWrapper<AiMcpMount>()
                .eq(AiMcpMount::getTenantId, tenantId)
                .eq(AiMcpMount::getStatus, StatusType.ENABLE.val)
                .eq(AiMcpMount::getDeleted, 0)
                .orderByAsc(AiMcpMount::getSort)
                .orderByAsc(AiMcpMount::getId));
    }
    @Override
    public void validateBeforeSave(AiMcpMount aiMcpMount) {
    public void validateBeforeSave(AiMcpMount aiMcpMount, Long tenantId) {
        ensureTenantId(tenantId);
        aiMcpMount.setTenantId(tenantId);
        fillDefaults(aiMcpMount);
        ensureRequiredFields(aiMcpMount);
        ensureRequiredFields(aiMcpMount, tenantId);
    }
    @Override
    public void validateBeforeUpdate(AiMcpMount aiMcpMount) {
    public void validateBeforeUpdate(AiMcpMount aiMcpMount, Long tenantId) {
        ensureTenantId(tenantId);
        fillDefaults(aiMcpMount);
        if (aiMcpMount.getId() == null) {
            throw new CoolException("MCP 挂载 ID 不能为空");
        }
        ensureRequiredFields(aiMcpMount);
        AiMcpMount current = requireMount(aiMcpMount.getId(), tenantId);
        aiMcpMount.setTenantId(current.getTenantId());
        ensureRequiredFields(aiMcpMount, tenantId);
    }
    @Override
    public List<AiMcpToolPreviewDto> previewTools(Long mountId, Long userId, Long tenantId) {
        AiMcpMount mount = requireMount(mountId);
        AiMcpMount mount = requireMount(mountId, tenantId);
        long startedAt = System.currentTimeMillis();
        try (McpMountRuntimeFactory.McpMountRuntime runtime = mcpMountRuntimeFactory.create(List.of(mount), userId)) {
            List<AiMcpToolPreviewDto> tools = new ArrayList<>();
            for (ToolCallback callback : runtime.getToolCallbacks()) {
                if (callback == null || callback.getToolDefinition() == null) {
                    continue;
                }
                tools.add(AiMcpToolPreviewDto.builder()
                        .name(callback.getToolDefinition().name())
                        .description(callback.getToolDefinition().description())
                        .inputSchema(callback.getToolDefinition().inputSchema())
                        .returnDirect(callback.getToolMetadata() == null ? null : callback.getToolMetadata().returnDirect())
                        .build());
            List<AiMcpToolPreviewDto> tools = buildToolPreviewDtos(runtime.getToolCallbacks());
            if (!runtime.getErrors().isEmpty()) {
                String message = String.join(";", runtime.getErrors());
                updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_UNHEALTHY, message, System.currentTimeMillis() - startedAt);
                throw new CoolException(message);
            }
            updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_HEALTHY,
                    "工具解析成功,共 " + tools.size() + " 个工具", System.currentTimeMillis() - startedAt);
            return tools;
        } catch (CoolException e) {
            throw e;
        } catch (Exception e) {
            updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_UNHEALTHY,
                    "工具解析失败: " + e.getMessage(), System.currentTimeMillis() - startedAt);
            throw new CoolException("获取工具列表失败: " + e.getMessage());
        }
    }
    @Override
    public AiMcpConnectivityTestDto testConnectivity(Long mountId, Long userId, Long tenantId) {
        AiMcpMount mount = requireMount(mountId, tenantId);
        long startedAt = System.currentTimeMillis();
        try (McpMountRuntimeFactory.McpMountRuntime runtime = mcpMountRuntimeFactory.create(List.of(mount), userId)) {
            long elapsedMs = System.currentTimeMillis() - startedAt;
            if (!runtime.getErrors().isEmpty()) {
                String message = String.join(";", runtime.getErrors());
                updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_UNHEALTHY, message, elapsedMs);
                AiMcpMount latest = requireMount(mount.getId(), tenantId);
                return buildConnectivityDto(latest, message, elapsedMs, runtime.getToolCallbacks().length);
            }
            String message = "连通性测试成功,解析出 " + runtime.getToolCallbacks().length + " 个工具";
            updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_HEALTHY, message, elapsedMs);
            AiMcpMount latest = requireMount(mount.getId(), tenantId);
            return buildConnectivityDto(latest, message, elapsedMs, runtime.getToolCallbacks().length);
        } catch (CoolException e) {
            throw e;
        } catch (Exception e) {
            long elapsedMs = System.currentTimeMillis() - startedAt;
            String message = "连通性测试失败: " + e.getMessage();
            updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_UNHEALTHY, message, elapsedMs);
            AiMcpMount latest = requireMount(mount.getId(), tenantId);
            return buildConnectivityDto(latest, message, elapsedMs, 0);
        }
    }
@@ -98,7 +139,8 @@
        } catch (Exception e) {
            throw new CoolException("工具输入 JSON 格式错误: " + e.getMessage());
        }
        AiMcpMount mount = requireMount(mountId);
        AiMcpMount mount = requireMount(mountId, tenantId);
        long startedAt = System.currentTimeMillis();
        try (McpMountRuntimeFactory.McpMountRuntime runtime = mcpMountRuntimeFactory.create(List.of(mount), userId)) {
            ToolCallback callback = Arrays.stream(runtime.getToolCallbacks())
                    .filter(item -> item != null && item.getToolDefinition() != null)
@@ -109,11 +151,21 @@
                    request.getInputJson(),
                    new ToolContext(Map.of("userId", userId, "tenantId", tenantId, "mountId", mountId))
            );
            updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_HEALTHY,
                    "工具测试成功: " + request.getToolName(), System.currentTimeMillis() - startedAt);
            return AiMcpToolTestDto.builder()
                    .toolName(request.getToolName())
                    .inputJson(request.getInputJson())
                    .output(output)
                    .build();
        } catch (CoolException e) {
            updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_UNHEALTHY,
                    "工具测试失败: " + e.getMessage(), System.currentTimeMillis() - startedAt);
            throw e;
        } catch (Exception e) {
            updateHealthStatus(mount.getId(), AiDefaults.MCP_HEALTH_UNHEALTHY,
                    "工具测试失败: " + e.getMessage(), System.currentTimeMillis() - startedAt);
            throw new CoolException("工具测试失败: " + e.getMessage());
        }
    }
@@ -130,15 +182,18 @@
        if (aiMcpMount.getStatus() == null) {
            aiMcpMount.setStatus(StatusType.ENABLE.val);
        }
        if (!StringUtils.hasText(aiMcpMount.getHealthStatus())) {
            aiMcpMount.setHealthStatus(AiDefaults.MCP_HEALTH_NOT_TESTED);
        }
    }
    private void ensureRequiredFields(AiMcpMount aiMcpMount) {
    private void ensureRequiredFields(AiMcpMount aiMcpMount, Long tenantId) {
        if (!StringUtils.hasText(aiMcpMount.getName())) {
            throw new CoolException("MCP 挂载名称不能为空");
        }
        if (AiDefaults.MCP_TRANSPORT_BUILTIN.equals(aiMcpMount.getTransportType())) {
            builtinMcpToolRegistry.validateBuiltinCode(aiMcpMount.getBuiltinCode());
            ensureBuiltinConflictFree(aiMcpMount);
            ensureBuiltinConflictFree(aiMcpMount, tenantId);
            return;
        }
        if (AiDefaults.MCP_TRANSPORT_SSE_HTTP.equals(aiMcpMount.getTransportType())) {
@@ -156,18 +211,23 @@
        throw new CoolException("不支持的 MCP 传输类型: " + aiMcpMount.getTransportType());
    }
    private AiMcpMount requireMount(Long mountId) {
    private AiMcpMount requireMount(Long mountId, Long tenantId) {
        ensureTenantId(tenantId);
        if (mountId == null) {
            throw new CoolException("MCP 挂载 ID 不能为空");
        }
        AiMcpMount mount = this.getById(mountId);
        if (mount == null || (mount.getDeleted() != null && mount.getDeleted() == 1)) {
        AiMcpMount mount = this.getOne(new LambdaQueryWrapper<AiMcpMount>()
                .eq(AiMcpMount::getId, mountId)
                .eq(AiMcpMount::getTenantId, tenantId)
                .eq(AiMcpMount::getDeleted, 0)
                .last("limit 1"));
        if (mount == null) {
            throw new CoolException("MCP 挂载不存在");
        }
        return mount;
    }
    private void ensureBuiltinConflictFree(AiMcpMount aiMcpMount) {
    private void ensureBuiltinConflictFree(AiMcpMount aiMcpMount, Long tenantId) {
        if (aiMcpMount.getStatus() == null || aiMcpMount.getStatus() != StatusType.ENABLE.val) {
            return;
        }
@@ -176,8 +236,10 @@
            return;
        }
        LambdaQueryWrapper<AiMcpMount> queryWrapper = new LambdaQueryWrapper<AiMcpMount>()
                .eq(AiMcpMount::getTenantId, tenantId)
                .eq(AiMcpMount::getTransportType, AiDefaults.MCP_TRANSPORT_BUILTIN)
                .eq(AiMcpMount::getStatus, StatusType.ENABLE.val)
                .eq(AiMcpMount::getDeleted, 0)
                .in(AiMcpMount::getBuiltinCode, conflictCodes);
        if (aiMcpMount.getId() != null) {
            queryWrapper.ne(AiMcpMount::getId, aiMcpMount.getId());
@@ -205,4 +267,47 @@
        }
        return codes;
    }
    private void ensureTenantId(Long tenantId) {
        if (tenantId == null) {
            throw new CoolException("当前租户不存在");
        }
    }
    private List<AiMcpToolPreviewDto> buildToolPreviewDtos(ToolCallback[] callbacks) {
        List<AiMcpToolPreviewDto> tools = new ArrayList<>();
        for (ToolCallback callback : callbacks) {
            if (callback == null || callback.getToolDefinition() == null) {
                continue;
            }
            tools.add(AiMcpToolPreviewDto.builder()
                    .name(callback.getToolDefinition().name())
                    .description(callback.getToolDefinition().description())
                    .inputSchema(callback.getToolDefinition().inputSchema())
                    .returnDirect(callback.getToolMetadata() == null ? null : callback.getToolMetadata().returnDirect())
                    .build());
        }
        return tools;
    }
    private void updateHealthStatus(Long mountId, String healthStatus, String message, Long initElapsedMs) {
        this.update(new LambdaUpdateWrapper<AiMcpMount>()
                .eq(AiMcpMount::getId, mountId)
                .set(AiMcpMount::getHealthStatus, healthStatus)
                .set(AiMcpMount::getLastTestTime, new Date())
                .set(AiMcpMount::getLastTestMessage, message)
                .set(AiMcpMount::getLastInitElapsedMs, initElapsedMs));
    }
    private AiMcpConnectivityTestDto buildConnectivityDto(AiMcpMount mount, String message, Long initElapsedMs, Integer toolCount) {
        return AiMcpConnectivityTestDto.builder()
                .mountId(mount.getId())
                .mountName(mount.getName())
                .healthStatus(mount.getHealthStatus())
                .message(message)
                .initElapsedMs(initElapsedMs)
                .toolCount(toolCount)
                .testedAt(mount.getLastTestTime$())
                .build();
    }
}