From 8636ff97bffec9f2130628bf09c9d0fbb371e2bc Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 10 三月 2026 16:53:24 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/ai/service/LlmChatService.java | 433 ++---------------------------------------------------
1 files changed, 20 insertions(+), 413 deletions(-)
diff --git a/src/main/java/com/zy/ai/service/LlmChatService.java b/src/main/java/com/zy/ai/service/LlmChatService.java
index c92ede3..e2eddd6 100644
--- a/src/main/java/com/zy/ai/service/LlmChatService.java
+++ b/src/main/java/com/zy/ai/service/LlmChatService.java
@@ -1,22 +1,15 @@
package com.zy.ai.service;
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
import com.zy.ai.entity.ChatCompletionRequest;
import com.zy.ai.entity.ChatCompletionResponse;
import com.zy.ai.entity.LlmCallLog;
import com.zy.ai.entity.LlmRouteConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.ai.openai.api.OpenAiApi;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientResponseException;
-import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
@@ -25,7 +18,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
-import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -41,6 +33,7 @@
private final LlmRoutingService llmRoutingService;
private final LlmCallLogService llmCallLogService;
+ private final LlmSpringAiClientService llmSpringAiClientService;
@Value("${llm.base-url:}")
private String fallbackBaseUrl;
@@ -264,45 +257,11 @@
drain.setDaemon(true);
drain.start();
- boolean springAiStreaming = canUseSpringAi(routeReq);
- Flux<String> streamSource = springAiStreaming ? streamFluxWithSpringAi(route, routeReq) : streamFlux(route, routeReq);
+ Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq);
streamSource.subscribe(payload -> {
if (payload == null || payload.isEmpty()) return;
- if (springAiStreaming) {
- queue.offer(payload);
- appendLimited(outputBuffer, payload);
- return;
- }
- String[] events = payload.split("\\r?\\n\\r?\\n");
- for (String part : events) {
- String s = part;
- if (s == null || s.isEmpty()) continue;
- if (s.startsWith("data:")) {
- s = s.substring(5);
- if (s.startsWith(" ")) s = s.substring(1);
- }
- if ("[DONE]".equals(s.trim())) {
- doneSeen.set(true);
- continue;
- }
- try {
- JSONObject obj = JSON.parseObject(s);
- JSONArray choices = obj.getJSONArray("choices");
- if (choices != null && !choices.isEmpty()) {
- JSONObject c0 = choices.getJSONObject(0);
- JSONObject delta = c0.getJSONObject("delta");
- if (delta != null) {
- String content = delta.getString("content");
- if (content != null) {
- queue.offer(content);
- appendLimited(outputBuffer, content);
- }
- }
- }
- } catch (Exception e) {
- log.warn("瑙f瀽 LLM stream 鐗囨澶辫触: {}", e.getMessage());
- }
- }
+ queue.offer(payload);
+ appendLimited(outputBuffer, payload);
}, err -> {
errorSeen.set(true);
doneSeen.set(true);
@@ -319,100 +278,27 @@
}
if (onError != null) onError.accept(err);
}, () -> {
- if (!doneSeen.get()) {
- RuntimeException ex = new RuntimeException("LLM 娴佹剰澶栧畬鎴�");
- errorSeen.set(true);
- doneSeen.set(true);
- boolean canSwitch = shouldSwitch(route, false);
- markFailure(route, ex, canSwitch);
- recordCall(traceId, scene, true, index + 1, route, false, 200,
- System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
- "error", ex, "unexpected_stream_end");
- if (!emitted.get() && canSwitch && index < routes.size() - 1) {
- log.warn("LLM 璺敱娴佸紓甯稿畬鎴愶紝鑷姩鍒囨崲锛宑urrent={}", route.tag());
- attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
- } else {
- if (onError != null) onError.accept(ex);
- }
- } else {
- markSuccess(route);
- recordCall(traceId, scene, true, index + 1, route, true, 200,
- System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
- "none", null, null);
- doneSeen.set(true);
- }
+ markSuccess(route);
+ recordCall(traceId, scene, true, index + 1, route, true, 200,
+ System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
+ "none", null, null);
+ doneSeen.set(true);
});
}
- private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) {
- WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
- return client.post()
- .uri("/chat/completions")
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .exchangeToFlux(resp -> {
- int status = resp.rawStatusCode();
- if (status >= 200 && status < 300) {
- return resp.bodyToFlux(String.class);
- }
- return resp.bodyToMono(String.class)
- .defaultIfEmpty("")
- .flatMapMany(body -> Flux.error(new LlmRouteException(status, body)));
- })
- .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触, route={}", route.tag(), ex));
- }
-
private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
- OpenAiApi api = buildOpenAiApi(route);
- OpenAiApi.ChatCompletionRequest springReq = buildSpringAiRequest(route, req, true);
- return api.chatCompletionStream(springReq)
- .flatMapIterable(chunk -> chunk == null || chunk.choices() == null ? List.<OpenAiApi.ChatCompletionChunk.ChunkChoice>of() : chunk.choices())
- .map(OpenAiApi.ChatCompletionChunk.ChunkChoice::delta)
- .filter(Objects::nonNull)
- .map(this::extractSpringAiContent)
- .filter(text -> text != null && !text.isEmpty())
+ return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req)
.doOnError(ex -> log.error("璋冪敤 Spring AI 娴佸紡澶辫触, route={}", route.tag(), ex));
}
private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
- if (canUseSpringAi(req)) {
- return callCompletionWithSpringAi(route, req);
- }
- return callCompletionWithWebClient(route, req);
- }
-
- private CompletionCallResult callCompletionWithWebClient(ResolvedRoute route, ChatCompletionRequest req) {
- WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
- RawCompletionResult raw = client.post()
- .uri("/chat/completions")
- .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
- .contentType(MediaType.APPLICATION_JSON)
- .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
- .bodyValue(req)
- .exchangeToMono(resp -> resp.bodyToFlux(String.class)
- .collectList()
- .map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\\n\\n", list))))
- .block();
-
- if (raw == null) {
- throw new RuntimeException("LLM 杩斿洖涓虹┖");
- }
- if (raw.statusCode < 200 || raw.statusCode >= 300) {
- throw new LlmRouteException(raw.statusCode, raw.payload);
- }
- return new CompletionCallResult(raw.statusCode, raw.payload, parseCompletion(raw.payload));
+ return callCompletionWithSpringAi(route, req);
}
private CompletionCallResult callCompletionWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
- OpenAiApi api = buildOpenAiApi(route);
- OpenAiApi.ChatCompletionRequest springReq = buildSpringAiRequest(route, req, false);
- ResponseEntity<OpenAiApi.ChatCompletion> entity = api.chatCompletionEntity(springReq);
- OpenAiApi.ChatCompletion body = entity.getBody();
- return new CompletionCallResult(entity.getStatusCode().value(),
- body == null ? null : JSON.toJSONString(body),
- toLegacyResponse(body));
+ LlmSpringAiClientService.CompletionCallResult result =
+ llmSpringAiClientService.callCompletion(route.baseUrl, route.apiKey, req);
+ return new CompletionCallResult(result.getStatusCode(), result.getPayload(), result.getResponse());
}
private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
@@ -459,10 +345,6 @@
return quota ? route.switchOnQuota : route.switchOnError;
}
- private boolean canUseSpringAi(ChatCompletionRequest req) {
- return req != null && (req.getTools() == null || req.getTools().isEmpty());
- }
-
private void markSuccess(ResolvedRoute route) {
if (route.id != null) {
llmRoutingService.markSuccess(route.id);
@@ -477,14 +359,6 @@
private String errorText(Throwable ex) {
if (ex == null) return "unknown";
- if (ex instanceof LlmRouteException) {
- LlmRouteException e = (LlmRouteException) ex;
- String body = e.body == null ? "" : e.body;
- if (body.length() > 240) {
- body = body.substring(0, 240);
- }
- return "status=" + e.statusCode + ", body=" + body;
- }
if (ex instanceof RestClientResponseException) {
RestClientResponseException e = (RestClientResponseException) ex;
String body = e.getResponseBodyAsString();
@@ -500,6 +374,10 @@
body = body.substring(0, 240);
}
return "status=" + e.getStatusCode().value() + ", body=" + body;
+ }
+ Integer springAiStatus = llmSpringAiClientService.statusCodeOf(ex);
+ if (springAiStatus != null) {
+ return "status=" + springAiStatus + ", body=" + llmSpringAiClientService.responseBodyOf(ex, 240);
}
return ex.getMessage() == null ? ex.toString() : ex.getMessage();
}
@@ -541,98 +419,6 @@
return s == null || s.trim().isEmpty();
}
- private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
- if (payload == null || payload.isEmpty()) return acc;
- String[] events = payload.split("\\r?\\n\\r?\\n");
- for (String part : events) {
- String s = part;
- if (s == null || s.isEmpty()) continue;
- if (s.startsWith("data:")) {
- s = s.substring(5);
- if (s.startsWith(" ")) s = s.substring(1);
- }
- if ("[DONE]".equals(s.trim())) {
- continue;
- }
- try {
- JSONObject obj = JSON.parseObject(s);
- if (obj == null) continue;
- JSONArray choices = obj.getJSONArray("choices");
- if (choices != null && !choices.isEmpty()) {
- JSONObject c0 = choices.getJSONObject(0);
- if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
- ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
- ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
- choice.setMessage(msg);
- ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
- list.add(choice);
- acc.setChoices(list);
- }
- ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
- ChatCompletionRequest.Message msg = choice.getMessage();
- if (msg.getRole() == null || msg.getRole().isEmpty()) {
- msg.setRole("assistant");
- }
- JSONObject delta = c0.getJSONObject("delta");
- if (delta != null) {
- String c = delta.getString("content");
- if (c != null) {
- String prev = msg.getContent();
- msg.setContent(prev == null ? c : prev + c);
- }
- String role = delta.getString("role");
- if (role != null && !role.isEmpty()) msg.setRole(role);
- }
- JSONObject message = c0.getJSONObject("message");
- if (message != null) {
- String c = message.getString("content");
- if (c != null) {
- String prev = msg.getContent();
- msg.setContent(prev == null ? c : prev + c);
- }
- String role = message.getString("role");
- if (role != null && !role.isEmpty()) msg.setRole(role);
- }
- String fr = c0.getString("finish_reason");
- if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
- }
- String id = obj.getString("id");
- if (id != null && !id.isEmpty()) acc.setId(id);
- Long created = obj.getLong("created");
- if (created != null) acc.setCreated(created);
- String object = obj.getString("object");
- if (object != null && !object.isEmpty()) acc.setObjectName(object);
- } catch (Exception ignore) {
- }
- }
- return acc;
- }
-
- private ChatCompletionResponse parseCompletion(String payload) {
- if (payload == null) return null;
- try {
- ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
- if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
- return r;
- }
- } catch (Exception ignore) {
- }
- ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
- if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
- return sse;
- }
- ChatCompletionResponse r = new ChatCompletionResponse();
- ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
- ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
- msg.setRole("assistant");
- msg.setContent(payload);
- choice.setMessage(msg);
- ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
- list.add(choice);
- r.setChoices(list);
- return r;
- }
-
private String nextTraceId() {
return UUID.randomUUID().toString().replace("-", "");
}
@@ -653,29 +439,23 @@
}
private Integer statusCodeOf(Throwable ex) {
- if (ex instanceof LlmRouteException) {
- return ((LlmRouteException) ex).statusCode;
- }
if (ex instanceof RestClientResponseException) {
return ((RestClientResponseException) ex).getStatusCode().value();
}
if (ex instanceof WebClientResponseException) {
return ((WebClientResponseException) ex).getStatusCode().value();
}
- return null;
+ return llmSpringAiClientService.statusCodeOf(ex);
}
private String responseBodyOf(Throwable ex) {
- if (ex instanceof LlmRouteException) {
- return cut(((LlmRouteException) ex).body, LOG_TEXT_LIMIT);
- }
if (ex instanceof RestClientResponseException) {
return cut(((RestClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
}
if (ex instanceof WebClientResponseException) {
return cut(((WebClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
}
- return null;
+ return cut(llmSpringAiClientService.responseBodyOf(ex, LOG_TEXT_LIMIT), LOG_TEXT_LIMIT);
}
private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) {
@@ -694,158 +474,6 @@
private String safeName(Throwable ex) {
return ex == null ? null : ex.getClass().getSimpleName();
- }
-
- private OpenAiApi buildOpenAiApi(ResolvedRoute route) {
- return OpenAiApi.builder()
- .baseUrl(route.baseUrl)
- .apiKey(route.apiKey)
- .build();
- }
-
- private OpenAiApi.ChatCompletionRequest buildSpringAiRequest(ResolvedRoute route,
- ChatCompletionRequest req,
- boolean stream) {
- HashMap<String, Object> extraBody = new HashMap<>();
- if (route.thinkingEnabled || req.getThinking() != null) {
- HashMap<String, Object> thinking = new HashMap<>();
- thinking.put("type", req.getThinking() != null && req.getThinking().getType() != null
- ? req.getThinking().getType()
- : "enable");
- extraBody.put("thinking", thinking);
- }
- return new OpenAiApi.ChatCompletionRequest(
- toSpringAiMessages(req.getMessages()),
- route.model,
- null,
- null,
- null,
- null,
- null,
- null,
- req.getMax_tokens(),
- null,
- 1,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- stream,
- stream ? OpenAiApi.ChatCompletionRequest.StreamOptions.INCLUDE_USAGE : null,
- req.getTemperature(),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- extraBody.isEmpty() ? null : extraBody
- );
- }
-
- private List<OpenAiApi.ChatCompletionMessage> toSpringAiMessages(List<ChatCompletionRequest.Message> messages) {
- ArrayList<OpenAiApi.ChatCompletionMessage> result = new ArrayList<>();
- if (messages == null) {
- return result;
- }
- for (ChatCompletionRequest.Message message : messages) {
- if (message == null) {
- continue;
- }
- result.add(new OpenAiApi.ChatCompletionMessage(
- message.getContent(),
- toSpringAiRole(message.getRole())
- ));
- }
- return result;
- }
-
- private OpenAiApi.ChatCompletionMessage.Role toSpringAiRole(String role) {
- if (role == null) {
- return OpenAiApi.ChatCompletionMessage.Role.USER;
- }
- switch (role.trim().toLowerCase(Locale.ROOT)) {
- case "system":
- return OpenAiApi.ChatCompletionMessage.Role.SYSTEM;
- case "assistant":
- return OpenAiApi.ChatCompletionMessage.Role.ASSISTANT;
- case "tool":
- return OpenAiApi.ChatCompletionMessage.Role.TOOL;
- default:
- return OpenAiApi.ChatCompletionMessage.Role.USER;
- }
- }
-
- private ChatCompletionResponse toLegacyResponse(OpenAiApi.ChatCompletion completion) {
- if (completion == null) {
- return null;
- }
- ChatCompletionResponse response = new ChatCompletionResponse();
- response.setId(completion.id());
- response.setCreated(completion.created());
- response.setObjectName(completion.object());
- if (completion.usage() != null) {
- ChatCompletionResponse.Usage usage = new ChatCompletionResponse.Usage();
- usage.setPromptTokens(completion.usage().promptTokens());
- usage.setCompletionTokens(completion.usage().completionTokens());
- usage.setTotalTokens(completion.usage().totalTokens());
- response.setUsage(usage);
- }
- if (completion.choices() != null) {
- ArrayList<ChatCompletionResponse.Choice> choices = new ArrayList<>();
- for (OpenAiApi.ChatCompletion.Choice choice : completion.choices()) {
- ChatCompletionResponse.Choice item = new ChatCompletionResponse.Choice();
- item.setIndex(choice.index());
- if (choice.finishReason() != null) {
- item.setFinishReason(choice.finishReason().name().toLowerCase(Locale.ROOT));
- }
- item.setMessage(toLegacyMessage(choice.message()));
- choices.add(item);
- }
- response.setChoices(choices);
- }
- return response;
- }
-
- private ChatCompletionRequest.Message toLegacyMessage(OpenAiApi.ChatCompletionMessage message) {
- if (message == null) {
- return null;
- }
- ChatCompletionRequest.Message result = new ChatCompletionRequest.Message();
- result.setContent(extractSpringAiContent(message));
- if (message.role() != null) {
- result.setRole(message.role().name().toLowerCase(Locale.ROOT));
- }
- result.setName(message.name());
- result.setTool_call_id(message.toolCallId());
- return result;
- }
-
- private String extractSpringAiContent(OpenAiApi.ChatCompletionMessage message) {
- if (message == null || message.rawContent() == null) {
- return null;
- }
- Object content = message.rawContent();
- if (content instanceof String) {
- return (String) content;
- }
- if (content instanceof List) {
- try {
- @SuppressWarnings("unchecked")
- List<OpenAiApi.ChatCompletionMessage.MediaContent> media =
- (List<OpenAiApi.ChatCompletionMessage.MediaContent>) content;
- return OpenAiApi.getTextContent(media);
- } catch (ClassCastException ignore) {
- }
- }
- return String.valueOf(content);
}
private String cut(String text, int maxLen) {
@@ -900,27 +528,6 @@
this.statusCode = statusCode;
this.payload = payload;
this.response = response;
- }
- }
-
- private static class RawCompletionResult {
- private final int statusCode;
- private final String payload;
-
- private RawCompletionResult(int statusCode, String payload) {
- this.statusCode = statusCode;
- this.payload = payload;
- }
- }
-
- private static class LlmRouteException extends RuntimeException {
- private final int statusCode;
- private final String body;
-
- private LlmRouteException(int statusCode, String body) {
- super("http status=" + statusCode);
- this.statusCode = statusCode;
- this.body = body;
}
}
--
Gitblit v1.9.1