From 8636ff97bffec9f2130628bf09c9d0fbb371e2bc Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 10 三月 2026 16:53:24 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/ai/service/LlmChatService.java | 407 ++++++++++++++++++++++++++++-----------------------------
1 files changed, 200 insertions(+), 207 deletions(-)
diff --git a/src/main/java/com/zy/ai/service/LlmChatService.java b/src/main/java/com/zy/ai/service/LlmChatService.java
index 4e6bf19..e2eddd6 100644
--- a/src/main/java/com/zy/ai/service/LlmChatService.java
+++ b/src/main/java/com/zy/ai/service/LlmChatService.java
@@ -1,22 +1,24 @@
package com.zy.ai.service;
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
import com.zy.ai.entity.ChatCompletionRequest;
import com.zy.ai.entity.ChatCompletionResponse;
+import com.zy.ai.entity.LlmCallLog;
import com.zy.ai.entity.LlmRouteConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.client.RestClientResponseException;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,7 +29,11 @@
@RequiredArgsConstructor
public class LlmChatService {
+ private static final int LOG_TEXT_LIMIT = 16000;
+
private final LlmRoutingService llmRoutingService;
+ private final LlmCallLogService llmCallLogService;
+ private final LlmSpringAiClientService llmSpringAiClientService;
@Value("${llm.base-url:}")
private String fallbackBaseUrl;
@@ -54,7 +60,7 @@
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
req.setStream(false);
- ChatCompletionResponse response = complete(req);
+ ChatCompletionResponse response = complete(req, "chat");
if (response == null ||
response.getChoices() == null ||
@@ -81,13 +87,20 @@
req.setTools(tools);
req.setTool_choice("auto");
}
- return complete(req);
+ return complete(req, tools != null && !tools.isEmpty() ? "chat_completion_tools" : "chat_completion");
}
public ChatCompletionResponse complete(ChatCompletionRequest req) {
+ return complete(req, "completion");
+ }
+
+ private ChatCompletionResponse complete(ChatCompletionRequest req, String scene) {
+ String traceId = nextTraceId();
List<ResolvedRoute> routes = resolveRoutes();
if (routes.isEmpty()) {
log.error("璋冪敤 LLM 澶辫触: 鏈厤缃彲鐢� LLM 璺敱");
+ recordCall(traceId, scene, false, 1, null, false, null, 0L, req, null, "none",
+ new RuntimeException("鏈厤缃彲鐢� LLM 璺敱"), "no_route");
return null;
}
@@ -95,19 +108,39 @@
for (int i = 0; i < routes.size(); i++) {
ResolvedRoute route = routes.get(i);
boolean hasNext = i < routes.size() - 1;
+ ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, false);
+ long start = System.currentTimeMillis();
try {
- ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, false);
- ChatCompletionResponse resp = callCompletion(route, routeReq);
+ CompletionCallResult callResult = callCompletion(route, routeReq);
+ ChatCompletionResponse resp = callResult.response;
if (!isValidCompletion(resp)) {
- throw new RuntimeException("LLM 鍝嶅簲涓虹┖");
+ RuntimeException ex = new RuntimeException("LLM 鍝嶅簲涓虹┖");
+ boolean canSwitch = shouldSwitch(route, false);
+ markFailure(route, ex, canSwitch);
+ recordCall(traceId, scene, false, i + 1, route, false, callResult.statusCode,
+ System.currentTimeMillis() - start, routeReq, callResult.payload, "error", ex,
+ "invalid_completion");
+ if (hasNext && canSwitch) {
+ log.warn("LLM 鍒囨崲鍒颁笅涓�璺敱, current={}, reason={}", route.tag(), ex.getMessage());
+ continue;
+ }
+ log.error("璋冪敤 LLM 澶辫触, route={}", route.tag(), ex);
+ last = ex;
+ break;
}
markSuccess(route);
+ recordCall(traceId, scene, false, i + 1, route, true, callResult.statusCode,
+ System.currentTimeMillis() - start, routeReq, buildResponseText(resp, callResult.payload),
+ "none", null, null);
return resp;
} catch (Throwable ex) {
last = ex;
boolean quota = isQuotaExhausted(ex);
boolean canSwitch = shouldSwitch(route, quota);
markFailure(route, ex, canSwitch);
+ recordCall(traceId, scene, false, i + 1, route, false, statusCodeOf(ex),
+ System.currentTimeMillis() - start, routeReq, responseBodyOf(ex),
+ quota ? "quota" : "error", ex, null);
if (hasNext && canSwitch) {
log.warn("LLM 鍒囨崲鍒颁笅涓�璺敱, current={}, reason={}", route.tag(), errorText(ex));
continue;
@@ -136,7 +169,7 @@
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
req.setStream(true);
- streamWithFailover(req, onChunk, onComplete, onError);
+ streamWithFailover(req, onChunk, onComplete, onError, "chat_stream");
}
public void chatStreamWithTools(List<ChatCompletionRequest.Message> messages,
@@ -155,19 +188,23 @@
req.setTools(tools);
req.setTool_choice("auto");
}
- streamWithFailover(req, onChunk, onComplete, onError);
+ streamWithFailover(req, onChunk, onComplete, onError, tools != null && !tools.isEmpty() ? "chat_stream_tools" : "chat_stream");
}
private void streamWithFailover(ChatCompletionRequest req,
Consumer<String> onChunk,
Runnable onComplete,
- Consumer<Throwable> onError) {
+ Consumer<Throwable> onError,
+ String scene) {
+ String traceId = nextTraceId();
List<ResolvedRoute> routes = resolveRoutes();
if (routes.isEmpty()) {
+ recordCall(traceId, scene, true, 1, null, false, null, 0L, req, null, "none",
+ new RuntimeException("鏈厤缃彲鐢� LLM 璺敱"), "no_route");
if (onError != null) onError.accept(new RuntimeException("鏈厤缃彲鐢� LLM 璺敱"));
return;
}
- attemptStream(routes, 0, req, onChunk, onComplete, onError);
+ attemptStream(routes, 0, req, onChunk, onComplete, onError, traceId, scene);
}
private void attemptStream(List<ResolvedRoute> routes,
@@ -175,7 +212,9 @@
ChatCompletionRequest req,
Consumer<String> onChunk,
Runnable onComplete,
- Consumer<Throwable> onError) {
+ Consumer<Throwable> onError,
+ String traceId,
+ String scene) {
if (index >= routes.size()) {
if (onError != null) onError.accept(new RuntimeException("LLM 璺敱鍏ㄩ儴澶辫触"));
return;
@@ -183,6 +222,8 @@
ResolvedRoute route = routes.get(index);
ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, true);
+ long start = System.currentTimeMillis();
+ StringBuilder outputBuffer = new StringBuilder();
AtomicBoolean doneSeen = new AtomicBoolean(false);
AtomicBoolean errorSeen = new AtomicBoolean(false);
@@ -216,109 +257,48 @@
drain.setDaemon(true);
drain.start();
- streamFlux(route, routeReq).subscribe(payload -> {
+ Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq);
+ streamSource.subscribe(payload -> {
if (payload == null || payload.isEmpty()) return;
- String[] events = payload.split("\\r?\\n\\r?\\n");
- for (String part : events) {
- String s = part;
- if (s == null || s.isEmpty()) continue;
- if (s.startsWith("data:")) {
- s = s.substring(5);
- if (s.startsWith(" ")) s = s.substring(1);
- }
- if ("[DONE]".equals(s.trim())) {
- doneSeen.set(true);
- continue;
- }
- try {
- JSONObject obj = JSON.parseObject(s);
- JSONArray choices = obj.getJSONArray("choices");
- if (choices != null && !choices.isEmpty()) {
- JSONObject c0 = choices.getJSONObject(0);
- JSONObject delta = c0.getJSONObject("delta");
- if (delta != null) {
- String content = delta.getString("content");
- if (content != null) {
- queue.offer(content);
- }
- }
- }
- } catch (Exception e) {
- log.warn("瑙f瀽 LLM stream 鐗囨澶辫触: {}", e.getMessage());
- }
- }
+ queue.offer(payload);
+ appendLimited(outputBuffer, payload);
}, err -> {
errorSeen.set(true);
doneSeen.set(true);
boolean quota = isQuotaExhausted(err);
boolean canSwitch = shouldSwitch(route, quota);
markFailure(route, err, canSwitch);
+ recordCall(traceId, scene, true, index + 1, route, false, statusCodeOf(err),
+ System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
+ quota ? "quota" : "error", err, "emitted=" + emitted.get());
if (!emitted.get() && canSwitch && index < routes.size() - 1) {
log.warn("LLM 璺敱澶辫触锛岃嚜鍔ㄥ垏鎹紝current={}, reason={}", route.tag(), errorText(err));
- attemptStream(routes, index + 1, req, onChunk, onComplete, onError);
+ attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
return;
}
if (onError != null) onError.accept(err);
}, () -> {
- if (!doneSeen.get()) {
- RuntimeException ex = new RuntimeException("LLM 娴佹剰澶栧畬鎴�");
- errorSeen.set(true);
- doneSeen.set(true);
- boolean canSwitch = shouldSwitch(route, false);
- markFailure(route, ex, canSwitch);
- if (!emitted.get() && canSwitch && index < routes.size() - 1) {
- log.warn("LLM 璺敱娴佸紓甯稿畬鎴愶紝鑷姩鍒囨崲锛宑urrent={}", route.tag());
- attemptStream(routes, index + 1, req, onChunk, onComplete, onError);
- } else {
- if (onError != null) onError.accept(ex);
- }
- } else {
- markSuccess(route);
- doneSeen.set(true);
- }
+ markSuccess(route);
+ recordCall(traceId, scene, true, index + 1, route, true, 200,
+ System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
+ "none", null, null);
+ doneSeen.set(true);
});
}
- private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) {
- WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
- return client.post()
- .uri("/chat/completions")
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .exchangeToFlux(resp -> {
- int status = resp.rawStatusCode();
- if (status >= 200 && status < 300) {
- return resp.bodyToFlux(String.class);
- }
- return resp.bodyToMono(String.class)
- .defaultIfEmpty("")
- .flatMapMany(body -> Flux.error(new LlmRouteException(status, body)));
- })
- .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触, route={}", route.tag(), ex));
+ private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
+ return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req)
+ .doOnError(ex -> log.error("璋冪敤 Spring AI 娴佸紡澶辫触, route={}", route.tag(), ex));
}
- private ChatCompletionResponse callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
- WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
- RawCompletionResult raw = client.post()
- .uri("/chat/completions")
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .exchangeToMono(resp -> resp.bodyToFlux(String.class)
- .collectList()
- .map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\\n\\n", list))))
- .block();
+ private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
+ return callCompletionWithSpringAi(route, req);
+ }
- if (raw == null) {
- throw new RuntimeException("LLM 杩斿洖涓虹┖");
- }
- if (raw.statusCode < 200 || raw.statusCode >= 300) {
- throw new LlmRouteException(raw.statusCode, raw.payload);
- }
- return parseCompletion(raw.payload);
+ private CompletionCallResult callCompletionWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
+ LlmSpringAiClientService.CompletionCallResult result =
+ llmSpringAiClientService.callCompletion(route.baseUrl, route.apiKey, req);
+ return new CompletionCallResult(result.getStatusCode(), result.getPayload(), result.getResponse());
}
private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
@@ -379,22 +359,36 @@
private String errorText(Throwable ex) {
if (ex == null) return "unknown";
- if (ex instanceof LlmRouteException) {
- LlmRouteException e = (LlmRouteException) ex;
- String body = e.body == null ? "" : e.body;
- if (body.length() > 240) {
+ if (ex instanceof RestClientResponseException) {
+ RestClientResponseException e = (RestClientResponseException) ex;
+ String body = e.getResponseBodyAsString();
+ if (body != null && body.length() > 240) {
body = body.substring(0, 240);
}
- return "status=" + e.statusCode + ", body=" + body;
+ return "status=" + e.getStatusCode().value() + ", body=" + body;
+ }
+ if (ex instanceof WebClientResponseException) {
+ WebClientResponseException e = (WebClientResponseException) ex;
+ String body = e.getResponseBodyAsString();
+ if (body != null && body.length() > 240) {
+ body = body.substring(0, 240);
+ }
+ return "status=" + e.getStatusCode().value() + ", body=" + body;
+ }
+ Integer springAiStatus = llmSpringAiClientService.statusCodeOf(ex);
+ if (springAiStatus != null) {
+ return "status=" + springAiStatus + ", body=" + llmSpringAiClientService.responseBodyOf(ex, 240);
}
return ex.getMessage() == null ? ex.toString() : ex.getMessage();
}
private boolean isQuotaExhausted(Throwable ex) {
- if (!(ex instanceof LlmRouteException)) return false;
- LlmRouteException e = (LlmRouteException) ex;
- if (e.statusCode == 429) return true;
- String text = (e.body == null ? "" : e.body).toLowerCase();
+ Integer status = statusCodeOf(ex);
+ if (status != null && status == 429) {
+ return true;
+ }
+ String text = responseBodyOf(ex);
+ text = text == null ? "" : text.toLowerCase(Locale.ROOT);
return text.contains("insufficient_quota")
|| text.contains("quota")
|| text.contains("浣欓")
@@ -425,116 +419,115 @@
return s == null || s.trim().isEmpty();
}
- private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
- if (payload == null || payload.isEmpty()) return acc;
- String[] events = payload.split("\\r?\\n\\r?\\n");
- for (String part : events) {
- String s = part;
- if (s == null || s.isEmpty()) continue;
- if (s.startsWith("data:")) {
- s = s.substring(5);
- if (s.startsWith(" ")) s = s.substring(1);
- }
- if ("[DONE]".equals(s.trim())) {
- continue;
- }
- try {
- JSONObject obj = JSON.parseObject(s);
- if (obj == null) continue;
- JSONArray choices = obj.getJSONArray("choices");
- if (choices != null && !choices.isEmpty()) {
- JSONObject c0 = choices.getJSONObject(0);
- if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
- ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
- ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
- choice.setMessage(msg);
- ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
- list.add(choice);
- acc.setChoices(list);
- }
- ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
- ChatCompletionRequest.Message msg = choice.getMessage();
- if (msg.getRole() == null || msg.getRole().isEmpty()) {
- msg.setRole("assistant");
- }
- JSONObject delta = c0.getJSONObject("delta");
- if (delta != null) {
- String c = delta.getString("content");
- if (c != null) {
- String prev = msg.getContent();
- msg.setContent(prev == null ? c : prev + c);
- }
- String role = delta.getString("role");
- if (role != null && !role.isEmpty()) msg.setRole(role);
- }
- JSONObject message = c0.getJSONObject("message");
- if (message != null) {
- String c = message.getString("content");
- if (c != null) {
- String prev = msg.getContent();
- msg.setContent(prev == null ? c : prev + c);
- }
- String role = message.getString("role");
- if (role != null && !role.isEmpty()) msg.setRole(role);
- }
- String fr = c0.getString("finish_reason");
- if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
- }
- String id = obj.getString("id");
- if (id != null && !id.isEmpty()) acc.setId(id);
- Long created = obj.getLong("created");
- if (created != null) acc.setCreated(created);
- String object = obj.getString("object");
- if (object != null && !object.isEmpty()) acc.setObjectName(object);
- } catch (Exception ignore) {
- }
- }
- return acc;
+ private String nextTraceId() {
+ return UUID.randomUUID().toString().replace("-", "");
}
- private ChatCompletionResponse parseCompletion(String payload) {
- if (payload == null) return null;
- try {
- ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
- if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
- return r;
- }
- } catch (Exception ignore) {
+ private void appendLimited(StringBuilder sb, String text) {
+ if (sb == null || text == null || text.isEmpty()) {
+ return;
}
- ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
- if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
- return sse;
+ int remain = LOG_TEXT_LIMIT - sb.length();
+ if (remain <= 0) {
+ return;
}
- ChatCompletionResponse r = new ChatCompletionResponse();
- ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
- ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
- msg.setRole("assistant");
- msg.setContent(payload);
- choice.setMessage(msg);
- ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
- list.add(choice);
- r.setChoices(list);
- return r;
+ if (text.length() <= remain) {
+ sb.append(text);
+ } else {
+ sb.append(text, 0, remain);
+ }
}
- private static class RawCompletionResult {
+ private Integer statusCodeOf(Throwable ex) {
+ if (ex instanceof RestClientResponseException) {
+ return ((RestClientResponseException) ex).getStatusCode().value();
+ }
+ if (ex instanceof WebClientResponseException) {
+ return ((WebClientResponseException) ex).getStatusCode().value();
+ }
+ return llmSpringAiClientService.statusCodeOf(ex);
+ }
+
+ private String responseBodyOf(Throwable ex) {
+ if (ex instanceof RestClientResponseException) {
+ return cut(((RestClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
+ }
+ if (ex instanceof WebClientResponseException) {
+ return cut(((WebClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
+ }
+ return cut(llmSpringAiClientService.responseBodyOf(ex, LOG_TEXT_LIMIT), LOG_TEXT_LIMIT);
+ }
+
+ private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) {
+ if (resp != null && resp.getChoices() != null && !resp.getChoices().isEmpty()
+ && resp.getChoices().get(0) != null && resp.getChoices().get(0).getMessage() != null) {
+ ChatCompletionRequest.Message m = resp.getChoices().get(0).getMessage();
+ if (!isBlank(m.getContent())) {
+ return cut(m.getContent(), LOG_TEXT_LIMIT);
+ }
+ if (m.getTool_calls() != null && !m.getTool_calls().isEmpty()) {
+ return cut(JSON.toJSONString(m), LOG_TEXT_LIMIT);
+ }
+ }
+ return cut(fallbackPayload, LOG_TEXT_LIMIT);
+ }
+
+ private String safeName(Throwable ex) {
+ return ex == null ? null : ex.getClass().getSimpleName();
+ }
+
+ private String cut(String text, int maxLen) {
+ if (text == null) return null;
+ String clean = text.replace("\r", " ");
+ return clean.length() > maxLen ? clean.substring(0, maxLen) : clean;
+ }
+
+ private void recordCall(String traceId,
+ String scene,
+ boolean stream,
+ int attemptNo,
+ ResolvedRoute route,
+ boolean success,
+ Integer httpStatus,
+ long latencyMs,
+ ChatCompletionRequest req,
+ String response,
+ String switchMode,
+ Throwable err,
+ String extra) {
+ LlmCallLog item = new LlmCallLog();
+ item.setTraceId(cut(traceId, 64));
+ item.setScene(cut(scene, 64));
+ item.setStream((short) (stream ? 1 : 0));
+ item.setAttemptNo(attemptNo);
+ if (route != null) {
+ item.setRouteId(route.id);
+ item.setRouteName(cut(route.name, 128));
+ item.setBaseUrl(cut(route.baseUrl, 255));
+ item.setModel(cut(route.model, 128));
+ }
+ item.setSuccess((short) (success ? 1 : 0));
+ item.setHttpStatus(httpStatus);
+ item.setLatencyMs(latencyMs < 0 ? 0 : latencyMs);
+ item.setSwitchMode(cut(switchMode, 32));
+ item.setRequestContent(cut(JSON.toJSONString(req), LOG_TEXT_LIMIT));
+ item.setResponseContent(cut(response, LOG_TEXT_LIMIT));
+ item.setErrorType(cut(safeName(err), 128));
+ item.setErrorMessage(err == null ? null : cut(errorText(err), 1024));
+ item.setExtra(cut(extra, 512));
+ item.setCreateTime(new Date());
+ llmCallLogService.saveIgnoreError(item);
+ }
+
+ private static class CompletionCallResult {
private final int statusCode;
private final String payload;
+ private final ChatCompletionResponse response;
- private RawCompletionResult(int statusCode, String payload) {
+ private CompletionCallResult(int statusCode, String payload, ChatCompletionResponse response) {
this.statusCode = statusCode;
this.payload = payload;
- }
- }
-
- private static class LlmRouteException extends RuntimeException {
- private final int statusCode;
- private final String body;
-
- private LlmRouteException(int statusCode, String body) {
- super("http status=" + statusCode);
- this.statusCode = statusCode;
- this.body = body;
+ this.response = response;
}
}
--
Gitblit v1.9.1