#AI
zhou zhou
19 小时以前 51877df13075ad10ef51107f15bcd21f1661febe
rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/controller/AiGatewayController.java
@@ -4,6 +4,8 @@
import com.vincent.rsf.ai.gateway.service.AiGatewayService;
import com.vincent.rsf.ai.gateway.service.GatewayStreamEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -13,11 +15,15 @@
import javax.annotation.Resource;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
@RestController
@RequestMapping("/internal/chat")
public class AiGatewayController {
    private static final Logger logger = LoggerFactory.getLogger(AiGatewayController.class);
    @Resource
    private AiGatewayService aiGatewayService;
@@ -27,16 +33,106 @@
    @PostMapping(value = "/stream", produces = "application/x-ndjson")
    public StreamingResponseBody stream(@RequestBody GatewayChatRequest request) {
        return outputStream -> {
            logger.info("AI gateway controller stream opened: sessionId={}, routeCode={}, attemptNo={}, modelCode={}",
                    request.getSessionId(),
                    request.getRouteCode(),
                    request.getAttemptNo(),
                    request.getModelCode());
            AtomicBoolean streaming = new AtomicBoolean(true);
            Object writeLock = new Object();
            Thread heartbeatThread = new Thread(() -> {
                while (streaming.get()) {
                    try {
                        Thread.sleep(10000L);
                        if (!streaming.get()) {
                            break;
                        }
                        String json = objectMapper.writeValueAsString(new GatewayStreamEvent()
                                .setType("ping")
                                .setModelCode(request.getModelCode())
                                .setResponseTime(System.currentTimeMillis())) + "\n";
                        synchronized (writeLock) {
                            outputStream.write(json.getBytes(StandardCharsets.UTF_8));
                            outputStream.flush();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.info("AI gateway heartbeat interrupted: sessionId={}, routeCode={}, attemptNo={}, modelCode={}",
                                request.getSessionId(),
                                request.getRouteCode(),
                                request.getAttemptNo(),
                                request.getModelCode());
                        break;
                    } catch (Exception e) {
                        logger.warn("AI gateway heartbeat write failed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, message={}",
                                request.getSessionId(),
                                request.getRouteCode(),
                                request.getAttemptNo(),
                                request.getModelCode(),
                                e.getMessage());
                        break;
                    }
                }
            }, "ai-gateway-heartbeat-" + (request.getSessionId() == null ? "unknown" : request.getSessionId()));
            heartbeatThread.setDaemon(true);
            heartbeatThread.start();
            try {
                aiGatewayService.stream(request, event -> {
                    String json = objectMapper.writeValueAsString(event) + "\n";
                    outputStream.write(json.getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                    synchronized (writeLock) {
                        outputStream.write(json.getBytes(StandardCharsets.UTF_8));
                        outputStream.flush();
                    }
                });
            } catch (Exception e) {
                if (isInterruptedError(e)) {
                    logger.warn("AI gateway controller stream interrupted: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, message={}",
                            request.getSessionId(),
                            request.getRouteCode(),
                            request.getAttemptNo(),
                            request.getModelCode(),
                            e.getMessage());
                    return;
                }
                logger.error("AI gateway controller stream failed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, message={}",
                        request.getSessionId(),
                        request.getRouteCode(),
                        request.getAttemptNo(),
                        request.getModelCode(),
                        e.getMessage(),
                        e);
                throw new IOException(e);
            } finally {
                streaming.set(false);
                heartbeatThread.interrupt();
                logger.info("AI gateway controller stream closed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}",
                        request.getSessionId(),
                        request.getRouteCode(),
                        request.getAttemptNo(),
                        request.getModelCode());
            }
        };
    }
    private boolean isInterruptedError(Throwable throwable) {
        Throwable current = throwable;
        while (current != null) {
            if (current instanceof InterruptedException || current instanceof InterruptedIOException) {
                return true;
            }
            String message = current.getMessage();
            if (message != null) {
                String normalized = message.toLowerCase();
                if (normalized.contains("interrupted")
                        || normalized.contains("broken pipe")
                        || normalized.contains("connection reset")
                        || normalized.contains("forcibly closed")) {
                    return true;
                }
            }
            current = current.getCause();
        }
        return false;
    }
}