From 51877df13075ad10ef51107f15bcd21f1661febe Mon Sep 17 00:00:00 2001
From: zhou zhou <3272660260@qq.com>
Date: 星期二, 17 三月 2026 09:48:01 +0800
Subject: [PATCH] #AI

---
 rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/controller/AiGatewayController.java |  100 +++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 98 insertions(+), 2 deletions(-)

diff --git a/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/controller/AiGatewayController.java b/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/controller/AiGatewayController.java
index f8842d9..f065ad0 100644
--- a/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/controller/AiGatewayController.java
+++ b/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/controller/AiGatewayController.java
@@ -4,6 +4,8 @@
 import com.vincent.rsf.ai.gateway.service.AiGatewayService;
 import com.vincent.rsf.ai.gateway.service.GatewayStreamEvent;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -13,11 +15,15 @@
 
 import javax.annotation.Resource;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @RestController
 @RequestMapping("/internal/chat")
 public class AiGatewayController {
+
+    private static final Logger logger = LoggerFactory.getLogger(AiGatewayController.class);
 
     @Resource
     private AiGatewayService aiGatewayService;
@@ -27,16 +33,106 @@
     @PostMapping(value = "/stream", produces = "application/x-ndjson")
     public StreamingResponseBody stream(@RequestBody GatewayChatRequest request) {
         return outputStream -> {
+            logger.info("AI gateway controller stream opened: sessionId={}, routeCode={}, attemptNo={}, modelCode={}",
+                    request.getSessionId(),
+                    request.getRouteCode(),
+                    request.getAttemptNo(),
+                    request.getModelCode());
+            AtomicBoolean streaming = new AtomicBoolean(true);
+            Object writeLock = new Object();
+            Thread heartbeatThread = new Thread(() -> {
+                while (streaming.get()) {
+                    try {
+                        Thread.sleep(10000L);
+                        if (!streaming.get()) {
+                            break;
+                        }
+                        String json = objectMapper.writeValueAsString(new GatewayStreamEvent()
+                                .setType("ping")
+                                .setModelCode(request.getModelCode())
+                                .setResponseTime(System.currentTimeMillis())) + "\n";
+                        synchronized (writeLock) {
+                            outputStream.write(json.getBytes(StandardCharsets.UTF_8));
+                            outputStream.flush();
+                        }
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        logger.info("AI gateway heartbeat interrupted: sessionId={}, routeCode={}, attemptNo={}, modelCode={}",
+                                request.getSessionId(),
+                                request.getRouteCode(),
+                                request.getAttemptNo(),
+                                request.getModelCode());
+                        break;
+                    } catch (Exception e) {
+                        logger.warn("AI gateway heartbeat write failed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, message={}",
+                                request.getSessionId(),
+                                request.getRouteCode(),
+                                request.getAttemptNo(),
+                                request.getModelCode(),
+                                e.getMessage());
+                        break;
+                    }
+                }
+            }, "ai-gateway-heartbeat-" + (request.getSessionId() == null ? "unknown" : request.getSessionId()));
+            heartbeatThread.setDaemon(true);
+            heartbeatThread.start();
             try {
                 aiGatewayService.stream(request, event -> {
                     String json = objectMapper.writeValueAsString(event) + "\n";
-                    outputStream.write(json.getBytes(StandardCharsets.UTF_8));
-                    outputStream.flush();
+                    synchronized (writeLock) {
+                        outputStream.write(json.getBytes(StandardCharsets.UTF_8));
+                        outputStream.flush();
+                    }
                 });
             } catch (Exception e) {
+                if (isInterruptedError(e)) {
+                    logger.warn("AI gateway controller stream interrupted: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, message={}",
+                            request.getSessionId(),
+                            request.getRouteCode(),
+                            request.getAttemptNo(),
+                            request.getModelCode(),
+                            e.getMessage());
+                    return;
+                }
+                logger.error("AI gateway controller stream failed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, message={}",
+                        request.getSessionId(),
+                        request.getRouteCode(),
+                        request.getAttemptNo(),
+                        request.getModelCode(),
+                        e.getMessage(),
+                        e);
                 throw new IOException(e);
+            } finally {
+                streaming.set(false);
+                heartbeatThread.interrupt();
+                logger.info("AI gateway controller stream closed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}",
+                        request.getSessionId(),
+                        request.getRouteCode(),
+                        request.getAttemptNo(),
+                        request.getModelCode());
             }
         };
     }
 
+    private boolean isInterruptedError(Throwable throwable) {
+        Throwable current = throwable;
+        while (current != null) {
+            if (current instanceof InterruptedException || current instanceof InterruptedIOException) {
+                return true;
+            }
+            String message = current.getMessage();
+            if (message != null) {
+                String normalized = message.toLowerCase();
+                if (normalized.contains("interrupted")
+                        || normalized.contains("broken pipe")
+                        || normalized.contains("connection reset")
+                        || normalized.contains("forcibly closed")) {
+                    return true;
+                }
+            }
+            current = current.getCause();
+        }
+        return false;
+    }
+
 }

--
Gitblit v1.9.1