From 51877df13075ad10ef51107f15bcd21f1661febe Mon Sep 17 00:00:00 2001
From: zhou zhou <3272660260@qq.com>
Date: 星期二, 17 三月 2026 09:48:01 +0800
Subject: [PATCH] #AI

---
 rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java |  226 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 218 insertions(+), 8 deletions(-)

diff --git a/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java b/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java
index da04faa..070bb04 100644
--- a/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java
+++ b/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java
@@ -5,12 +5,15 @@
 import com.vincent.rsf.ai.gateway.dto.GatewayChatRequest;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
@@ -23,6 +26,8 @@
 @Service
 public class AiGatewayService {
 
+    private static final Logger logger = LoggerFactory.getLogger(AiGatewayService.class);
+
     @Resource
     private AiGatewayProperties aiGatewayProperties;
     @Resource
@@ -34,7 +39,20 @@
 
     public void stream(GatewayChatRequest request, EventConsumer consumer) throws Exception {
         AiGatewayProperties.ModelConfig modelConfig = resolveModel(request);
+        logger.info("AI gateway stream start: sessionId={}, routeCode={}, attemptNo={}, requestModelCode={}, resolvedModelCode={}, provider={}",
+                request.getSessionId(),
+                request.getRouteCode(),
+                request.getAttemptNo(),
+                request.getModelCode(),
+                modelConfig == null ? null : modelConfig.getCode(),
+                modelConfig == null ? null : modelConfig.getProvider());
         if (modelConfig == null || modelConfig.getChatUrl() == null || modelConfig.getChatUrl().trim().isEmpty()) {
+            logger.info("AI gateway use mock stream: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, provider={}",
+                    request.getSessionId(),
+                    request.getRouteCode(),
+                    request.getAttemptNo(),
+                    modelConfig == null ? request.getModelCode() : modelConfig.getCode(),
+                    modelConfig == null ? "mock" : modelConfig.getProvider());
             mockStream(request, modelConfig, consumer);
             return;
         }
@@ -87,6 +105,7 @@
 
     private void mockStream(GatewayChatRequest request, AiGatewayProperties.ModelConfig modelConfig,
                             EventConsumer consumer) throws Exception {
+        long requestTime = System.currentTimeMillis();
         String modelCode = modelConfig == null ? aiGatewayProperties.getDefaultModelCode() : modelConfig.getCode();
         String lastQuestion = "";
         List<GatewayChatMessage> messages = request.getMessages();
@@ -98,22 +117,58 @@
             }
         }
         String answer = "褰撳墠涓烘紨绀烘ā寮忥紝妯″瀷[" + modelCode + "]宸叉敹鍒颁綘鐨勯棶棰橈細" + lastQuestion;
+        logger.info("AI gateway mock stream emitting response: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, answerLength={}",
+                request.getSessionId(),
+                request.getRouteCode(),
+                request.getAttemptNo(),
+                modelCode,
+                answer.length());
         for (char c : answer.toCharArray()) {
             consumer.accept(new GatewayStreamEvent()
                     .setType("delta")
                     .setModelCode(modelCode)
                     .setContent(String.valueOf(c)));
-            Thread.sleep(20L);
+            try {
+                Thread.sleep(20L);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
         }
         consumer.accept(new GatewayStreamEvent()
                 .setType("done")
-                .setModelCode(modelCode));
+                .setModelCode(modelCode)
+                .setSuccess(true)
+                .setRequestTime(requestTime)
+                .setResponseTime(System.currentTimeMillis())
+                .setDurationMs(System.currentTimeMillis() - requestTime));
+        logger.info("AI gateway mock stream completed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, durationMs={}",
+                request.getSessionId(),
+                request.getRouteCode(),
+                request.getAttemptNo(),
+                modelCode,
+                System.currentTimeMillis() - requestTime);
     }
 
     private void openAiCompatibleStream(GatewayChatRequest request, AiGatewayProperties.ModelConfig modelConfig,
                                         EventConsumer consumer) throws Exception {
         HttpURLConnection connection = null;
+        long requestTime = System.currentTimeMillis();
+        boolean terminalEventSent = false;
+        int eventLineCount = 0;
+        int deltaCount = 0;
+        int contentChars = 0;
+        boolean firstDeltaLogged = false;
+        String normalizedUrl = modelConfig == null ? null : modelConfig.getChatUrl();
         try {
+            logger.info("AI gateway opening upstream stream: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, provider={}, url={}, modelName={}",
+                    request.getSessionId(),
+                    request.getRouteCode(),
+                    request.getAttemptNo(),
+                    modelConfig == null ? null : modelConfig.getCode(),
+                    modelConfig == null ? null : modelConfig.getProvider(),
+                    normalizedUrl,
+                    modelConfig == null ? null : modelConfig.getModelName());
             connection = (HttpURLConnection) new URL(modelConfig.getChatUrl()).openConnection();
             connection.setConnectTimeout(aiGatewayProperties.getConnectTimeoutMillis());
             connection.setReadTimeout(aiGatewayProperties.getReadTimeoutMillis());
@@ -136,19 +191,49 @@
             }
 
             int statusCode = connection.getResponseCode();
+            logger.info("AI gateway upstream response received: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, statusCode={}",
+                    request.getSessionId(),
+                    request.getRouteCode(),
+                    request.getAttemptNo(),
+                    modelConfig.getCode(),
+                    statusCode);
             InputStream inputStream = statusCode >= 400 ? connection.getErrorStream() : connection.getInputStream();
             if (inputStream == null) {
+                logger.warn("AI gateway upstream returned empty stream: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, statusCode={}",
+                        request.getSessionId(),
+                        request.getRouteCode(),
+                        request.getAttemptNo(),
+                        modelConfig.getCode(),
+                        normalizedUrl,
+                        statusCode);
                 consumer.accept(new GatewayStreamEvent()
                         .setType("error")
                         .setModelCode(modelConfig.getCode())
-                        .setMessage("妯″瀷鏈嶅姟鏃犲搷搴�"));
+                        .setMessage("妯″瀷鏈嶅姟鏃犲搷搴�")
+                        .setSuccess(false)
+                        .setRequestTime(requestTime)
+                        .setResponseTime(System.currentTimeMillis())
+                        .setDurationMs(System.currentTimeMillis() - requestTime));
+                terminalEventSent = true;
                 return;
             }
             if (statusCode >= 400) {
+                logger.warn("AI gateway upstream http error: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, statusCode={}",
+                        request.getSessionId(),
+                        request.getRouteCode(),
+                        request.getAttemptNo(),
+                        modelConfig.getCode(),
+                        normalizedUrl,
+                        statusCode);
                 consumer.accept(new GatewayStreamEvent()
                         .setType("error")
                         .setModelCode(modelConfig.getCode())
-                        .setMessage(readErrorMessage(inputStream, statusCode)));
+                        .setMessage(readErrorMessage(inputStream, statusCode))
+                        .setSuccess(false)
+                        .setRequestTime(requestTime)
+                        .setResponseTime(System.currentTimeMillis())
+                        .setDurationMs(System.currentTimeMillis() - requestTime));
+                terminalEventSent = true;
                 return;
             }
 
