package com.zy.ai.service;
|
|
import com.alibaba.fastjson.JSON;
|
import com.zy.ai.entity.ChatCompletionRequest;
|
import com.zy.ai.entity.ChatCompletionResponse;
|
import com.zy.ai.entity.LlmCallLog;
|
import com.zy.ai.entity.LlmRouteConfig;
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Service;
|
import org.springframework.web.client.RestClientResponseException;
|
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
import reactor.core.publisher.Flux;
|
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Locale;
|
import java.util.UUID;
|
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.function.Consumer;
|
|
@Slf4j
@Service
@RequiredArgsConstructor
public class LlmChatService {

    // Max characters of request/response text persisted into the call log.
    private static final int LOG_TEXT_LIMIT = 16000;

    // Route selection + per-route health bookkeeping (success/failure, cooldown).
    private final LlmRoutingService llmRoutingService;
    // Best-effort persistence of one LlmCallLog row per attempt.
    private final LlmCallLogService llmCallLogService;
    // Spring-AI backed HTTP client used for both blocking and streaming calls.
    private final LlmSpringAiClientService llmSpringAiClientService;

    // yml-configured fallback route, used only when no DB route is available.
    @Value("${llm.base-url:}")
    private String fallbackBaseUrl;

    @Value("${llm.api-key:}")
    private String fallbackApiKey;

    @Value("${llm.model:}")
    private String fallbackModel;

    // "true"/"1"/"enable" turns on "thinking" mode for the fallback route.
    @Value("${llm.thinking:false}")
    private String fallbackThinking;
|
/**
|
* 通用对话方法:传入 messages,返回大模型文本回复
|
*/
|
public String chat(List<ChatCompletionRequest.Message> messages,
|
Double temperature,
|
Integer maxTokens) {
|
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setMessages(messages);
|
req.setTemperature(temperature != null ? temperature : 0.3);
|
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
|
req.setStream(false);
|
|
ChatCompletionResponse response = complete(req, "chat");
|
|
if (response == null ||
|
response.getChoices() == null ||
|
response.getChoices().isEmpty() ||
|
response.getChoices().get(0).getMessage() == null ||
|
response.getChoices().get(0).getMessage().getContent() == null ||
|
response.getChoices().get(0).getMessage().getContent().isEmpty()) {
|
return null;
|
}
|
|
return response.getChoices().get(0).getMessage().getContent();
|
}
|
|
public ChatCompletionResponse chatCompletion(List<ChatCompletionRequest.Message> messages,
|
Double temperature,
|
Integer maxTokens,
|
List<Object> tools) {
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setMessages(messages);
|
req.setTemperature(temperature != null ? temperature : 0.3);
|
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
|
req.setStream(false);
|
if (tools != null && !tools.isEmpty()) {
|
req.setTools(tools);
|
req.setTool_choice("auto");
|
}
|
return complete(req, tools != null && !tools.isEmpty() ? "chat_completion_tools" : "chat_completion");
|
}
|
|
/**
 * Non-streaming completion under the default "completion" log scene.
 * Delegates to {@link #complete(ChatCompletionRequest, String)}.
 */
public ChatCompletionResponse complete(ChatCompletionRequest req) {
    return complete(req, "completion");
}
|
|
/**
 * Core non-streaming call with route failover.
 *
 * Tries each resolved route in order. On an error or empty response the
 * route is marked failed, the attempt is logged, and — if the route's
 * policy allows switching — the next route is tried. Returns {@code null}
 * when all routes fail; callers must handle null.
 */
private ChatCompletionResponse complete(ChatCompletionRequest req, String scene) {
    String traceId = nextTraceId();
    List<ResolvedRoute> routes = resolveRoutes();
    if (routes.isEmpty()) {
        // No DB routes and no complete yml fallback: log and record "no_route".
        log.error("调用 LLM 失败: 未配置可用 LLM 路由");
        recordCall(traceId, scene, false, 1, null, false, null, 0L, req, null, "none",
                new RuntimeException("未配置可用 LLM 路由"), "no_route");
        return null;
    }

    Throwable last = null;
    for (int i = 0; i < routes.size(); i++) {
        ResolvedRoute route = routes.get(i);
        boolean hasNext = i < routes.size() - 1;
        // Clone per attempt so per-route mutation (model/stream/thinking)
        // never leaks into the caller's request or the next attempt.
        ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, false);
        long start = System.currentTimeMillis();
        try {
            CompletionCallResult callResult = callCompletion(route, routeReq);
            ChatCompletionResponse resp = callResult.response;
            if (!isValidCompletion(resp)) {
                // HTTP call succeeded but the body carried no usable content
                // (no text and no tool calls) — treat as a route failure.
                RuntimeException ex = new RuntimeException("LLM 响应为空");
                boolean canSwitch = shouldSwitch(route, false);
                markFailure(route, ex, canSwitch);
                recordCall(traceId, scene, false, i + 1, route, false, callResult.statusCode,
                        System.currentTimeMillis() - start, routeReq, callResult.payload, "error", ex,
                        "invalid_completion");
                if (hasNext && canSwitch) {
                    log.warn("LLM 切换到下一路由, current={}, reason={}", route.tag(), ex.getMessage());
                    continue;
                }
                log.error("调用 LLM 失败, route={}", route.tag(), ex);
                last = ex;
                break;
            }
            markSuccess(route);
            recordCall(traceId, scene, false, i + 1, route, true, callResult.statusCode,
                    System.currentTimeMillis() - start, routeReq, buildResponseText(resp, callResult.payload),
                    "none", null, null);
            return resp;
        } catch (Throwable ex) {
            last = ex;
            // Quota exhaustion may follow a different switching policy than
            // generic errors (switchOnQuota vs switchOnError).
            boolean quota = isQuotaExhausted(ex);
            boolean canSwitch = shouldSwitch(route, quota);
            markFailure(route, ex, canSwitch);
            recordCall(traceId, scene, false, i + 1, route, false, statusCodeOf(ex),
                    System.currentTimeMillis() - start, routeReq, responseBodyOf(ex),
                    quota ? "quota" : "error", ex, null);
            if (hasNext && canSwitch) {
                log.warn("LLM 切换到下一路由, current={}, reason={}", route.tag(), errorText(ex));
                continue;
            }
            log.error("调用 LLM 失败, route={}", route.tag(), ex);
            break;
        }
    }

