package com.zy.ai.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zy.ai.entity.ChatCompletionRequest;
import com.zy.ai.entity.ChatCompletionResponse;
import com.zy.ai.entity.LlmCallLog;
import com.zy.ai.entity.LlmRouteConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * LLM chat gateway with multi-route failover.
 *
 * <p>Routes are resolved from the database via {@link LlmRoutingService}; when none are
 * configured there, a single fallback route is built from {@code llm.*} yml properties.
 * Both blocking ({@link #chat}, {@link #chatCompletion}) and streaming
 * ({@link #chatStream}, {@link #chatStreamWithTools}) calls try routes in order, record
 * every attempt through {@link LlmCallLogService}, and switch to the next route when the
 * current route's configuration permits it for the observed failure kind (quota vs. error).
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class LlmChatService {

    /** Maximum number of characters of request/response text persisted per call log row. */
    private static final int LOG_TEXT_LIMIT = 16000;

    private final LlmRoutingService llmRoutingService;
    private final LlmCallLogService llmCallLogService;

    // yml fallback route — used only when the database yields no available route.
    @Value("${llm.base-url:}")
    private String fallbackBaseUrl;
    @Value("${llm.api-key:}")
    private String fallbackApiKey;
    @Value("${llm.model:}")
    private String fallbackModel;
    @Value("${llm.thinking:false}")
    private String fallbackThinking;

    /**
     * General-purpose chat: send {@code messages}, return the model's text reply.
     *
     * @param messages    conversation messages sent to the model
     * @param temperature sampling temperature; defaults to 0.3 when null
     * @param maxTokens   completion token cap; defaults to 1024 when null
     * @return the first choice's message content, or null when the call failed or
     *         produced no non-empty content
     */
    public String chat(List<ChatCompletionRequest.Message> messages, Double temperature, Integer maxTokens) {
        ChatCompletionRequest req = new ChatCompletionRequest();
        req.setMessages(messages);
        req.setTemperature(temperature != null ? temperature : 0.3);
        req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
        req.setStream(false);
        ChatCompletionResponse response = complete(req, "chat");
        if (response == null
                || response.getChoices() == null
                || response.getChoices().isEmpty()
                || response.getChoices().get(0).getMessage() == null
                || response.getChoices().get(0).getMessage().getContent() == null
                || response.getChoices().get(0).getMessage().getContent().isEmpty()) {
            return null;
        }
        return response.getChoices().get(0).getMessage().getContent();
    }

    /**
     * Blocking chat completion, optionally with tool definitions (function calling).
     *
     * @param tools tool definitions forwarded to the model; when non-empty,
     *              {@code tool_choice} is set to "auto". Element type is whatever
     *              {@code ChatCompletionRequest.setTools} declares — kept raw here;
     *              TODO confirm against the entity and restore the generic argument.
     * @return the full response, or null when every route failed
     */
    public ChatCompletionResponse chatCompletion(List<ChatCompletionRequest.Message> messages, Double temperature,
                                                 Integer maxTokens, List tools) {
        ChatCompletionRequest req = new ChatCompletionRequest();
        req.setMessages(messages);
        req.setTemperature(temperature != null ? temperature : 0.3);
        req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
        req.setStream(false);
        if (tools != null && !tools.isEmpty()) {
            req.setTools(tools);
            req.setTool_choice("auto");
        }
        return complete(req, tools != null && !tools.isEmpty() ? "chat_completion_tools" : "chat_completion");
    }

    /** Blocking completion for a caller-built request, logged under scene "completion". */
    public ChatCompletionResponse complete(ChatCompletionRequest req) {
        return complete(req, "completion");
    }

    /**
     * Non-streaming completion with route failover.
     *
     * <p>Tries each resolved route in order. Invalid/empty responses and thrown errors are
     * recorded; when the route allows switching for the failure kind and another route
     * remains, the next route is tried. Returns null when all attempts fail.
     *
     * @param scene log tag describing the calling context
     */
    private ChatCompletionResponse complete(ChatCompletionRequest req, String scene) {
        String traceId = nextTraceId();
        List<ResolvedRoute> routes = resolveRoutes();
        if (routes.isEmpty()) {
            log.error("调用 LLM 失败: 未配置可用 LLM 路由");
            recordCall(traceId, scene, false, 1, null, false, null, 0L, req, null, "none",
                    new RuntimeException("未配置可用 LLM 路由"), "no_route");
            return null;
        }
        Throwable last = null;
        for (int i = 0; i < routes.size(); i++) {
            ResolvedRoute route = routes.get(i);
            boolean hasNext = i < routes.size() - 1;
            ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, false);
            long start = System.currentTimeMillis();
            try {
                CompletionCallResult callResult = callCompletion(route, routeReq);
                ChatCompletionResponse resp = callResult.response;
                if (!isValidCompletion(resp)) {
                    // HTTP succeeded but the body carries neither content nor tool calls.
                    RuntimeException ex = new RuntimeException("LLM 响应为空");
                    boolean canSwitch = shouldSwitch(route, false);
                    markFailure(route, ex, canSwitch);
                    recordCall(traceId, scene, false, i + 1, route, false, callResult.statusCode,
                            System.currentTimeMillis() - start, routeReq, callResult.payload,
                            "error", ex, "invalid_completion");
                    if (hasNext && canSwitch) {
                        log.warn("LLM 切换到下一路由, current={}, reason={}", route.tag(), ex.getMessage());
                        continue;
                    }
                    log.error("调用 LLM 失败, route={}", route.tag(), ex);
                    last = ex;
                    break;
                }
                markSuccess(route);
                recordCall(traceId, scene, false, i + 1, route, true, callResult.statusCode,
                        System.currentTimeMillis() - start, routeReq,
                        buildResponseText(resp, callResult.payload), "none", null, null);
                return resp;
            } catch (Throwable ex) {
                last = ex;
                boolean quota = isQuotaExhausted(ex);
                boolean canSwitch = shouldSwitch(route, quota);
                markFailure(route, ex, canSwitch);
                recordCall(traceId, scene, false, i + 1, route, false, statusCodeOf(ex),
                        System.currentTimeMillis() - start, routeReq, responseBodyOf(ex),
                        quota ? "quota" : "error", ex, null);
                if (hasNext && canSwitch) {
                    log.warn("LLM 切换到下一路由, current={}, reason={}", route.tag(), errorText(ex));
                    continue;
                }
                log.error("调用 LLM 失败, route={}", route.tag(), ex);
                break;
            }
        }
        if (last != null) {
            log.error("调用 LLM 全部路由失败: {}", errorText(last));
        }
        return null;
    }

    /**
     * Streaming chat: delta text chunks are delivered to {@code onChunk}; exactly one of
     * {@code onComplete} / {@code onError} fires at the end (best effort — see
     * {@link #attemptStream}).
     */
    public void chatStream(List<ChatCompletionRequest.Message> messages, Double temperature, Integer maxTokens,
                           Consumer<String> onChunk, Runnable onComplete, Consumer<Throwable> onError) {
        ChatCompletionRequest req = new ChatCompletionRequest();
        req.setMessages(messages);
        req.setTemperature(temperature != null ? temperature : 0.3);
        req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
        req.setStream(true);
        streamWithFailover(req, onChunk, onComplete, onError, "chat_stream");
    }

    /**
     * Streaming chat with optional tool definitions.
     *
     * @param tools raw on purpose — element type declared by
     *              {@code ChatCompletionRequest.setTools}; TODO confirm and re-genericize
     */
    public void chatStreamWithTools(List<ChatCompletionRequest.Message> messages, Double temperature,
                                    Integer maxTokens, List tools, Consumer<String> onChunk,
                                    Runnable onComplete, Consumer<Throwable> onError) {
        ChatCompletionRequest req = new ChatCompletionRequest();
        req.setMessages(messages);
        req.setTemperature(temperature != null ? temperature : 0.3);
        req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
        req.setStream(true);
        if (tools != null && !tools.isEmpty()) {
            req.setTools(tools);
            req.setTool_choice("auto");
        }
        streamWithFailover(req, onChunk, onComplete, onError,
                tools != null && !tools.isEmpty() ? "chat_stream_tools" : "chat_stream");
    }

    /** Resolves routes and starts the recursive streaming attempt chain at index 0. */
    private void streamWithFailover(ChatCompletionRequest req, Consumer<String> onChunk, Runnable onComplete,
                                    Consumer<Throwable> onError, String scene) {
        String traceId = nextTraceId();
        List<ResolvedRoute> routes = resolveRoutes();
        if (routes.isEmpty()) {
            recordCall(traceId, scene, true, 1, null, false, null, 0L, req, null, "none",
                    new RuntimeException("未配置可用 LLM 路由"), "no_route");
            if (onError != null) onError.accept(new RuntimeException("未配置可用 LLM 路由"));
            return;
        }
        attemptStream(routes, 0, req, onChunk, onComplete, onError, traceId, scene);
    }

    /**
     * One streaming attempt against {@code routes[index]}; recurses to the next route on
     * failure — but only while nothing has been emitted to the caller yet, so the consumer
     * never sees a mid-stream provider switch.
     *
     * <p>A daemon "drain" thread hands chunks to {@code onChunk} off the reactive thread and
     * fires {@code onComplete} once the stream is done and the queue is empty.
     * NOTE(review): if the upstream neither emits nor terminates, the drain thread polls
     * every 2s indefinitely (daemon, so it won't block JVM exit) — confirm an HTTP/read
     * timeout is configured upstream if that matters.
     */
    private void attemptStream(List<ResolvedRoute> routes, int index, ChatCompletionRequest req,
                               Consumer<String> onChunk, Runnable onComplete, Consumer<Throwable> onError,
                               String traceId, String scene) {
        if (index >= routes.size()) {
            if (onError != null) onError.accept(new RuntimeException("LLM 路由全部失败"));
            return;
        }
        ResolvedRoute route = routes.get(index);
        ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, true);
        long start = System.currentTimeMillis();
        StringBuilder outputBuffer = new StringBuilder();          // capped copy of output for the call log
        AtomicBoolean doneSeen = new AtomicBoolean(false);         // [DONE] marker or terminal signal observed
        AtomicBoolean errorSeen = new AtomicBoolean(false);        // suppresses onComplete in the drain thread
        AtomicBoolean emitted = new AtomicBoolean(false);          // any chunk already delivered to onChunk?
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread drain = new Thread(() -> {
            try {
                while (true) {
                    String s = queue.poll(2, TimeUnit.SECONDS);
                    if (s != null) {
                        emitted.set(true);
                        try {
                            onChunk.accept(s);
                        } catch (Exception ignore) {
                            // consumer failures must not kill the drain loop
                        }
                    }
                    if (doneSeen.get() && queue.isEmpty()) {
                        if (!errorSeen.get()) {
                            try {
                                if (onComplete != null) onComplete.run();
                            } catch (Exception ignore) {
                            }
                        }
                        break;
                    }
                }
            } catch (InterruptedException ignore) {
            }
        });
        drain.setDaemon(true);
        drain.start();
        streamFlux(route, routeReq).subscribe(payload -> {
            if (payload == null || payload.isEmpty()) return;
            // One network payload may carry several SSE events separated by blank lines.
            String[] events = payload.split("\\r?\\n\\r?\\n");
            for (String part : events) {
                String s = part;
                if (s == null || s.isEmpty()) continue;
                if (s.startsWith("data:")) {
                    s = s.substring(5);
                    if (s.startsWith(" ")) s = s.substring(1);
                }
                if ("[DONE]".equals(s.trim())) {
                    doneSeen.set(true);
                    continue;
                }
                try {
                    JSONObject obj = JSON.parseObject(s);
                    JSONArray choices = obj.getJSONArray("choices");
                    if (choices != null && !choices.isEmpty()) {
                        JSONObject c0 = choices.getJSONObject(0);
                        JSONObject delta = c0.getJSONObject("delta");
                        if (delta != null) {
                            String content = delta.getString("content");
                            if (content != null) {
                                queue.offer(content);
                                appendLimited(outputBuffer, content);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.warn("解析 LLM stream 片段失败: {}", e.getMessage());
                }
            }
        }, err -> {
            errorSeen.set(true);
            doneSeen.set(true);
            boolean quota = isQuotaExhausted(err);
            boolean canSwitch = shouldSwitch(route, quota);
            markFailure(route, err, canSwitch);
            recordCall(traceId, scene, true, index + 1, route, false, statusCodeOf(err),
                    System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
                    quota ? "quota" : "error", err, "emitted=" + emitted.get());
            if (!emitted.get() && canSwitch && index < routes.size() - 1) {
                log.warn("LLM 路由失败,自动切换,current={}, reason={}", route.tag(), errorText(err));
                attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
                return;
            }
            if (onError != null) onError.accept(err);
        }, () -> {
            if (!doneSeen.get()) {
                // Flux completed without a [DONE] marker: treat as a broken stream.
                RuntimeException ex = new RuntimeException("LLM 流意外完成");
                errorSeen.set(true);
                doneSeen.set(true);
                boolean canSwitch = shouldSwitch(route, false);
                markFailure(route, ex, canSwitch);
                recordCall(traceId, scene, true, index + 1, route, false, 200,
                        System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
                        "error", ex, "unexpected_stream_end");
                if (!emitted.get() && canSwitch && index < routes.size() - 1) {
                    log.warn("LLM 路由流异常完成,自动切换,current={}", route.tag());
                    attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
                } else {
                    if (onError != null) onError.accept(ex);
                }
            } else {
                markSuccess(route);
                recordCall(traceId, scene, true, index + 1, route, true, 200,
                        System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
                        "none", null, null);
                doneSeen.set(true);
            }
        });
    }

    /**
     * Opens the streaming (SSE) completion call for a route. Non-2xx statuses are surfaced
     * as {@link LlmRouteException} carrying the status and body so failover logic can
     * classify them.
     */
    private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) {
        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
        return client.post()
                .uri("/chat/completions")
                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .bodyValue(req)
                .exchangeToFlux(resp -> {
                    int status = resp.rawStatusCode();
                    if (status >= 200 && status < 300) {
                        return resp.bodyToFlux(String.class);
                    }
                    return resp.bodyToMono(String.class)
                            .defaultIfEmpty("")
                            .flatMapMany(body -> Flux.error(new LlmRouteException(status, body)));
                })
                .doOnError(ex -> log.error("调用 LLM 流式失败, route={}", route.tag(), ex));
    }

    /**
     * Performs the blocking (non-stream) HTTP call for a route and parses the body.
     * Throws {@link LlmRouteException} for non-2xx statuses.
     */
    private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
        RawCompletionResult raw = client.post()
                .uri("/chat/completions")
                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
                .bodyValue(req)
                .exchangeToMono(resp -> resp.bodyToFlux(String.class)
                        .collectList()
                        // BUGFIX: join chunks with real blank lines ("\n\n"); the previous
                        // literal "\\n\\n" produced backslash characters, so parseCompletion
                        // could never re-split an SSE payload into events.
                        .map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\n\n", list))))
                .block();
        if (raw == null) {
            throw new RuntimeException("LLM 返回为空");
        }
        if (raw.statusCode < 200 || raw.statusCode >= 300) {
            throw new LlmRouteException(raw.statusCode, raw.payload);
        }
        return new CompletionCallResult(raw.statusCode, raw.payload, parseCompletion(raw.payload));
    }

    /** Stamps a route's model / stream flag / thinking switch onto the request. */
    private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
        req.setModel(route.model);
        req.setStream(stream);
        if (route.thinkingEnabled) {
            ChatCompletionRequest.Thinking t = new ChatCompletionRequest.Thinking();
            t.setType("enable");
            req.setThinking(t);
        } else {
            req.setThinking(null);
        }
        return req;
    }

    /**
     * Shallow copy of a request so per-route mutation in {@link #applyRoute} does not leak
     * into the caller's request. (Messages/tools lists are shared, not deep-copied.)
     */
    private ChatCompletionRequest cloneRequest(ChatCompletionRequest src) {
        ChatCompletionRequest req = new ChatCompletionRequest();
        req.setModel(src.getModel());
        req.setMessages(src.getMessages());
        req.setTemperature(src.getTemperature());
        req.setMax_tokens(src.getMax_tokens());
        req.setStream(src.getStream());
        req.setTools(src.getTools());
        req.setTool_choice(src.getTool_choice());
        req.setThinking(src.getThinking());
        return req;
    }

    /** A completion is usable when choice 0 carries non-blank content or any tool call. */
    private boolean isValidCompletion(ChatCompletionResponse response) {
        if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) {
            return false;
        }
        ChatCompletionRequest.Message message = response.getChoices().get(0).getMessage();
        if (message == null) {
            return false;
        }
        if (!isBlank(message.getContent())) {
            return true;
        }
        return message.getTool_calls() != null && !message.getTool_calls().isEmpty();
    }

    /** Whether the route permits failover for this failure kind (quota vs. generic error). */
    private boolean shouldSwitch(ResolvedRoute route, boolean quota) {
        return quota ? route.switchOnQuota : route.switchOnError;
    }

    /** Reports success for DB-backed routes; the yml fallback route (id == null) is untracked. */
    private void markSuccess(ResolvedRoute route) {
        if (route.id != null) {
            llmRoutingService.markSuccess(route.id);
        }
    }

    /** Reports failure for DB-backed routes; may put the route into cooldown. */
    private void markFailure(ResolvedRoute route, Throwable ex, boolean enterCooldown) {
        if (route.id != null) {
            llmRoutingService.markFailure(route.id, errorText(ex), enterCooldown, route.cooldownSeconds);
        }
    }

    /** Compact, log-safe error description (HTTP failures include status + truncated body). */
    private String errorText(Throwable ex) {
        if (ex == null) return "unknown";
        if (ex instanceof LlmRouteException) {
            LlmRouteException e = (LlmRouteException) ex;
            String body = e.body == null ? "" : e.body;
            if (body.length() > 240) {
                body = body.substring(0, 240);
            }
            return "status=" + e.statusCode + ", body=" + body;
        }
        return ex.getMessage() == null ? ex.toString() : ex.getMessage();
    }

    /** Heuristic: HTTP 429 or quota/rate-limit keywords in the body mean quota exhaustion. */
    private boolean isQuotaExhausted(Throwable ex) {
        if (!(ex instanceof LlmRouteException)) return false;
        LlmRouteException e = (LlmRouteException) ex;
        if (e.statusCode == 429) return true;
        String text = (e.body == null ? "" : e.body).toLowerCase();
        return text.contains("insufficient_quota") || text.contains("quota") || text.contains("余额")
                || text.contains("用量") || text.contains("超限") || text.contains("rate limit");
    }

    /** Loads routes from the database; falls back to yml config when the DB has none. */
    private List<ResolvedRoute> resolveRoutes() {
        List<ResolvedRoute> routes = new ArrayList<>();
        List<LlmRouteConfig> dbRoutes = llmRoutingService.listAvailableRoutes();
        for (LlmRouteConfig c : dbRoutes) {
            routes.add(ResolvedRoute.fromDb(c));
        }
        // Compatibility: when the database is empty, fall back to yml configuration.
        if (routes.isEmpty() && !isBlank(fallbackBaseUrl) && !isBlank(fallbackApiKey) && !isBlank(fallbackModel)) {
            routes.add(ResolvedRoute.fromFallback(fallbackBaseUrl, fallbackApiKey, fallbackModel,
                    isFallbackThinkingEnabled()));
        }
        return routes;
    }

    /** Accepts "true", "1" or "enable" (case-insensitive) for the yml thinking switch. */
    private boolean isFallbackThinkingEnabled() {
        String x = fallbackThinking == null ? "" : fallbackThinking.trim().toLowerCase();
        return "true".equals(x) || "1".equals(x) || "enable".equals(x);
    }

    private boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /**
     * Folds an SSE payload into an accumulating response: concatenates delta/message
     * content into choice 0, tracks role/finish_reason, and copies top-level
     * id/created/object fields. Unparseable events are skipped.
     */
    private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
        if (payload == null || payload.isEmpty()) return acc;
        String[] events = payload.split("\\r?\\n\\r?\\n");
        for (String part : events) {
            String s = part;
            if (s == null || s.isEmpty()) continue;
            if (s.startsWith("data:")) {
                s = s.substring(5);
                if (s.startsWith(" ")) s = s.substring(1);
            }
            if ("[DONE]".equals(s.trim())) {
                continue;
            }
            try {
                JSONObject obj = JSON.parseObject(s);
                if (obj == null) continue;
                JSONArray choices = obj.getJSONArray("choices");
                if (choices != null && !choices.isEmpty()) {
                    JSONObject c0 = choices.getJSONObject(0);
                    if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
                        // Lazily create the single accumulated choice + message.
                        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
                        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
                        choice.setMessage(msg);
                        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
                        list.add(choice);
                        acc.setChoices(list);
                    }
                    ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
                    ChatCompletionRequest.Message msg = choice.getMessage();
                    if (msg.getRole() == null || msg.getRole().isEmpty()) {
                        msg.setRole("assistant");
                    }
                    JSONObject delta = c0.getJSONObject("delta");
                    if (delta != null) {
                        String c = delta.getString("content");
                        if (c != null) {
                            String prev = msg.getContent();
                            msg.setContent(prev == null ? c : prev + c);
                        }
                        String role = delta.getString("role");
                        if (role != null && !role.isEmpty()) msg.setRole(role);
                    }
                    JSONObject message = c0.getJSONObject("message");
                    if (message != null) {
                        String c = message.getString("content");
                        if (c != null) {
                            String prev = msg.getContent();
                            msg.setContent(prev == null ? c : prev + c);
                        }
                        String role = message.getString("role");
                        if (role != null && !role.isEmpty()) msg.setRole(role);
                    }
                    String fr = c0.getString("finish_reason");
                    if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
                }
                String id = obj.getString("id");
                if (id != null && !id.isEmpty()) acc.setId(id);
                Long created = obj.getLong("created");
                if (created != null) acc.setCreated(created);
                String object = obj.getString("object");
                if (object != null && !object.isEmpty()) acc.setObjectName(object);
            } catch (Exception ignore) {
                // best-effort merge: malformed events are dropped
            }
        }
        return acc;
    }

    /**
     * Parses a completion body, tolerating three shapes in order:
     * plain JSON response → SSE event stream → raw text wrapped as assistant content.
     */
    private ChatCompletionResponse parseCompletion(String payload) {
        if (payload == null) return null;
        try {
            ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
            if (r != null && r.getChoices() != null && !r.getChoices().isEmpty()
                    && r.getChoices().get(0).getMessage() != null) {
                return r;
            }
        } catch (Exception ignore) {
        }
        ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
        if (sse.getChoices() != null && !sse.getChoices().isEmpty()
                && sse.getChoices().get(0).getMessage() != null
                && sse.getChoices().get(0).getMessage().getContent() != null) {
            return sse;
        }
        // Last resort: surface the raw payload as the assistant message.
        ChatCompletionResponse r = new ChatCompletionResponse();
        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
        msg.setRole("assistant");
        msg.setContent(payload);
        choice.setMessage(msg);
        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
        list.add(choice);
        r.setChoices(list);
        return r;
    }

    /** 32-char hex trace id correlating all attempts of one logical call. */
    private String nextTraceId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /** Appends text to the log buffer, hard-capped at {@link #LOG_TEXT_LIMIT} chars. */
    private void appendLimited(StringBuilder sb, String text) {
        if (sb == null || text == null || text.isEmpty()) {
            return;
        }
        int remain = LOG_TEXT_LIMIT - sb.length();
        if (remain <= 0) {
            return;
        }
        if (text.length() <= remain) {
            sb.append(text);
        } else {
            sb.append(text, 0, remain);
        }
    }

    /** HTTP status of an error, or null when it was not an HTTP-level failure. */
    private Integer statusCodeOf(Throwable ex) {
        if (ex instanceof LlmRouteException) {
            return ((LlmRouteException) ex).statusCode;
        }
        return null;
    }

    /** Truncated HTTP error body, or null for non-HTTP failures. */
    private String responseBodyOf(Throwable ex) {
        if (ex instanceof LlmRouteException) {
            return cut(((LlmRouteException) ex).body, LOG_TEXT_LIMIT);
        }
        return null;
    }

    /**
     * Picks the best loggable representation of a successful response:
     * message content, else serialized tool calls, else the raw payload.
     */
    private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) {
        if (resp != null && resp.getChoices() != null && !resp.getChoices().isEmpty()
                && resp.getChoices().get(0) != null && resp.getChoices().get(0).getMessage() != null) {
            ChatCompletionRequest.Message m = resp.getChoices().get(0).getMessage();
            if (!isBlank(m.getContent())) {
                return cut(m.getContent(), LOG_TEXT_LIMIT);
            }
            if (m.getTool_calls() != null && !m.getTool_calls().isEmpty()) {
                return cut(JSON.toJSONString(m), LOG_TEXT_LIMIT);
            }
        }
        return cut(fallbackPayload, LOG_TEXT_LIMIT);
    }

    private String safeName(Throwable ex) {
        return ex == null ? null : ex.getClass().getSimpleName();
    }

    /** Strips CRs and truncates to {@code maxLen}; null-safe. */
    private String cut(String text, int maxLen) {
        if (text == null) return null;
        String clean = text.replace("\r", " ");
        return clean.length() > maxLen ? clean.substring(0, maxLen) : clean;
    }

    /** Persists one call-log row; persistence errors are swallowed by the log service. */
    private void recordCall(String traceId, String scene, boolean stream, int attemptNo, ResolvedRoute route,
                            boolean success, Integer httpStatus, long latencyMs, ChatCompletionRequest req,
                            String response, String switchMode, Throwable err, String extra) {
        LlmCallLog item = new LlmCallLog();
        item.setTraceId(cut(traceId, 64));
        item.setScene(cut(scene, 64));
        item.setStream((short) (stream ? 1 : 0));
        item.setAttemptNo(attemptNo);
        if (route != null) {
            item.setRouteId(route.id);
            item.setRouteName(cut(route.name, 128));
            item.setBaseUrl(cut(route.baseUrl, 255));
            item.setModel(cut(route.model, 128));
        }
        item.setSuccess((short) (success ? 1 : 0));
        item.setHttpStatus(httpStatus);
        item.setLatencyMs(latencyMs < 0 ? 0 : latencyMs);
        item.setSwitchMode(cut(switchMode, 32));
        item.setRequestContent(cut(JSON.toJSONString(req), LOG_TEXT_LIMIT));
        item.setResponseContent(cut(response, LOG_TEXT_LIMIT));
        item.setErrorType(cut(safeName(err), 128));
        item.setErrorMessage(err == null ? null : cut(errorText(err), 1024));
        item.setExtra(cut(extra, 512));
        item.setCreateTime(new Date());
        llmCallLogService.saveIgnoreError(item);
    }

    /** Parsed result of a blocking completion call: status, raw body, parsed response. */
    private static class CompletionCallResult {
        private final int statusCode;
        private final String payload;
        private final ChatCompletionResponse response;

        private CompletionCallResult(int statusCode, String payload, ChatCompletionResponse response) {
            this.statusCode = statusCode;
            this.payload = payload;
            this.response = response;
        }
    }

    /** Raw HTTP result (status + body) before any parsing. */
    private static class RawCompletionResult {
        private final int statusCode;
        private final String payload;

        private RawCompletionResult(int statusCode, String payload) {
            this.statusCode = statusCode;
            this.payload = payload;
        }
    }

    /** Non-2xx HTTP failure carrying the status and response body for classification. */
    private static class LlmRouteException extends RuntimeException {
        private final int statusCode;
        private final String body;

        private LlmRouteException(int statusCode, String body) {
            super("http status=" + statusCode);
            this.statusCode = statusCode;
            this.body = body;
        }
    }

    /** Normalized route: DB-backed (id != null) or yml fallback (id == null). */
    private static class ResolvedRoute {
        private Long id;
        private String name;
        private String baseUrl;
        private String apiKey;
        private String model;
        private boolean thinkingEnabled;
        private boolean switchOnQuota;
        private boolean switchOnError;
        private Integer cooldownSeconds;

        private static ResolvedRoute fromDb(LlmRouteConfig c) {
            ResolvedRoute r = new ResolvedRoute();
            r.id = c.getId();
            r.name = c.getName();
            r.baseUrl = c.getBaseUrl();
            r.apiKey = c.getApiKey();
            r.model = c.getModel();
            r.thinkingEnabled = c.getThinking() != null && c.getThinking() == 1;
            // Switch flags default to enabled when unset in the database.
            r.switchOnQuota = c.getSwitchOnQuota() == null || c.getSwitchOnQuota() == 1;
            r.switchOnError = c.getSwitchOnError() == null || c.getSwitchOnError() == 1;
            r.cooldownSeconds = c.getCooldownSeconds();
            return r;
        }

        private static ResolvedRoute fromFallback(String baseUrl, String apiKey, String model,
                                                  boolean thinkingEnabled) {
            ResolvedRoute r = new ResolvedRoute();
            r.name = "fallback-yml";
            r.baseUrl = baseUrl;
            r.apiKey = apiKey;
            r.model = model;
            r.thinkingEnabled = thinkingEnabled;
            r.switchOnQuota = true;
            r.switchOnError = true;
            r.cooldownSeconds = 300;
            return r;
        }

        /** Short human-readable tag for log lines, e.g. "name model=x". */
        private String tag() {
            String showName = name == null ? "unnamed" : name;
            String showModel = model == null ? "" : (" model=" + model);
            return showName + showModel;
        }
    }
}