@@ -158,11 +243,27 @@
                     if (line.trim().isEmpty() || !line.startsWith("data:")) {
                         continue;
                     }
+                    eventLineCount++;
                     String payload = line.substring(5).trim();
                     if ("[DONE]".equals(payload)) {
+                        long responseTime = System.currentTimeMillis();
+                        logger.info("AI gateway upstream done marker received: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, eventLines={}, deltaCount={}, contentChars={}, durationMs={}",
+                                request.getSessionId(),
+                                request.getRouteCode(),
+                                request.getAttemptNo(),
+                                modelConfig.getCode(),
+                                eventLineCount,
+                                deltaCount,
+                                contentChars,
+                                responseTime - requestTime);
                         consumer.accept(new GatewayStreamEvent()
                                 .setType("done")
-                                .setModelCode(modelConfig.getCode()));
+                                .setModelCode(modelConfig.getCode())
+                                .setSuccess(true)
+                                .setRequestTime(requestTime)
+                                .setResponseTime(responseTime)
+                                .setDurationMs(responseTime - requestTime));
+                        terminalEventSent = true;
                         break;
                     }
                     JsonNode root = objectMapper.readTree(payload);
@@ -170,29 +271,117 @@
                     JsonNode delta = choice.path("delta");
                     JsonNode contentNode = delta.path("content");
                     if (!contentNode.isMissingNode() && !contentNode.isNull()) {
+                        String content = contentNode.asText();
+                        deltaCount++;
+                        contentChars += content.length();
+                        if (!firstDeltaLogged) {
+                            logger.info("AI gateway upstream first delta received: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, afterMs={}, sampleLength={}",
+                                    request.getSessionId(),
+                                    request.getRouteCode(),
+                                    request.getAttemptNo(),
+                                    modelConfig.getCode(),
+                                    System.currentTimeMillis() - requestTime,
+                                    content.length());
+                            firstDeltaLogged = true;
+                        }
                         consumer.accept(new GatewayStreamEvent()
                                 .setType("delta")
                                 .setModelCode(modelConfig.getCode())
-                                .setContent(contentNode.asText()));
+                                .setContent(content));
                     }
                     JsonNode finishReason = choice.path("finish_reason");
                     if (!finishReason.isMissingNode() && !finishReason.isNull()) {
+                        long responseTime = System.currentTimeMillis();
+                        logger.info("AI gateway upstream finish_reason received: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, finishReason={}, eventLines={}, deltaCount={}, contentChars={}, durationMs={}",
+                                request.getSessionId(),
+                                request.getRouteCode(),
+                                request.getAttemptNo(),
+                                modelConfig.getCode(),
+                                finishReason.asText(),
+                                eventLineCount,
+                                deltaCount,
+                                contentChars,
+                                responseTime - requestTime);
                         consumer.accept(new GatewayStreamEvent()
                                 .setType("done")
-                                .setModelCode(modelConfig.getCode()));
+                                .setModelCode(modelConfig.getCode())
+                                .setSuccess(true)
+                                .setRequestTime(requestTime)
+                                .setResponseTime(responseTime)
+                                .setDurationMs(responseTime - requestTime));
+                        terminalEventSent = true;
                         break;
                     }
                 }
             }