    if (last != null) {
        log.error("调用 LLM 全部路由失败: {}", errorText(last));
    }
    return null;
}
|
|
public void chatStream(List<ChatCompletionRequest.Message> messages,
|
Double temperature,
|
Integer maxTokens,
|
Consumer<String> onChunk,
|
Runnable onComplete,
|
Consumer<Throwable> onError) {
|
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setMessages(messages);
|
req.setTemperature(temperature != null ? temperature : 0.3);
|
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
|
req.setStream(true);
|
|
streamWithFailover(req, onChunk, onComplete, onError, "chat_stream");
|
}
|
|
public void chatStreamWithTools(List<ChatCompletionRequest.Message> messages,
|
Double temperature,
|
Integer maxTokens,
|
List<Object> tools,
|
Consumer<String> onChunk,
|
Runnable onComplete,
|
Consumer<Throwable> onError) {
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setMessages(messages);
|
req.setTemperature(temperature != null ? temperature : 0.3);
|
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
|
req.setStream(true);
|
if (tools != null && !tools.isEmpty()) {
|
req.setTools(tools);
|
req.setTool_choice("auto");
|
}
|
streamWithFailover(req, onChunk, onComplete, onError, tools != null && !tools.isEmpty() ? "chat_stream_tools" : "chat_stream");
|
}
|
|
private void streamWithFailover(ChatCompletionRequest req,
|
Consumer<String> onChunk,
|
Runnable onComplete,
|
Consumer<Throwable> onError,
|
String scene) {
|
String traceId = nextTraceId();
|
List<ResolvedRoute> routes = resolveRoutes();
|
if (routes.isEmpty()) {
|
recordCall(traceId, scene, true, 1, null, false, null, 0L, req, null, "none",
|
new RuntimeException("未配置可用 LLM 路由"), "no_route");
|
if (onError != null) onError.accept(new RuntimeException("未配置可用 LLM 路由"));
|
return;
|
}
|
attemptStream(routes, 0, req, onChunk, onComplete, onError, traceId, scene);
|
}
|
|
private void attemptStream(List<ResolvedRoute> routes,
|
int index,
|
ChatCompletionRequest req,
|
Consumer<String> onChunk,
|
Runnable onComplete,
|
Consumer<Throwable> onError,
|
String traceId,
|
String scene) {
|
if (index >= routes.size()) {
|
if (onError != null) onError.accept(new RuntimeException("LLM 路由全部失败"));
|
return;
|
}
|
|
ResolvedRoute route = routes.get(index);
|
ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, true);
|
long start = System.currentTimeMillis();
|
StringBuilder outputBuffer = new StringBuilder();
|
|
AtomicBoolean doneSeen = new AtomicBoolean(false);
|
AtomicBoolean errorSeen = new AtomicBoolean(false);
|
AtomicBoolean emitted = new AtomicBoolean(false);
|
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
|
|
Thread drain = new Thread(() -> {
|
try {
|
while (true) {
|
String s = queue.poll(2, TimeUnit.SECONDS);
|
if (s != null) {
|
emitted.set(true);
|
try {
|
onChunk.accept(s);
|
} catch (Exception ignore) {
|
}
|
}
|
if (doneSeen.get() && queue.isEmpty()) {
|
if (!errorSeen.get()) {
|
try {
|
if (onComplete != null) onComplete.run();
|
} catch (Exception ignore) {
|
}
|
}
|
break;
|
}
|
}
|
} catch (InterruptedException ignore) {
|
}
|
});
|
drain.setDaemon(true);
|
drain.start();
|
|
Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq);
|
streamSource.subscribe(payload -> {
|
if (payload == null || payload.isEmpty()) return;
|
queue.offer(payload);
|
appendLimited(outputBuffer, payload);
|
}, err -> {
|
errorSeen.set(true);
|
doneSeen.set(true);
|
boolean quota = isQuotaExhausted(err);
|
boolean canSwitch = shouldSwitch(route, quota);
|
markFailure(route, err, canSwitch);
|
recordCall(traceId, scene, true, index + 1, route, false, statusCodeOf(err),
|
System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
|
quota ? "quota" : "error", err, "emitted=" + emitted.get());
|
if (!emitted.get() && canSwitch && index < routes.size() - 1) {
|
log.warn("LLM 路由失败,自动切换,current={}, reason={}", route.tag(), errorText(err));
|
attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
|
return;
|
}
|
if (onError != null) onError.accept(err);
|
}, () -> {
|
markSuccess(route);
|
recordCall(traceId, scene, true, index + 1, route, true, 200,
|
System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
|
"none", null, null);
|
doneSeen.set(true);
|
});
|
}
|
|
private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
|
return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req)
|
.doOnError(ex -> log.error("调用 Spring AI 流式失败, route={}", route.tag(), ex));
|
}
|
|
/**
 * Executes one non-streaming completion call against the given route.
 * Currently always routed through the Spring AI client.
 */
