From 51877df13075ad10ef51107f15bcd21f1661febe Mon Sep 17 00:00:00 2001
From: zhou zhou <3272660260@qq.com>
Date: 星期二, 17 三月 2026 09:48:01 +0800
Subject: [PATCH] #AI
---
rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 218 insertions(+), 8 deletions(-)
diff --git a/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java b/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java
index da04faa..070bb04 100644
--- a/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java
+++ b/rsf-ai-gateway/src/main/java/com/vincent/rsf/ai/gateway/service/AiGatewayService.java
@@ -5,12 +5,15 @@
import com.vincent.rsf.ai.gateway.dto.GatewayChatRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
@@ -23,6 +26,8 @@
@Service
public class AiGatewayService {
+ private static final Logger logger = LoggerFactory.getLogger(AiGatewayService.class);
+
@Resource
private AiGatewayProperties aiGatewayProperties;
@Resource
@@ -34,7 +39,20 @@
public void stream(GatewayChatRequest request, EventConsumer consumer) throws Exception {
AiGatewayProperties.ModelConfig modelConfig = resolveModel(request);
+ logger.info("AI gateway stream start: sessionId={}, routeCode={}, attemptNo={}, requestModelCode={}, resolvedModelCode={}, provider={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ request.getModelCode(),
+ modelConfig == null ? null : modelConfig.getCode(),
+ modelConfig == null ? null : modelConfig.getProvider());
if (modelConfig == null || modelConfig.getChatUrl() == null || modelConfig.getChatUrl().trim().isEmpty()) {
+ logger.info("AI gateway use mock stream: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, provider={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig == null ? request.getModelCode() : modelConfig.getCode(),
+ modelConfig == null ? "mock" : modelConfig.getProvider());
mockStream(request, modelConfig, consumer);
return;
}
@@ -87,6 +105,7 @@
private void mockStream(GatewayChatRequest request, AiGatewayProperties.ModelConfig modelConfig,
EventConsumer consumer) throws Exception {
+ long requestTime = System.currentTimeMillis();
String modelCode = modelConfig == null ? aiGatewayProperties.getDefaultModelCode() : modelConfig.getCode();
String lastQuestion = "";
List<GatewayChatMessage> messages = request.getMessages();
@@ -98,22 +117,58 @@
}
}
String answer = "褰撳墠涓烘紨绀烘ā寮忥紝妯″瀷[" + modelCode + "]宸叉敹鍒颁綘鐨勯棶棰橈細" + lastQuestion;
+ logger.info("AI gateway mock stream emitting response: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, answerLength={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelCode,
+ answer.length());
for (char c : answer.toCharArray()) {
consumer.accept(new GatewayStreamEvent()
.setType("delta")
.setModelCode(modelCode)
.setContent(String.valueOf(c)));
- Thread.sleep(20L);
+ try {
+ Thread.sleep(20L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
}
consumer.accept(new GatewayStreamEvent()
.setType("done")
- .setModelCode(modelCode));
+ .setModelCode(modelCode)
+ .setSuccess(true)
+ .setRequestTime(requestTime)
+ .setResponseTime(System.currentTimeMillis())
+ .setDurationMs(System.currentTimeMillis() - requestTime));
+ logger.info("AI gateway mock stream completed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, durationMs={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelCode,
+ System.currentTimeMillis() - requestTime);
}
private void openAiCompatibleStream(GatewayChatRequest request, AiGatewayProperties.ModelConfig modelConfig,
EventConsumer consumer) throws Exception {
HttpURLConnection connection = null;
+ long requestTime = System.currentTimeMillis();
+ boolean terminalEventSent = false;
+ int eventLineCount = 0;
+ int deltaCount = 0;
+ int contentChars = 0;
+ boolean firstDeltaLogged = false;
+ String normalizedUrl = modelConfig == null ? null : modelConfig.getChatUrl();
try {
+ logger.info("AI gateway opening upstream stream: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, provider={}, url={}, modelName={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig == null ? null : modelConfig.getCode(),
+ modelConfig == null ? null : modelConfig.getProvider(),
+ normalizedUrl,
+ modelConfig == null ? null : modelConfig.getModelName());
connection = (HttpURLConnection) new URL(modelConfig.getChatUrl()).openConnection();
connection.setConnectTimeout(aiGatewayProperties.getConnectTimeoutMillis());
connection.setReadTimeout(aiGatewayProperties.getReadTimeoutMillis());
@@ -136,19 +191,49 @@
}
int statusCode = connection.getResponseCode();
+ logger.info("AI gateway upstream response received: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, statusCode={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig.getCode(),
+ statusCode);
InputStream inputStream = statusCode >= 400 ? connection.getErrorStream() : connection.getInputStream();
if (inputStream == null) {
+ logger.warn("AI gateway upstream returned empty stream: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, statusCode={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig.getCode(),
+ normalizedUrl,
+ statusCode);
consumer.accept(new GatewayStreamEvent()
.setType("error")
.setModelCode(modelConfig.getCode())
- .setMessage("妯″瀷鏈嶅姟鏃犲搷搴�"));
+ .setMessage("妯″瀷鏈嶅姟鏃犲搷搴�")
+ .setSuccess(false)
+ .setRequestTime(requestTime)
+ .setResponseTime(System.currentTimeMillis())
+ .setDurationMs(System.currentTimeMillis() - requestTime));
+ terminalEventSent = true;
return;
}
if (statusCode >= 400) {
+ logger.warn("AI gateway upstream http error: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, statusCode={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig.getCode(),
+ normalizedUrl,
+ statusCode);
consumer.accept(new GatewayStreamEvent()
.setType("error")
.setModelCode(modelConfig.getCode())
- .setMessage(readErrorMessage(inputStream, statusCode)));
+ .setMessage(readErrorMessage(inputStream, statusCode))
+ .setSuccess(false)
+ .setRequestTime(requestTime)
+ .setResponseTime(System.currentTimeMillis())
+ .setDurationMs(System.currentTimeMillis() - requestTime));
+ terminalEventSent = true;
return;
}
@@ -158,11 +243,27 @@
if (line.trim().isEmpty() || !line.startsWith("data:")) {
continue;
}
+ eventLineCount++;
String payload = line.substring(5).trim();
if ("[DONE]".equals(payload)) {
+ long responseTime = System.currentTimeMillis();
+ logger.info("AI gateway upstream done marker received: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, eventLines={}, deltaCount={}, contentChars={}, durationMs={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig.getCode(),
+ eventLineCount,
+ deltaCount,
+ contentChars,
+ responseTime - requestTime);
consumer.accept(new GatewayStreamEvent()
.setType("done")
- .setModelCode(modelConfig.getCode()));
+ .setModelCode(modelConfig.getCode())
+ .setSuccess(true)
+ .setRequestTime(requestTime)
+ .setResponseTime(responseTime)
+ .setDurationMs(responseTime - requestTime));
+ terminalEventSent = true;
break;
}
JsonNode root = objectMapper.readTree(payload);
@@ -170,29 +271,117 @@
JsonNode delta = choice.path("delta");
JsonNode contentNode = delta.path("content");
if (!contentNode.isMissingNode() && !contentNode.isNull()) {
+ String content = contentNode.asText();
+ deltaCount++;
+ contentChars += content.length();
+ if (!firstDeltaLogged) {
+ logger.info("AI gateway upstream first delta received: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, afterMs={}, sampleLength={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig.getCode(),
+ System.currentTimeMillis() - requestTime,
+ content.length());
+ firstDeltaLogged = true;
+ }
consumer.accept(new GatewayStreamEvent()
.setType("delta")
.setModelCode(modelConfig.getCode())
- .setContent(contentNode.asText()));
+ .setContent(content));
}
JsonNode finishReason = choice.path("finish_reason");
if (!finishReason.isMissingNode() && !finishReason.isNull()) {
+ long responseTime = System.currentTimeMillis();
+ logger.info("AI gateway upstream finish_reason received: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, finishReason={}, eventLines={}, deltaCount={}, contentChars={}, durationMs={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig.getCode(),
+ finishReason.asText(),
+ eventLineCount,
+ deltaCount,
+ contentChars,
+ responseTime - requestTime);
consumer.accept(new GatewayStreamEvent()
.setType("done")
- .setModelCode(modelConfig.getCode()));
+ .setModelCode(modelConfig.getCode())
+ .setSuccess(true)
+ .setRequestTime(requestTime)
+ .setResponseTime(responseTime)
+ .setDurationMs(responseTime - requestTime));
+ terminalEventSent = true;
break;
}
}
}
+ if (!terminalEventSent) {
+ long responseTime = System.currentTimeMillis();
+ logger.warn("AI gateway upstream ended without terminal event: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, eventLines={}, deltaCount={}, contentChars={}, durationMs={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig.getCode(),
+ normalizedUrl,
+ eventLineCount,
+ deltaCount,
+ contentChars,
+ responseTime - requestTime);
+ consumer.accept(new GatewayStreamEvent()
+ .setType("error")
+ .setModelCode(modelConfig.getCode())
+ .setMessage("妯″瀷娴佸紓甯镐腑鏂�")
+ .setSuccess(false)
+ .setRequestTime(requestTime)
+ .setResponseTime(responseTime)
+ .setDurationMs(responseTime - requestTime));
+ }
} catch (Exception e) {
+ if (isInterruptedError(e)) {
+ logger.warn("AI gateway upstream interrupted: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, stage={}, message={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig == null ? null : modelConfig.getCode(),
+ normalizedUrl,
+ terminalEventSent ? "after_terminal" : "streaming",
+ e.getMessage());
+ if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+ Thread.currentThread().interrupt();
+ }
+ return;
+ }
+ logger.error("AI gateway upstream exception: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, url={}, eventLines={}, deltaCount={}, contentChars={}, message={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig == null ? null : modelConfig.getCode(),
+ normalizedUrl,
+ eventLineCount,
+ deltaCount,
+ contentChars,
+ e.getMessage(),
+ e);
consumer.accept(new GatewayStreamEvent()
.setType("error")
.setModelCode(modelConfig.getCode())
- .setMessage(e.getMessage()));
+ .setMessage(e.getMessage())
+ .setSuccess(false)
+ .setRequestTime(requestTime)
+ .setResponseTime(System.currentTimeMillis())
+ .setDurationMs(System.currentTimeMillis() - requestTime));
} finally {
if (connection != null) {
connection.disconnect();
}
+ logger.info("AI gateway upstream stream closed: sessionId={}, routeCode={}, attemptNo={}, modelCode={}, terminalEventSent={}, eventLines={}, deltaCount={}, contentChars={}",
+ request.getSessionId(),
+ request.getRouteCode(),
+ request.getAttemptNo(),
+ modelConfig == null ? null : modelConfig.getCode(),
+ terminalEventSent,
+ eventLineCount,
+ deltaCount,
+ contentChars);
}
}
@@ -264,4 +453,25 @@
}
}
+ private boolean isInterruptedError(Throwable throwable) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof InterruptedException || current instanceof InterruptedIOException) {
+ return true;
+ }
+ String message = current.getMessage();
+ if (message != null) {
+ String normalized = message.toLowerCase();
+ if (normalized.contains("interrupted")
+ || normalized.contains("broken pipe")
+ || normalized.contains("connection reset")
+ || normalized.contains("forcibly closed")) {
+ return true;
+ }
+ }
+ current = current.getCause();
+ }
+ return false;
+ }
+
}
--
Gitblit v1.9.1