+            if (!terminalEventSent) {
+                long responseTime = System.currentTimeMillis();
+                logger.warn("AI gateway upstream ended without terminal event: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, eventLines={}, deltaCount={}, contentChars={}, durationMs={}",
+                        request.getSessionId(),
+                        request.getRouteCode(),
+                        request.getAttemptNo(),
+                        modelConfig.getCode(),
+                        normalizedUrl,
+                        eventLineCount,
+                        deltaCount,
+                        contentChars,
+                        responseTime - requestTime);
+                consumer.accept(new GatewayStreamEvent()
+                        .setType("error")
+                        .setModelCode(modelConfig.getCode())
+                        .setMessage("妯″瀷娴佸紓甯镐腑鏂�")
+                        .setSuccess(false)
+                        .setRequestTime(requestTime)
+                        .setResponseTime(responseTime)
+                        .setDurationMs(responseTime - requestTime));
+            }
         } catch (Exception e) {
+            if (isInterruptedError(e)) {
+                logger.warn("AI gateway upstream interrupted: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, stage={}, message={}",
+                        request.getSessionId(),
+                        request.getRouteCode(),
+                        request.getAttemptNo(),
+                        modelConfig == null ? null : modelConfig.getCode(),
+                        normalizedUrl,
+                        terminalEventSent ? "after_terminal" : "streaming",
+                        e.getMessage());
+                if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+                    Thread.currentThread().interrupt();
+                }
+                return;
+            }
+            logger.error("AI gateway upstream exception: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, eventLines={}, deltaCount={}, contentChars={}, message={}",
+                    request.getSessionId(),
+                    request.getRouteCode(),
+                    request.getAttemptNo(),
+                    modelConfig == null ? null : modelConfig.getCode(),
+                    normalizedUrl,
+                    eventLineCount,
+                    deltaCount,
+                    contentChars,
+                    e.getMessage(),
+                    e);
             consumer.accept(new GatewayStreamEvent()
                     .setType("error")
                     .setModelCode(modelConfig.getCode())
-                    .setMessage(e.getMessage()));
+                    .setMessage(e.getMessage())
+                    .setSuccess(false)
+                    .setRequestTime(requestTime)
+                    .setResponseTime(System.currentTimeMillis())
+                    .setDurationMs(System.currentTimeMillis() - requestTime));
         } finally {
             if (connection != null) {
                 connection.disconnect();
             }
+            logger.info("AI gateway upstream stream closed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, terminalEventSent={}, eventLines={}, deltaCount={}, contentChars={}",
+                    request.getSessionId(),
+                    request.getRouteCode(),
+                    request.getAttemptNo(),
+                    modelConfig == null ? null : modelConfig.getCode(),
+                    terminalEventSent,
+                    eventLineCount,
+                    deltaCount,
+                    contentChars);
         }
     }
 
@@ -264,4 +453,25 @@
         }
     }
 
+    private boolean isInterruptedError(Throwable throwable) {
+        Throwable current = throwable;
+        while (current != null) {
+            if (current instanceof InterruptedException || current instanceof InterruptedIOException) {
+                return true;
+            }
+            String message = current.getMessage();
+            if (message != null) {
+                String normalized = message.toLowerCase();
+                if (normalized.contains("interrupted")
+                        || normalized.contains("broken pipe")
+                        || normalized.contains("connection reset")
+                        || normalized.contains("forcibly closed")) {
+                    return true;
+                }
+            }
+            current = current.getCause();
+        }
+        return false;
+    }
+
 }

--
Gitblit v1.9.1