private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
    return callCompletionWithSpringAi(route, req);
}
|
|
private CompletionCallResult callCompletionWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
|
LlmSpringAiClientService.CompletionCallResult result =
|
llmSpringAiClientService.callCompletion(route.baseUrl, route.apiKey, req);
|
return new CompletionCallResult(result.getStatusCode(), result.getPayload(), result.getResponse());
|
}
|
|
private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
|
req.setModel(route.model);
|
req.setStream(stream);
|
if (route.thinkingEnabled) {
|
ChatCompletionRequest.Thinking t = new ChatCompletionRequest.Thinking();
|
t.setType("enable");
|
req.setThinking(t);
|
} else {
|
req.setThinking(null);
|
}
|
return req;
|
}
|
|
private ChatCompletionRequest cloneRequest(ChatCompletionRequest src) {
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setModel(src.getModel());
|
req.setMessages(src.getMessages());
|
req.setTemperature(src.getTemperature());
|
req.setMax_tokens(src.getMax_tokens());
|
req.setStream(src.getStream());
|
req.setTools(src.getTools());
|
req.setTool_choice(src.getTool_choice());
|
req.setThinking(src.getThinking());
|
return req;
|
}
|
|
private boolean isValidCompletion(ChatCompletionResponse response) {
|
if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) {
|
return false;
|
}
|
ChatCompletionRequest.Message message = response.getChoices().get(0).getMessage();
|
if (message == null) {
|
return false;
|
}
|
if (!isBlank(message.getContent())) {
|
return true;
|
}
|
return message.getTool_calls() != null && !message.getTool_calls().isEmpty();
|
}
|
|
private boolean shouldSwitch(ResolvedRoute route, boolean quota) {
|
return quota ? route.switchOnQuota : route.switchOnError;
|
}
|
|
private void markSuccess(ResolvedRoute route) {
|
if (route.id != null) {
|
llmRoutingService.markSuccess(route.id);
|
}
|
}
|
|
private void markFailure(ResolvedRoute route, Throwable ex, boolean enterCooldown) {
|
if (route.id != null) {
|
llmRoutingService.markFailure(route.id, errorText(ex), enterCooldown, route.cooldownSeconds);
|
}
|
}
|
|
private String errorText(Throwable ex) {
|
if (ex == null) return "unknown";
|
if (ex instanceof RestClientResponseException) {
|
RestClientResponseException e = (RestClientResponseException) ex;
|
String body = e.getResponseBodyAsString();
|
if (body != null && body.length() > 240) {
|
body = body.substring(0, 240);
|
}
|
return "status=" + e.getStatusCode().value() + ", body=" + body;
|
}
|
if (ex instanceof WebClientResponseException) {
|
WebClientResponseException e = (WebClientResponseException) ex;
|
String body = e.getResponseBodyAsString();
|
if (body != null && body.length() > 240) {
|
body = body.substring(0, 240);
|
}
|
return "status=" + e.getStatusCode().value() + ", body=" + body;
|
}
|
Integer springAiStatus = llmSpringAiClientService.statusCodeOf(ex);
|
if (springAiStatus != null) {
|
return "status=" + springAiStatus + ", body=" + llmSpringAiClientService.responseBodyOf(ex, 240);
|
}
|
return ex.getMessage() == null ? ex.toString() : ex.getMessage();
|
}
|
|
private boolean isQuotaExhausted(Throwable ex) {
|
Integer status = statusCodeOf(ex);
|
if (status != null && status == 429) {
|
return true;
|
}
|
String text = responseBodyOf(ex);
|
text = text == null ? "" : text.toLowerCase(Locale.ROOT);
|
return text.contains("insufficient_quota")
|
|| text.contains("quota")
|
|| text.contains("余额")
|
|| text.contains("用量")
|
|| text.contains("超限")
|
|| text.contains("rate limit");
|
}
|
|
private List<ResolvedRoute> resolveRoutes() {
|
List<ResolvedRoute> routes = new ArrayList<>();
|
List<LlmRouteConfig> dbRoutes = llmRoutingService.listAvailableRoutes();
|
for (LlmRouteConfig c : dbRoutes) {
|
routes.add(ResolvedRoute.fromDb(c));
|
}
|
// 兼容:数据库为空时,回退到 yml
|
if (routes.isEmpty() && !isBlank(fallbackBaseUrl) && !isBlank(fallbackApiKey) && !isBlank(fallbackModel)) {
|
routes.add(ResolvedRoute.fromFallback(fallbackBaseUrl, fallbackApiKey, fallbackModel, isFallbackThinkingEnabled()));
|
}
|
return routes;
|
}
|
|
private boolean isFallbackThinkingEnabled() {
|
String x = fallbackThinking == null ? "" : fallbackThinking.trim().toLowerCase();
|
return "true".equals(x) || "1".equals(x) || "enable".equals(x);
|
}
|
|
private boolean isBlank(String s) {
|
return s == null || s.trim().isEmpty();
|
}
|
|
private String nextTraceId() {
|
return UUID.randomUUID().toString().replace("-", "");
|
}
|
|
private void appendLimited(StringBuilder sb, String text) {
|
if (sb == null || text == null || text.isEmpty()) {
|
return;
|
}
|
int remain = LOG_TEXT_LIMIT - sb.length();
|
if (remain <= 0) {
|
return;
|
}
|
if (text.length() <= remain) {
|
sb.append(text);
|
} else {
|
sb.append(text, 0, remain);
|
}
|
}
|
|
private Integer statusCodeOf(Throwable ex) {
|
if (ex instanceof RestClientResponseException) {
|
return ((RestClientResponseException) ex).getStatusCode().value();
|
}
|
if (ex instanceof WebClientResponseException) {
|
return ((WebClientResponseException) ex).getStatusCode().value();
|
}
|
return llmSpringAiClientService.statusCodeOf(ex);
|
}
|
|
private String responseBodyOf(Throwable ex) {
|
if (ex instanceof RestClientResponseException) {
|
return cut(((RestClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
|
}
|
if (ex instanceof WebClientResponseException) {
|
return cut(((WebClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
|
}
|
return cut(llmSpringAiClientService.responseBodyOf(ex, LOG_TEXT_LIMIT), LOG_TEXT_LIMIT);
|
}
|
|
private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) {
|
if (resp != null && resp.getChoices() != null && !resp.getChoices().isEmpty()
|
&& resp.getChoices().get(0) != null && resp.getChoices().get(0).getMessage() != null) {
|
ChatCompletionRequest.Message m = resp.getChoices().get(0).getMessage();
|
if (!isBlank(m.getContent())) {
|
return cut(m.getContent(), LOG_TEXT_LIMIT);
|
}
|
if (m.getTool_calls() != null && !m.getTool_calls().isEmpty()) {
|
return cut(JSON.toJSONString(m), LOG_TEXT_LIMIT);
|
}
|
}
|
return cut(fallbackPayload, LOG_TEXT_LIMIT);
|
}
|
|
private String safeName(Throwable ex) {
|
return ex == null ? null : ex.getClass().getSimpleName();
|
}
|
|
private String cut(String text, int maxLen) {
|
if (text == null) return null;
|
String clean = text.replace("\r", " ");
|
return clean.length() > maxLen ? clean.substring(0, maxLen) : clean;
|
}
|
|
private void recordCall(String traceId,
|
String scene,
|
boolean stream,
|
int attemptNo,
|
ResolvedRoute route,
|
boolean success,
|
Integer httpStatus,
|
long latencyMs,
|
ChatCompletionRequest req,
|
String response,
|
String switchMode,
|
Throwable err,
|
String extra) {
|
LlmCallLog item = new LlmCallLog();
|
item.setTraceId(cut(traceId, 64));
|
item.setScene(cut(scene, 64));
|
item.setStream((short) (stream ? 1 : 0));
|
item.setAttemptNo(attemptNo);
|
if (route != null) {
|
item.setRouteId(route.id);
|
item.setRouteName(cut(route.name, 128));
|
item.setBaseUrl(cut(route.baseUrl, 255));
|
item.setModel(cut(route.model, 128));
|
}
|
item.setSuccess((short) (success ? 1 : 0));
|
item.setHttpStatus(httpStatus);
|
item.setLatencyMs(latencyMs < 0 ? 0 : latencyMs);
|
item.setSwitchMode(cut(switchMode, 32));
|
item.setRequestContent(cut(JSON.toJSONString(req), LOG_TEXT_LIMIT));
|
item.setResponseContent(cut(response, LOG_TEXT_LIMIT));
|
item.setErrorType(cut(safeName(err), 128));
|
item.setErrorMessage(err == null ? null : cut(errorText(err), 1024));
|
item.setExtra(cut(extra, 512));
|
item.setCreateTime(new Date());
|
llmCallLogService.saveIgnoreError(item);
|
}
|
|
/**
 * Immutable holder for one non-streaming call result: HTTP status, the raw
 * response payload (kept for logging), and the parsed response object.
 */
private static class CompletionCallResult {

    private final int statusCode;
    // Raw response body, used as the logging fallback when the parsed
    // response carries no usable text.
    private final String payload;
    private final ChatCompletionResponse response;

    private CompletionCallResult(int statusCode, String payload, ChatCompletionResponse response) {
        this.statusCode = statusCode;
        this.payload = payload;
        this.response = response;
    }
}
|
|
/**
 * Internal view of one LLM route, built either from a DB row
 * (LlmRouteConfig) or from the yml fallback properties.
 */
private static class ResolvedRoute {

    // null for the yml fallback route — skips DB health bookkeeping.
    private Long id;
    private String name;
    private String baseUrl;
    private String apiKey;
    private String model;
    // When true, applyRoute() attaches a Thinking block with type "enable".
    private boolean thinkingEnabled;
    // Failover policy: allow switching on quota exhaustion / generic error.
    private boolean switchOnQuota;
    private boolean switchOnError;
    // Cooldown passed to LlmRoutingService.markFailure when switching away.
    private Integer cooldownSeconds;

    /** Maps a DB config row; switch flags default to true when unset (null). */
    private static ResolvedRoute fromDb(LlmRouteConfig c) {
        ResolvedRoute r = new ResolvedRoute();
        r.id = c.getId();
        r.name = c.getName();
        r.baseUrl = c.getBaseUrl();
        r.apiKey = c.getApiKey();
        r.model = c.getModel();
        // thinking column: 1 = enabled; anything else (including null) = disabled
        r.thinkingEnabled = c.getThinking() != null && c.getThinking() == 1;
        // null is treated as "allow switching" for both policies
        r.switchOnQuota = c.getSwitchOnQuota() == null || c.getSwitchOnQuota() == 1;
        r.switchOnError = c.getSwitchOnError() == null || c.getSwitchOnError() == 1;
        r.cooldownSeconds = c.getCooldownSeconds();
        return r;
    }

    /** Builds the yml-configured fallback route (always switchable, 300s cooldown). */
    private static ResolvedRoute fromFallback(String baseUrl, String apiKey, String model, boolean thinkingEnabled) {
        ResolvedRoute r = new ResolvedRoute();
        r.name = "fallback-yml";
        r.baseUrl = baseUrl;
        r.apiKey = apiKey;
        r.model = model;
        r.thinkingEnabled = thinkingEnabled;
        r.switchOnQuota = true;
        r.switchOnError = true;
        r.cooldownSeconds = 300;
        return r;
    }

    /** Short human-readable label for log lines: "&lt;name&gt; model=&lt;model&gt;". */
    private String tag() {
        String showName = name == null ? "unnamed" : name;
        String showModel = model == null ? "" : (" model=" + model);
        return showName + showModel;
    }
}
|
}
|