From 2d2fd991826837d7189cc488aee6f309a6f5e216 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期一, 09 三月 2026 22:48:46 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/ai/service/LlmChatService.java | 980 ++++++++++++++++++++++++++++++++++++++++++---------------
1 files changed, 715 insertions(+), 265 deletions(-)
diff --git a/src/main/java/com/zy/ai/service/LlmChatService.java b/src/main/java/com/zy/ai/service/LlmChatService.java
index ddb333a..c92ede3 100644
--- a/src/main/java/com/zy/ai/service/LlmChatService.java
+++ b/src/main/java/com/zy/ai/service/LlmChatService.java
@@ -1,45 +1,58 @@
package com.zy.ai.service;
-import com.zy.ai.entity.ChatCompletionRequest;
-import com.zy.ai.entity.ChatCompletionResponse;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.stereotype.Service;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.Flux;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+import com.zy.ai.entity.ChatCompletionRequest;
+import com.zy.ai.entity.ChatCompletionResponse;
+import com.zy.ai.entity.LlmCallLog;
+import com.zy.ai.entity.LlmRouteConfig;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.ai.openai.api.OpenAiApi;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestClientResponseException;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
@Slf4j
@Service
@RequiredArgsConstructor
public class LlmChatService {
- private final WebClient llmWebClient;
+ private static final int LOG_TEXT_LIMIT = 16000;
- @Value("${llm.api-key}")
- private String apiKey;
+ private final LlmRoutingService llmRoutingService;
+ private final LlmCallLogService llmCallLogService;
- @Value("${llm.model}")
- private String model;
+ @Value("${llm.base-url:}")
+ private String fallbackBaseUrl;
- @Value("${llm.pythonPlatformUrl}")
- private String pythonPlatformUrl;
+ @Value("${llm.api-key:}")
+ private String fallbackApiKey;
- @Value("${llm.thinking}")
- private String thinking;
+ @Value("${llm.model:}")
+ private String fallbackModel;
+
+ @Value("${llm.thinking:false}")
+ private String fallbackThinking;
/**
* 通用对话方法：传入 messages，返回大模型文本回复
@@ -49,27 +62,12 @@
Integer maxTokens) {
ChatCompletionRequest req = new ChatCompletionRequest();
- req.setModel(model);
req.setMessages(messages);
req.setTemperature(temperature != null ? temperature : 0.3);
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
req.setStream(false);
- ChatCompletionResponse response = llmWebClient.post()
- .uri("/chat/completions")
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .exchangeToMono(resp -> resp.bodyToFlux(String.class)
- .collectList()
- .map(list -> {
- String payload = String.join("\n\n", list);
- return parseCompletion(payload);
- }))
- .doOnError(ex -> log.error("璋冪敤 LLM 澶辫触", ex))
- .onErrorResume(ex -> Mono.empty())
- .block();
+ ChatCompletionResponse response = complete(req, "chat");
if (response == null ||
response.getChoices() == null ||
@@ -88,45 +86,81 @@
Integer maxTokens,
List<Object> tools) {
ChatCompletionRequest req = new ChatCompletionRequest();
- req.setModel(model);
req.setMessages(messages);
req.setTemperature(temperature != null ? temperature : 0.3);
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
req.setStream(false);
-
- if(thinking.equals("enable")) {
- ChatCompletionRequest.Thinking thinking = new ChatCompletionRequest.Thinking();
- thinking.setType("enable");
- req.setThinking(thinking);
- }
if (tools != null && !tools.isEmpty()) {
req.setTools(tools);
req.setTool_choice("auto");
}
- return complete(req);
+ return complete(req, tools != null && !tools.isEmpty() ? "chat_completion_tools" : "chat_completion");
}
public ChatCompletionResponse complete(ChatCompletionRequest req) {
- try {
- return llmWebClient.post()
- .uri("/chat/completions")
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .exchangeToMono(resp -> resp.bodyToFlux(String.class)
- .collectList()
- .map(list -> {
- String payload = String.join("\n\n", list);
- return parseCompletion(payload);
- }))
- .doOnError(ex -> log.error("璋冪敤 LLM 澶辫触", ex))
- .onErrorResume(ex -> Mono.empty())
- .block();
- } catch (Exception e) {
- log.error("璋冪敤 LLM 澶辫触", e);
+ return complete(req, "completion");
+ }
+
+ /**
+ * Runs a non-streaming completion, trying each resolved route in order until one succeeds.
+ *
+ * <p>For every attempt the request is cloned and bound to the route via {@code applyRoute(..., false)},
+ * the outcome is passed to {@code markSuccess}/{@code markFailure}, and one {@code recordCall} entry is
+ * written under a trace id shared by all attempts of this invocation.
+ *
+ * @param req   the caller's request; it is cloned per attempt rather than modified here
+ * @param scene label forwarded to {@code recordCall} (presumably identifies the calling use case)
+ * @return the first response accepted by {@code isValidCompletion}, or {@code null} when no route is
+ *         configured or no attempt succeeded
+ */
+ private ChatCompletionResponse complete(ChatCompletionRequest req, String scene) {
+ String traceId = nextTraceId();
+ List<ResolvedRoute> routes = resolveRoutes();
+ // No route at all: log, record a single "no_route" entry and give up.
+ if (routes.isEmpty()) {
+ log.error("璋冪敤 LLM 澶辫触: 鏈厤缃彲鐢� LLM 璺敱");
+ recordCall(traceId, scene, false, 1, null, false, null, 0L, req, null, "none",
+ new RuntimeException("鏈厤缃彲鐢� LLM 璺敱"), "no_route");
return null;
}
+
+ // Remembers the failure that ended the loop so it can be summarised after the loop.
+ Throwable last = null;
+ for (int i = 0; i < routes.size(); i++) {
+ ResolvedRoute route = routes.get(i);
+ boolean hasNext = i < routes.size() - 1;
+ ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, false);
+ long start = System.currentTimeMillis();
+ try {
+ CompletionCallResult callResult = callCompletion(route, routeReq);
+ ChatCompletionResponse resp = callResult.response;
+ // A response that fails isValidCompletion is treated as a non-quota failure of this route.
+ if (!isValidCompletion(resp)) {
+ RuntimeException ex = new RuntimeException("LLM 鍝嶅簲涓虹┖");
+ boolean canSwitch = shouldSwitch(route, false);
+ markFailure(route, ex, canSwitch);
+ recordCall(traceId, scene, false, i + 1, route, false, callResult.statusCode,
+ System.currentTimeMillis() - start, routeReq, callResult.payload, "error", ex,
+ "invalid_completion");
+ // Fail over only when another route exists and this route allows switching.
+ if (hasNext && canSwitch) {
+ log.warn("LLM 鍒囨崲鍒颁笅涓�璺敱, current={}, reason={}", route.tag(), ex.getMessage());
+ continue;
+ }
+ log.error("璋冪敤 LLM 澶辫触, route={}", route.tag(), ex);
+ last = ex;
+ break;
+ }
+ markSuccess(route);
+ recordCall(traceId, scene, false, i + 1, route, true, callResult.statusCode,
+ System.currentTimeMillis() - start, routeReq, buildResponseText(resp, callResult.payload),
+ "none", null, null);
+ return resp;
+ } catch (Throwable ex) {
+ // NOTE(review): catching Throwable also intercepts JVM Errors; confirm this is intended.
+ last = ex;
+ // Quota-type failures consult the route's switchOnQuota flag, all others switchOnError.
+ boolean quota = isQuotaExhausted(ex);
+ boolean canSwitch = shouldSwitch(route, quota);
+ markFailure(route, ex, canSwitch);
+ recordCall(traceId, scene, false, i + 1, route, false, statusCodeOf(ex),
+ System.currentTimeMillis() - start, routeReq, responseBodyOf(ex),
+ quota ? "quota" : "error", ex, null);
+ if (hasNext && canSwitch) {
+ log.warn("LLM 鍒囨崲鍒颁笅涓�璺敱, current={}, reason={}", route.tag(), errorText(ex));
+ continue;
+ }
+ log.error("璋冪敤 LLM 澶辫触, route={}", route.tag(), ex);
+ break;
+ }
+ }
+
+ // Reached only when no attempt returned a response.
+ if (last != null) {
+ log.error("璋冪敤 LLM 鍏ㄩ儴璺敱澶辫触: {}", errorText(last));
+ }
+ return null;
}
public void chatStream(List<ChatCompletionRequest.Message> messages,
@@ -137,92 +171,12 @@
Consumer<Throwable> onError) {
ChatCompletionRequest req = new ChatCompletionRequest();
- req.setModel(model);
req.setMessages(messages);
req.setTemperature(temperature != null ? temperature : 0.3);
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
req.setStream(true);
-
- Flux<String> flux = llmWebClient.post()
- .uri("/chat/completions")
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .retrieve()
- .bodyToFlux(String.class)
- .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触", ex));
-
- AtomicBoolean doneSeen = new AtomicBoolean(false);
- AtomicBoolean errorSeen = new AtomicBoolean(false);
- LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
-
- Thread drain = new Thread(() -> {
- try {
- while (true) {
- String s = queue.poll(2, TimeUnit.SECONDS);
- if (s != null) {
- try { onChunk.accept(s); } catch (Exception ignore) {}
- }
- if (doneSeen.get() && queue.isEmpty()) {
- if (!errorSeen.get()) {
- try { if (onComplete != null) onComplete.run(); } catch (Exception ignore) {}
- }
- break;
- }
- }
- } catch (InterruptedException ignore) {
- ignore.printStackTrace();
- }
- });
- drain.setDaemon(true);
- drain.start();
-
- flux.subscribe(payload -> {
- if (payload == null || payload.isEmpty()) return;
- String[] events = payload.split("\\r?\\n\\r?\\n");
- for (String part : events) {
- String s = part;
- if (s == null || s.isEmpty()) continue;
- if (s.startsWith("data:")) {
- s = s.substring(5);
- if (s.startsWith(" ")) s = s.substring(1);
- }
- if ("[DONE]".equals(s.trim())) {
- doneSeen.set(true);
- continue;
- }
- try {
- JSONObject obj = JSON.parseObject(s);
- JSONArray choices = obj.getJSONArray("choices");
- if (choices != null && !choices.isEmpty()) {
- JSONObject c0 = choices.getJSONObject(0);
- JSONObject delta = c0.getJSONObject("delta");
- if (delta != null) {
- String content = delta.getString("content");
- if (content != null) {
- try { queue.offer(content); } catch (Exception ignore) {}
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, err -> {
- errorSeen.set(true);
- doneSeen.set(true);
- if (onError != null) onError.accept(err);
- }, () -> {
- if (!doneSeen.get()) {
- errorSeen.set(true);
- doneSeen.set(true);
- if (onError != null) onError.accept(new RuntimeException("LLM 娴佹剰澶栧畬鎴�"));
- } else {
- doneSeen.set(true);
- }
- });
+ streamWithFailover(req, onChunk, onComplete, onError, "chat_stream");
}
public void chatStreamWithTools(List<ChatCompletionRequest.Message> messages,
@@ -233,120 +187,54 @@
Runnable onComplete,
Consumer<Throwable> onError) {
ChatCompletionRequest req = new ChatCompletionRequest();
- req.setModel(model);
req.setMessages(messages);
req.setTemperature(temperature != null ? temperature : 0.3);
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
req.setStream(true);
- if(thinking.equals("enable")) {
- ChatCompletionRequest.Thinking thinking = new ChatCompletionRequest.Thinking();
- thinking.setType("enable");
- req.setThinking(thinking);
- }
if (tools != null && !tools.isEmpty()) {
req.setTools(tools);
req.setTool_choice("auto");
}
- Flux<String> flux = llmWebClient.post()
- .uri("/chat/completions")
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .retrieve()
- .bodyToFlux(String.class)
- .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触", ex));
-
- AtomicBoolean doneSeen = new AtomicBoolean(false);
- AtomicBoolean errorSeen = new AtomicBoolean(false);
- LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
-
- Thread drain = new Thread(() -> {
- try {
- while (true) {
- String s = queue.poll(5, TimeUnit.SECONDS);
- if (s != null) {
- try { onChunk.accept(s); } catch (Exception ignore) {}
- }
- if (doneSeen.get() && queue.isEmpty()) {
- if (!errorSeen.get()) {
- try { if (onComplete != null) onComplete.run(); } catch (Exception ignore) {}
- }
- break;
- }
- }
- } catch (InterruptedException ignore) {
- ignore.printStackTrace();
- }
- });
- drain.setDaemon(true);
- drain.start();
-
- flux.subscribe(payload -> {
- if (payload == null || payload.isEmpty()) return;
- String[] events = payload.split("\\r?\\n\\r?\\n");
- for (String part : events) {
- String s = part;
- if (s == null || s.isEmpty()) continue;
- if (s.startsWith("data:")) {
- s = s.substring(5);
- if (s.startsWith(" ")) s = s.substring(1);
- }
- if ("[DONE]".equals(s.trim())) {
- doneSeen.set(true);
- continue;
- }
- try {
- JSONObject obj = JSON.parseObject(s);
- JSONArray choices = obj.getJSONArray("choices");
- if (choices != null && !choices.isEmpty()) {
- JSONObject c0 = choices.getJSONObject(0);
- JSONObject delta = c0.getJSONObject("delta");
- if (delta != null) {
- String content = delta.getString("content");
- if (content != null) {
- try { queue.offer(content); } catch (Exception ignore) {}
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, err -> {
- errorSeen.set(true);
- doneSeen.set(true);
- if (onError != null) onError.accept(err);
- }, () -> {
- if (!doneSeen.get()) {
- errorSeen.set(true);
- doneSeen.set(true);
- if (onError != null) onError.accept(new RuntimeException("LLM 娴佹剰澶栧畬鎴�"));
- } else {
- doneSeen.set(true);
- }
- });
+ streamWithFailover(req, onChunk, onComplete, onError, tools != null && !tools.isEmpty() ? "chat_stream_tools" : "chat_stream");
}
- public void chatStreamRunPython(String prompt, String chatId, Consumer<String> onChunk,
+ private void streamWithFailover(ChatCompletionRequest req,
+ Consumer<String> onChunk,
Runnable onComplete,
- Consumer<Throwable> onError) {
- HashMap<String, Object> req = new HashMap<>();
- req.put("prompt", prompt);
- req.put("chatId", chatId);
+ Consumer<Throwable> onError,
+ String scene) {
+ String traceId = nextTraceId();
+ List<ResolvedRoute> routes = resolveRoutes();
+ if (routes.isEmpty()) {
+ recordCall(traceId, scene, true, 1, null, false, null, 0L, req, null, "none",
+ new RuntimeException("鏈厤缃彲鐢� LLM 璺敱"), "no_route");
+ if (onError != null) onError.accept(new RuntimeException("鏈厤缃彲鐢� LLM 璺敱"));
+ return;
+ }
+ attemptStream(routes, 0, req, onChunk, onComplete, onError, traceId, scene);
+ }
- Flux<String> flux = llmWebClient.post()
- .uri(pythonPlatformUrl)
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .retrieve()
- .bodyToFlux(String.class)
- .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触", ex));
+ private void attemptStream(List<ResolvedRoute> routes,
+ int index,
+ ChatCompletionRequest req,
+ Consumer<String> onChunk,
+ Runnable onComplete,
+ Consumer<Throwable> onError,
+ String traceId,
+ String scene) {
+ if (index >= routes.size()) {
+ if (onError != null) onError.accept(new RuntimeException("LLM 璺敱鍏ㄩ儴澶辫触"));
+ return;
+ }
+
+ ResolvedRoute route = routes.get(index);
+ ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, true);
+ long start = System.currentTimeMillis();
+ StringBuilder outputBuffer = new StringBuilder();
AtomicBoolean doneSeen = new AtomicBoolean(false);
AtomicBoolean errorSeen = new AtomicBoolean(false);
+ AtomicBoolean emitted = new AtomicBoolean(false);
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
Thread drain = new Thread(() -> {
@@ -354,6 +242,7 @@
while (true) {
String s = queue.poll(2, TimeUnit.SECONDS);
if (s != null) {
+ emitted.set(true);
try {
onChunk.accept(s);
} catch (Exception ignore) {
@@ -370,14 +259,20 @@
}
}
} catch (InterruptedException ignore) {
- ignore.printStackTrace();
}
});
drain.setDaemon(true);
drain.start();
- flux.subscribe(payload -> {
+ boolean springAiStreaming = canUseSpringAi(routeReq);
+ Flux<String> streamSource = springAiStreaming ? streamFluxWithSpringAi(route, routeReq) : streamFlux(route, routeReq);
+ streamSource.subscribe(payload -> {
if (payload == null || payload.isEmpty()) return;
+ if (springAiStreaming) {
+ queue.offer(payload);
+ appendLimited(outputBuffer, payload);
+ return;
+ }
String[] events = payload.split("\\r?\\n\\r?\\n");
for (String part : events) {
String s = part;
@@ -390,10 +285,6 @@
doneSeen.set(true);
continue;
}
- if("<think>".equals(s.trim()) || "</think>".equals(s.trim())) {
- queue.offer(s.trim());
- continue;
- }
try {
JSONObject obj = JSON.parseObject(s);
JSONArray choices = obj.getJSONArray("choices");
@@ -403,30 +294,251 @@
if (delta != null) {
String content = delta.getString("content");
if (content != null) {
- try {
- queue.offer(content);
- } catch (Exception ignore) {
- }
+ queue.offer(content);
+ appendLimited(outputBuffer, content);
}
}
}
} catch (Exception e) {
- e.printStackTrace();
+ log.warn("瑙f瀽 LLM stream 鐗囨澶辫触: {}", e.getMessage());
}
}
}, err -> {
errorSeen.set(true);
doneSeen.set(true);
+ boolean quota = isQuotaExhausted(err);
+ boolean canSwitch = shouldSwitch(route, quota);
+ markFailure(route, err, canSwitch);
+ recordCall(traceId, scene, true, index + 1, route, false, statusCodeOf(err),
+ System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
+ quota ? "quota" : "error", err, "emitted=" + emitted.get());
+ if (!emitted.get() && canSwitch && index < routes.size() - 1) {
+ log.warn("LLM 璺敱澶辫触锛岃嚜鍔ㄥ垏鎹紝current={}, reason={}", route.tag(), errorText(err));
+ attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
+ return;
+ }
if (onError != null) onError.accept(err);
}, () -> {
if (!doneSeen.get()) {
+ RuntimeException ex = new RuntimeException("LLM 娴佹剰澶栧畬鎴�");
errorSeen.set(true);
doneSeen.set(true);
- if (onError != null) onError.accept(new RuntimeException("LLM 娴佹剰澶栧畬鎴�"));
+ boolean canSwitch = shouldSwitch(route, false);
+ markFailure(route, ex, canSwitch);
+ recordCall(traceId, scene, true, index + 1, route, false, 200,
+ System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
+ "error", ex, "unexpected_stream_end");
+ if (!emitted.get() && canSwitch && index < routes.size() - 1) {
+ log.warn("LLM 璺敱娴佸紓甯稿畬鎴愶紝鑷姩鍒囨崲锛宑urrent={}", route.tag());
+ attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
+ } else {
+ if (onError != null) onError.accept(ex);
+ }
} else {
+ markSuccess(route);
+ recordCall(traceId, scene, true, index + 1, route, true, 200,
+ System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
+ "none", null, null);
doneSeen.set(true);
}
});
+ }
+
+    /**
+     * Opens a streaming chat-completion call against the given route using a freshly built WebClient.
+     *
+     * <p>The request is POSTed to {@code /chat/completions} under the route's base URL with a bearer
+     * token and {@code text/event-stream} as the accepted media type.
+     *
+     * @param route route supplying the base URL and API key
+     * @param req   request body to send
+     * @return the response body as a flux of strings for 2xx statuses; for any other status the body is
+     *         read and the flux fails with an {@code LlmRouteException} carrying status and body
+     */
+    private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) {
+        String bearer = "Bearer " + route.apiKey;
+        Flux<String> body = WebClient.builder()
+                .baseUrl(route.baseUrl)
+                .build()
+                .post()
+                .uri("/chat/completions")
+                .header(HttpHeaders.AUTHORIZATION, bearer)
+                .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.TEXT_EVENT_STREAM)
+                .bodyValue(req)
+                .exchangeToFlux(response -> {
+                    int code = response.rawStatusCode();
+                    boolean failed = code < 200 || code >= 300;
+                    if (failed) {
+                        // Drain the error body (empty string when absent) so it can travel with the exception.
+                        return response.bodyToMono(String.class)
+                                .defaultIfEmpty("")
+                                .flatMapMany(errorBody -> Flux.<String>error(new LlmRouteException(code, errorBody)));
+                    }
+                    return response.bodyToFlux(String.class);
+                });
+        return body.doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触, route={}", route.tag(), ex));
+    }
+
+    /**
+     * Streams a chat completion through the Spring AI {@code OpenAiApi} client and emits only the
+     * non-empty text extracted from each chunk's choices.
+     *
+     * <p>Choices without a delta, and deltas whose extracted content is null or empty, are skipped.
+     * This is done with {@code handle} rather than {@code map} + {@code filter}: Reactor's {@code map}
+     * fails the stream with a NullPointerException when the mapper returns null, so a null check placed
+     * after {@code map} never gets the chance to run.
+     *
+     * @param route route used to build the API client and the Spring AI request
+     * @param req   legacy request converted via {@code buildSpringAiRequest(route, req, true)}
+     * @return flux of content fragments; upstream errors are logged and propagated
+     */
+    private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
+        OpenAiApi api = buildOpenAiApi(route);
+        OpenAiApi.ChatCompletionRequest springReq = buildSpringAiRequest(route, req, true);
+        return api.chatCompletionStream(springReq)
+                .flatMapIterable(chunk -> chunk == null || chunk.choices() == null ? List.<OpenAiApi.ChatCompletionChunk.ChunkChoice>of() : chunk.choices())
+                .<String>handle((choice, sink) -> {
+                    if (choice.delta() == null) {
+                        return;
+                    }
+                    String text = extractSpringAiContent(choice.delta());
+                    if (text != null && !text.isEmpty()) {
+                        sink.next(text);
+                    }
+                })
+                .doOnError(ex -> log.error("璋冪敤 Spring AI 娴佸紡澶辫触, route={}", route.tag(), ex));
+    }
+
+    /**
+     * Dispatches a non-streaming completion to one of the two client implementations.
+     *
+     * @param route route to call
+     * @param req   request already bound to the route
+     * @return the result from the Spring AI client when {@code canUseSpringAi(req)} holds, otherwise
+     *         the result from the WebClient implementation
+     */
+    private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
+        return canUseSpringAi(req)
+                ? callCompletionWithSpringAi(route, req)
+                : callCompletionWithWebClient(route, req);
+    }
+
+    /**
+     * Performs a blocking chat-completion call with a freshly built WebClient and parses the payload.
+     *
+     * <p>The body is read as a flux of string fragments and re-joined with real blank-line separators
+     * ({@code "\n\n"}), as the previous implementation did. Joining with the escaped literal
+     * {@code "\\n\\n"} would insert backslash characters between fragments and corrupt the payload
+     * handed to {@code parseCompletion}.
+     *
+     * @param route route supplying the base URL and API key
+     * @param req   request body to send
+     * @return status code, raw payload and parsed response
+     * @throws LlmRouteException when the HTTP status is outside 2xx
+     * @throws RuntimeException  when the exchange yields no result at all
+     */
+    private CompletionCallResult callCompletionWithWebClient(ResolvedRoute route, ChatCompletionRequest req) {
+        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
+        RawCompletionResult raw = client.post()
+                .uri("/chat/completions")
+                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
+                .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
+                .bodyValue(req)
+                .exchangeToMono(resp -> resp.bodyToFlux(String.class)
+                        .collectList()
+                        .map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\n\n", list))))
+                .block();
+
+        // block() returns null when the Mono completes empty.
+        if (raw == null) {
+            throw new RuntimeException("LLM 杩斿洖涓虹┖");
+        }
+        if (raw.statusCode < 200 || raw.statusCode >= 300) {
+            throw new LlmRouteException(raw.statusCode, raw.payload);
+        }
+        return new CompletionCallResult(raw.statusCode, raw.payload, parseCompletion(raw.payload));
+    }
+
+    /**
+     * Performs a blocking chat-completion call through the Spring AI {@code OpenAiApi} client.
+     *
+     * @param route route used to build the API client and the Spring AI request
+     * @param req   legacy request converted via {@code buildSpringAiRequest(route, req, false)}
+     * @return HTTP status, the body serialised as JSON (null when there is no body) and the body
+     *         converted by {@code toLegacyResponse}
+     */
+    private CompletionCallResult callCompletionWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
+        ResponseEntity<OpenAiApi.ChatCompletion> response =
+                buildOpenAiApi(route).chatCompletionEntity(buildSpringAiRequest(route, req, false));
+        OpenAiApi.ChatCompletion completion = response.getBody();
+        int status = response.getStatusCode().value();
+        String rawJson = null;
+        if (completion != null) {
+            rawJson = JSON.toJSONString(completion);
+        }
+        return new CompletionCallResult(status, rawJson, toLegacyResponse(completion));
+    }
+
+    /**
+     * Binds a request to a route by overwriting its model, stream flag and thinking setting.
+     *
+     * @param req    request to modify in place
+     * @param route  route supplying the model name and the thinking switch
+     * @param stream value written to the request's stream flag
+     * @return the same {@code req} instance, for chaining
+     */
+    private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
+        req.setModel(route.model);
+        req.setStream(stream);
+        // The thinking block is sent only when the route enables it; otherwise it is cleared.
+        ChatCompletionRequest.Thinking thinking = null;
+        if (route.thinkingEnabled) {
+            thinking = new ChatCompletionRequest.Thinking();
+            thinking.setType("enable");
+        }
+        req.setThinking(thinking);
+        return req;
+    }
+
+    /**
+     * Creates a new request carrying the same field values as {@code src}.
+     *
+     * <p>The copy is shallow: messages, tools and the thinking object are passed along by reference.
+     *
+     * @param src request to copy; must not be null
+     * @return a new request instance
+     */
+    private ChatCompletionRequest cloneRequest(ChatCompletionRequest src) {
+        ChatCompletionRequest copy = new ChatCompletionRequest();
+        // Routing-related fields (overwritten later by applyRoute).
+        copy.setModel(src.getModel());
+        // Conversation payload and sampling parameters.
+        copy.setMessages(src.getMessages());
+        copy.setTemperature(src.getTemperature());
+        copy.setMax_tokens(src.getMax_tokens());
+        copy.setStream(src.getStream());
+        // Tool calling and thinking configuration.
+        copy.setTools(src.getTools());
+        copy.setTool_choice(src.getTool_choice());
+        copy.setThinking(src.getThinking());
+        return copy;
+    }
+
+    /**
+     * Decides whether a completion response is usable.
+     *
+     * <p>A response is valid when its first choice has a message with either non-blank content or at
+     * least one tool call. A null first choice is reported as invalid instead of raising a
+     * NullPointerException, matching the null check done in {@code buildResponseText}.
+     *
+     * @param response response to inspect; may be null
+     * @return {@code true} when the response carries content or tool calls
+     */
+    private boolean isValidCompletion(ChatCompletionResponse response) {
+        if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) {
+            return false;
+        }
+        ChatCompletionResponse.Choice first = response.getChoices().get(0);
+        if (first == null) {
+            return false;
+        }
+        ChatCompletionRequest.Message message = first.getMessage();
+        if (message == null) {
+            return false;
+        }
+        if (!isBlank(message.getContent())) {
+            return true;
+        }
+        return message.getTool_calls() != null && !message.getTool_calls().isEmpty();
+    }
+
+    /**
+     * Tells whether a failure on this route permits switching away from it.
+     *
+     * @param route route whose switch flags are consulted
+     * @param quota whether the failure was classified as quota exhaustion
+     * @return {@code route.switchOnQuota} for quota failures, {@code route.switchOnError} otherwise
+     */
+    private boolean shouldSwitch(ResolvedRoute route, boolean quota) {
+        if (quota) {
+            return route.switchOnQuota;
+        }
+        return route.switchOnError;
+    }
+
+    /**
+     * Tells whether the Spring AI client may serve this request.
+     *
+     * @param req request to inspect; may be null
+     * @return {@code true} only for a non-null request whose tool list is null or empty
+     */
+    private boolean canUseSpringAi(ChatCompletionRequest req) {
+        if (req == null) {
+            return false;
+        }
+        return req.getTools() == null || req.getTools().isEmpty();
+    }
+
+    /**
+     * Reports a successful call to the routing service.
+     *
+     * <p>Routes without an id are skipped.
+     *
+     * @param route route that served the call
+     */
+    private void markSuccess(ResolvedRoute route) {
+        if (route.id == null) {
+            return;
+        }
+        llmRoutingService.markSuccess(route.id);
+    }
+
+    /**
+     * Reports a failed call to the routing service.
+     *
+     * <p>Routes without an id are skipped.
+     *
+     * @param route         route that failed
+     * @param ex            failure, passed on as text from {@code errorText}
+     * @param enterCooldown flag forwarded to the routing service together with the route's
+     *                      {@code cooldownSeconds}
+     */
+    private void markFailure(ResolvedRoute route, Throwable ex, boolean enterCooldown) {
+        if (route.id == null) {
+            return;
+        }
+        String reason = errorText(ex);
+        llmRoutingService.markFailure(route.id, reason, enterCooldown, route.cooldownSeconds);
+    }
+
+    /**
+     * Builds a short, log-friendly description of a failure.
+     *
+     * <p>HTTP-style exceptions are rendered as {@code status=<code>, body=<body>} with the body cut to
+     * 240 characters; any other throwable yields its message, or {@code toString()} when it has none.
+     *
+     * @param ex failure to describe; may be null
+     * @return the description, or {@code "unknown"} for a null argument
+     */
+    private String errorText(Throwable ex) {
+        if (ex == null) {
+            return "unknown";
+        }
+        if (ex instanceof LlmRouteException) {
+            LlmRouteException routeEx = (LlmRouteException) ex;
+            // A missing body is rendered as an empty string for this exception type.
+            String text = routeEx.body != null ? routeEx.body : "";
+            String shown = text.length() > 240 ? text.substring(0, 240) : text;
+            return "status=" + routeEx.statusCode + ", body=" + shown;
+        }
+        if (ex instanceof RestClientResponseException) {
+            RestClientResponseException restEx = (RestClientResponseException) ex;
+            String text = restEx.getResponseBodyAsString();
+            String shown = (text == null || text.length() <= 240) ? text : text.substring(0, 240);
+            return "status=" + restEx.getStatusCode().value() + ", body=" + shown;
+        }
+        if (ex instanceof WebClientResponseException) {
+            WebClientResponseException webEx = (WebClientResponseException) ex;
+            String text = webEx.getResponseBodyAsString();
+            String shown = (text == null || text.length() <= 240) ? text : text.substring(0, 240);
+            return "status=" + webEx.getStatusCode().value() + ", body=" + shown;
+        }
+        String message = ex.getMessage();
+        return message != null ? message : ex.toString();
+    }
+
+ /**
+ * Classifies a failure as quota/rate-limit related.
+ *
+ * @param ex failure to inspect
+ * @return {@code true} when the status from {@code statusCodeOf} is 429, or when the lower-cased
+ *         text from {@code responseBodyOf} contains one of the markers below
+ */
+ private boolean isQuotaExhausted(Throwable ex) {
+ Integer status = statusCodeOf(ex);
+ if (status != null && status == 429) {
+ return true;
+ }
+ String text = responseBodyOf(ex);
+ // A missing body is treated as empty text; matching is case-insensitive via Locale.ROOT lower-casing.
+ text = text == null ? "" : text.toLowerCase(Locale.ROOT);
+ // NOTE(review): "quota" already covers "insufficient_quota"; the non-ASCII markers look mis-encoded here, verify them against the real file.
+ return text.contains("insufficient_quota")
+ || text.contains("quota")
+ || text.contains("浣欓")
+ || text.contains("鐢ㄩ噺")
+ || text.contains("瓒呴檺")
+ || text.contains("rate limit");
+ }
+
+    /**
+     * Collects the routes a call may be sent to.
+     *
+     * @return the routes reported by {@code llmRoutingService.listAvailableRoutes()}; when there are
+     *         none and the fallback base URL, API key and model are all non-blank, a single route
+     *         built from those fallback properties; otherwise an empty list
+     */
+    private List<ResolvedRoute> resolveRoutes() {
+        List<ResolvedRoute> resolved = new ArrayList<>();
+        llmRoutingService.listAvailableRoutes()
+                .forEach(config -> resolved.add(ResolvedRoute.fromDb(config)));
+        if (!resolved.isEmpty()) {
+            return resolved;
+        }
+        // Compatibility: with no database routes, fall back to the yml configuration.
+        boolean fallbackMissing = isBlank(fallbackBaseUrl) || isBlank(fallbackApiKey) || isBlank(fallbackModel);
+        if (!fallbackMissing) {
+            resolved.add(ResolvedRoute.fromFallback(fallbackBaseUrl, fallbackApiKey, fallbackModel, isFallbackThinkingEnabled()));
+        }
+        return resolved;
+    }
+
+    /**
+     * Interprets the {@code fallbackThinking} property as a boolean switch.
+     *
+     * <p>Lower-casing uses {@link Locale#ROOT} so the comparison does not depend on the JVM's default
+     * locale, consistent with {@code isQuotaExhausted}.
+     *
+     * @return {@code true} when the trimmed, lower-cased value is {@code "true"}, {@code "1"} or
+     *         {@code "enable"}; {@code false} otherwise, including when the property is null
+     */
+    private boolean isFallbackThinkingEnabled() {
+        String x = fallbackThinking == null ? "" : fallbackThinking.trim().toLowerCase(Locale.ROOT);
+        return "true".equals(x) || "1".equals(x) || "enable".equals(x);
+    }
+
+    /**
+     * Tells whether a string has no visible content.
+     *
+     * @param s string to test; may be null
+     * @return {@code true} when {@code s} is null or empty after {@code trim()}
+     */
+    private boolean isBlank(String s) {
+        if (s == null) {
+            return true;
+        }
+        return s.trim().isEmpty();
     }
private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
@@ -452,7 +564,7 @@
ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
choice.setMessage(msg);
- java.util.ArrayList<ChatCompletionResponse.Choice> list = new java.util.ArrayList<>();
+ ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
list.add(choice);
acc.setChoices(list);
}
@@ -490,7 +602,8 @@
if (created != null) acc.setCreated(created);
String object = obj.getString("object");
if (object != null && !object.isEmpty()) acc.setObjectName(object);
- } catch (Exception ignore) {}
+ } catch (Exception ignore) {
+ }
}
return acc;
}
@@ -502,7 +615,8 @@
if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
return r;
}
- } catch (Exception ignore) {}
+ } catch (Exception ignore) {
+ }
ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
return sse;
@@ -513,9 +627,345 @@
msg.setRole("assistant");
msg.setContent(payload);
choice.setMessage(msg);
- java.util.ArrayList<ChatCompletionResponse.Choice> list = new java.util.ArrayList<>();
+ ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
list.add(choice);
r.setChoices(list);
return r;
}
+
+ private String nextTraceId() { // Returns a new trace id: a random UUID string with its dashes removed.
+ return UUID.randomUUID().toString().replace("-", "");
+ }
+
+ private void appendLimited(StringBuilder sb, String text) { // Appends text to sb without letting sb grow beyond LOG_TEXT_LIMIT characters.
+ boolean nothingToAppend = sb == null || text == null || text.isEmpty();
+ if (nothingToAppend) {
+ return;
+ }
+ // Remaining capacity before the buffer reaches its cap.
+ int room = LOG_TEXT_LIMIT - sb.length();
+ if (room <= 0) {
+ return; // buffer is already at (or over) the limit
+ }
+ // Copy at most `room` characters; anything beyond that is dropped.
+ int count = Math.min(room, text.length());
+ sb.append(text, 0, count);
+ }
+
+ private Integer statusCodeOf(Throwable ex) { // HTTP status carried by ex, or null when ex is not one of the three recognised exception types.
+ if (ex instanceof LlmRouteException) {
+ return ((LlmRouteException) ex).statusCode; // int field, auto-boxed to Integer
+ }
+ if (ex instanceof RestClientResponseException) {
+ return ((RestClientResponseException) ex).getStatusCode().value();
+ }
+ if (ex instanceof WebClientResponseException) {
+ return ((WebClientResponseException) ex).getStatusCode().value();
+ }
+ return null; // also reached for a null ex, because instanceof is false for null
+ }
+
+ private String responseBodyOf(Throwable ex) { // Response body carried by ex, passed through cut(..., LOG_TEXT_LIMIT); null for any other exception type.
+ if (ex instanceof LlmRouteException) {
+ return cut(((LlmRouteException) ex).body, LOG_TEXT_LIMIT);
+ }
+ if (ex instanceof RestClientResponseException) {
+ return cut(((RestClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
+ }
+ if (ex instanceof WebClientResponseException) {
+ return cut(((WebClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
+ }
+ return null; // also reached for a null ex
+ }
+
+ private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) { // Chooses the text to log: first choice's content, else its tool-call message as JSON, else the fallback payload; always limited by cut().
+ boolean hasFirstMessage = resp != null && resp.getChoices() != null && !resp.getChoices().isEmpty()
+ && resp.getChoices().get(0) != null && resp.getChoices().get(0).getMessage() != null;
+ if (!hasFirstMessage) {
+ return cut(fallbackPayload, LOG_TEXT_LIMIT);
+ }
+ ChatCompletionRequest.Message first = resp.getChoices().get(0).getMessage();
+ if (!isBlank(first.getContent())) {
+ return cut(first.getContent(), LOG_TEXT_LIMIT);
+ }
+ boolean hasToolCalls = first.getTool_calls() != null && !first.getTool_calls().isEmpty();
+ return hasToolCalls ? cut(JSON.toJSONString(first), LOG_TEXT_LIMIT) : cut(fallbackPayload, LOG_TEXT_LIMIT);
+ }
+
+ private String safeName(Throwable ex) { // Simple class name of ex, or null when no exception was given.
+ return ex != null ? ex.getClass().getSimpleName() : null;
+ }
+
+ private OpenAiApi buildOpenAiApi(ResolvedRoute route) { // Builds an OpenAiApi client configured with the route's baseUrl and apiKey.
+ return OpenAiApi.builder()
+ .baseUrl(route.baseUrl)
+ .apiKey(route.apiKey)
+ .build();
+ }
+
+ private OpenAiApi.ChatCompletionRequest buildSpringAiRequest(ResolvedRoute route, // Translates the legacy request into an OpenAiApi.ChatCompletionRequest for the given route.
+ ChatCompletionRequest req,
+ boolean stream) {
+ HashMap<String, Object> extraBody = new HashMap<>(); // extra fields handed to the constructor as its last argument
+ if (route.thinkingEnabled || req.getThinking() != null) { // thinking is sent when the route enables it or the caller supplied a thinking object
+ HashMap<String, Object> thinking = new HashMap<>();
+ thinking.put("type", req.getThinking() != null && req.getThinking().getType() != null
+ ? req.getThinking().getType() // a caller-provided type wins
+ : "enable"); // default when the caller gave no type
+ extraBody.put("thinking", thinking);
+ }
+ return new OpenAiApi.ChatCompletionRequest( // NOTE(review): positional constructor; argument meanings depend on the Spring AI version in use - confirm on upgrade
+ toSpringAiMessages(req.getMessages()),
+ route.model, // the model always comes from the route in this call
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ req.getMax_tokens(),
+ null,
+ 1, // presumably "n" (number of choices) - TODO confirm against the constructor signature
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ stream,
+ stream ? OpenAiApi.ChatCompletionRequest.StreamOptions.INCLUDE_USAGE : null, // stream options are only set for streaming calls
+ req.getTemperature(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ extraBody.isEmpty() ? null : extraBody // null rather than an empty map when no extra fields were added
+ );
+ }
+
+ private List<OpenAiApi.ChatCompletionMessage> toSpringAiMessages(List<ChatCompletionRequest.Message> messages) { // Converts legacy messages to Spring AI messages; a null list yields an empty list.
+ ArrayList<OpenAiApi.ChatCompletionMessage> result = new ArrayList<>();
+ if (messages == null) {
+ return result;
+ }
+ for (ChatCompletionRequest.Message message : messages) {
+ if (message == null) {
+ continue; // null entries are skipped
+ }
+ result.add(new OpenAiApi.ChatCompletionMessage( // NOTE(review): only content and role are passed; the legacy message's name, tool_call_id and tool_calls are not forwarded - confirm this is intended for tool conversations
+ message.getContent(),
+ toSpringAiRole(message.getRole())
+ ));
+ }
+ return result;
+ }
+
+ private OpenAiApi.ChatCompletionMessage.Role toSpringAiRole(String role) { // Maps a legacy role string (trimmed, case-insensitive) to the Spring AI Role enum.
+ // A missing role is handled like any unrecognised one and ends up as USER.
+ String key = role == null ? "" : role.trim().toLowerCase(Locale.ROOT);
+ if ("system".equals(key)) {
+ return OpenAiApi.ChatCompletionMessage.Role.SYSTEM;
+ }
+ if ("assistant".equals(key)) {
+ return OpenAiApi.ChatCompletionMessage.Role.ASSISTANT;
+ }
+ if ("tool".equals(key)) {
+ return OpenAiApi.ChatCompletionMessage.Role.TOOL;
+ }
+ // Default bucket: "user" itself, null, blank and unknown role names.
+ return OpenAiApi.ChatCompletionMessage.Role.USER;
+ }
+
+ private ChatCompletionResponse toLegacyResponse(OpenAiApi.ChatCompletion completion) { // Converts a Spring AI ChatCompletion into the legacy ChatCompletionResponse; null in, null out.
+ if (completion == null) {
+ return null;
+ }
+ ChatCompletionResponse response = new ChatCompletionResponse();
+ response.setId(completion.id());
+ response.setCreated(completion.created());
+ response.setObjectName(completion.object());
+ if (completion.usage() != null) { // token usage is copied only when the completion reports it
+ ChatCompletionResponse.Usage usage = new ChatCompletionResponse.Usage();
+ usage.setPromptTokens(completion.usage().promptTokens());
+ usage.setCompletionTokens(completion.usage().completionTokens());
+ usage.setTotalTokens(completion.usage().totalTokens());
+ response.setUsage(usage);
+ }
+ if (completion.choices() != null) { // with null choices, setChoices is never called on the response
+ ArrayList<ChatCompletionResponse.Choice> choices = new ArrayList<>();
+ for (OpenAiApi.ChatCompletion.Choice choice : completion.choices()) {
+ ChatCompletionResponse.Choice item = new ChatCompletionResponse.Choice();
+ item.setIndex(choice.index());
+ if (choice.finishReason() != null) {
+ item.setFinishReason(choice.finishReason().name().toLowerCase(Locale.ROOT)); // enum constant name, lower-cased
+ }
+ item.setMessage(toLegacyMessage(choice.message())); // null when the choice has no message
+ choices.add(item);
+ }
+ response.setChoices(choices);
+ }
+ return response;
+ }
+
+ private ChatCompletionRequest.Message toLegacyMessage(OpenAiApi.ChatCompletionMessage message) { // Converts a Spring AI message into the legacy Message type; null in, null out.
+ if (message == null) {
+ return null;
+ }
+ ChatCompletionRequest.Message result = new ChatCompletionRequest.Message(); // NOTE(review): content, role, name and tool_call_id are copied below, tool_calls are not - confirm callers do not need them
+ result.setContent(extractSpringAiContent(message));
+ if (message.role() != null) {
+ result.setRole(message.role().name().toLowerCase(Locale.ROOT)); // enum constant name, lower-cased
+ }
+ result.setName(message.name());
+ result.setTool_call_id(message.toolCallId());
+ return result;
+ }
+
+ private String extractSpringAiContent(OpenAiApi.ChatCompletionMessage message) { // Returns the message's raw content as a String; null when the message or its raw content is null.
+ if (message == null || message.rawContent() == null) {
+ return null;
+ }
+ Object content = message.rawContent();
+ if (content instanceof String) {
+ return (String) content;
+ }
+ if (content instanceof List) {
+ try {
+ @SuppressWarnings("unchecked") // the element type cannot be verified at the cast because of erasure
+ List<OpenAiApi.ChatCompletionMessage.MediaContent> media =
+ (List<OpenAiApi.ChatCompletionMessage.MediaContent>) content;
+ return OpenAiApi.getTextContent(media);
+ } catch (ClassCastException ignore) { // deliberately ignored: fall through to String.valueOf below
+ }
+ }
+ return String.valueOf(content); // last resort for content that is neither a String nor a usable list
+ }
+
+ private String cut(String text, int maxLen) { // Null-safe helper: replaces every carriage return with a space, then keeps at most maxLen characters.
+ if (text == null) return null;
+ String flattened = text.replace('\r', ' ');
+ return flattened.length() <= maxLen ? flattened : flattened.substring(0, maxLen);
+ }
+
+ private void recordCall(String traceId, // Fills an LlmCallLog entry for one call attempt and hands it to llmCallLogService.saveIgnoreError.
+ String scene,
+ boolean stream,
+ int attemptNo,
+ ResolvedRoute route,
+ boolean success,
+ Integer httpStatus,
+ long latencyMs,
+ ChatCompletionRequest req,
+ String response,
+ String switchMode,
+ Throwable err,
+ String extra) {
+ LlmCallLog item = new LlmCallLog();
+ item.setTraceId(cut(traceId, 64)); // text fields are truncated via cut(); presumably the limits mirror column sizes - TODO confirm
+ item.setScene(cut(scene, 64));
+ item.setStream((short) (stream ? 1 : 0)); // boolean stored as 1/0
+ item.setAttemptNo(attemptNo);
+ if (route != null) { // route fields stay unset when no route is given
+ item.setRouteId(route.id);
+ item.setRouteName(cut(route.name, 128));
+ item.setBaseUrl(cut(route.baseUrl, 255));
+ item.setModel(cut(route.model, 128));
+ }
+ item.setSuccess((short) (success ? 1 : 0)); // boolean stored as 1/0
+ item.setHttpStatus(httpStatus);
+ item.setLatencyMs(latencyMs < 0 ? 0 : latencyMs); // negative latencies are clamped to 0
+ item.setSwitchMode(cut(switchMode, 32));
+ item.setRequestContent(cut(JSON.toJSONString(req), LOG_TEXT_LIMIT)); // the whole request is serialised first, then truncated
+ item.setResponseContent(cut(response, LOG_TEXT_LIMIT));
+ item.setErrorType(cut(safeName(err), 128));
+ item.setErrorMessage(err == null ? null : cut(errorText(err), 1024));
+ item.setExtra(cut(extra, 512));
+ item.setCreateTime(new Date());
+ llmCallLogService.saveIgnoreError(item); // presumably swallows persistence failures, judging by the name - verify in LlmCallLogService
+ }
+
+ private static class CompletionCallResult { // Immutable holder for a completion call's status code, raw payload and parsed response.
+ private final int statusCode;
+ private final String payload;
+ private final ChatCompletionResponse response;
+
+ private CompletionCallResult(int statusCode, String payload, ChatCompletionResponse response) { // plain field assignment, no validation
+ this.statusCode = statusCode;
+ this.payload = payload;
+ this.response = response;
+ }
+ }
+
+ private static class RawCompletionResult { // Immutable holder for a call's status code and its unparsed payload.
+ private final int statusCode;
+ private final String payload;
+
+ private RawCompletionResult(int statusCode, String payload) { // plain field assignment, no validation
+ this.statusCode = statusCode;
+ this.payload = payload;
+ }
+ }
+
+ private static class LlmRouteException extends RuntimeException { // Unchecked exception carrying an HTTP status code and response body; statusCodeOf and responseBodyOf read these fields.
+ private final int statusCode;
+ private final String body;
+
+ private LlmRouteException(int statusCode, String body) {
+ super("http status=" + statusCode); // the exception message holds only the status; the body is kept in its own field
+ this.statusCode = statusCode;
+ this.body = body;
+ }
+ }
+
+ private static class ResolvedRoute { // One LLM endpoint to try, built either from a database LlmRouteConfig or from the fallback settings.
+ private Long id; // stays null for the fallback route (fromFallback never sets it)
+ private String name;
+ private String baseUrl;
+ private String apiKey;
+ private String model;
+ private boolean thinkingEnabled;
+ private boolean switchOnQuota;
+ private boolean switchOnError;
+ private Integer cooldownSeconds;
+
+ private static ResolvedRoute fromDb(LlmRouteConfig c) { // Copies a database route config into a ResolvedRoute.
+ ResolvedRoute r = new ResolvedRoute();
+ r.id = c.getId();
+ r.name = c.getName();
+ r.baseUrl = c.getBaseUrl();
+ r.apiKey = c.getApiKey();
+ r.model = c.getModel();
+ r.thinkingEnabled = c.getThinking() != null && c.getThinking() == 1; // off unless the stored flag is exactly 1
+ r.switchOnQuota = c.getSwitchOnQuota() == null || c.getSwitchOnQuota() == 1; // a null flag defaults to true
+ r.switchOnError = c.getSwitchOnError() == null || c.getSwitchOnError() == 1; // a null flag defaults to true
+ r.cooldownSeconds = c.getCooldownSeconds();
+ return r;
+ }
+
+ private static ResolvedRoute fromFallback(String baseUrl, String apiKey, String model, boolean thinkingEnabled) { // Builds the route named "fallback-yml" from explicitly passed settings.
+ ResolvedRoute r = new ResolvedRoute();
+ r.name = "fallback-yml";
+ r.baseUrl = baseUrl;
+ r.apiKey = apiKey;
+ r.model = model;
+ r.thinkingEnabled = thinkingEnabled;
+ r.switchOnQuota = true;
+ r.switchOnError = true;
+ r.cooldownSeconds = 300; // fixed value for the fallback route
+ return r;
+ }
+
+ private String tag() { // Short label: the route name (or "unnamed"), followed by " model=<model>" when a model is set.
+ String showName = name == null ? "unnamed" : name;
+ String showModel = model == null ? "" : (" model=" + model);
+ return showName + showModel;
+ }
+ }
}
--
Gitblit v1.9.1