| | |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.function.Consumer; |
| | | |
| | | @Slf4j |
| | |
| | | List<ResolvedRoute> routes = resolveRoutes(); |
| | | if (routes.isEmpty()) { |
| | | log.error("调用 LLM 失败: 未配置可用 LLM 路由"); |
| | | recordCall(traceId, scene, false, 1, null, false, null, 0L, req, null, "none", |
| | | recordCall(traceId, scene, false, 1, null, false, null, 0L, req, null, null, "none", |
| | | new RuntimeException("未配置可用 LLM 路由"), "no_route"); |
| | | return null; |
| | | } |
| | |
| | | boolean canSwitch = shouldSwitch(route, false); |
| | | markFailure(route, ex, canSwitch); |
| | | recordCall(traceId, scene, false, i + 1, route, false, callResult.statusCode, |
| | | System.currentTimeMillis() - start, routeReq, callResult.payload, "error", ex, |
| | | System.currentTimeMillis() - start, routeReq, resp, callResult.payload, "error", ex, |
| | | "invalid_completion"); |
| | | if (hasNext && canSwitch) { |
| | | log.warn("LLM 切换到下一路由, current={}, reason={}", route.tag(), ex.getMessage()); |
| | |
| | | } |
| | | markSuccess(route); |
| | | recordCall(traceId, scene, false, i + 1, route, true, callResult.statusCode, |
| | | System.currentTimeMillis() - start, routeReq, buildResponseText(resp, callResult.payload), |
| | | System.currentTimeMillis() - start, routeReq, resp, buildResponseText(resp, callResult.payload), |
| | | "none", null, null); |
| | | return resp; |
| | | } catch (Throwable ex) { |
| | |
| | | boolean canSwitch = shouldSwitch(route, quota); |
| | | markFailure(route, ex, canSwitch); |
| | | recordCall(traceId, scene, false, i + 1, route, false, statusCodeOf(ex), |
| | | System.currentTimeMillis() - start, routeReq, responseBodyOf(ex), |
| | | System.currentTimeMillis() - start, routeReq, null, responseBodyOf(ex), |
| | | quota ? "quota" : "error", ex, null); |
| | | if (hasNext && canSwitch) { |
| | | log.warn("LLM 切换到下一路由, current={}, reason={}", route.tag(), errorText(ex)); |
| | |
| | | req.setMax_tokens(maxTokens != null ? maxTokens : 1024); |
| | | req.setStream(true); |
| | | |
| | | streamWithFailover(req, onChunk, onComplete, onError, "chat_stream"); |
| | | streamWithFailover(req, onChunk, onComplete, onError, null, "chat_stream"); |
| | | } |
| | | |
| | | public void chatStreamWithTools(List<ChatCompletionRequest.Message> messages, |
| | |
| | | List<Object> tools, |
| | | Consumer<String> onChunk, |
| | | Runnable onComplete, |
| | | Consumer<Throwable> onError) { |
| | | Consumer<Throwable> onError, |
| | | Consumer<ChatCompletionResponse.Usage> onUsage) { |
| | | ChatCompletionRequest req = new ChatCompletionRequest(); |
| | | req.setMessages(messages); |
| | | req.setTemperature(temperature != null ? temperature : 0.3); |
| | |
| | | req.setTools(tools); |
| | | req.setTool_choice("auto"); |
| | | } |
| | | streamWithFailover(req, onChunk, onComplete, onError, tools != null && !tools.isEmpty() ? "chat_stream_tools" : "chat_stream"); |
| | | streamWithFailover(req, onChunk, onComplete, onError, onUsage, tools != null && !tools.isEmpty() ? "chat_stream_tools" : "chat_stream"); |
| | | } |
| | | |
| | | private void streamWithFailover(ChatCompletionRequest req, |
| | | Consumer<String> onChunk, |
| | | Runnable onComplete, |
| | | Consumer<Throwable> onError, |
| | | Consumer<ChatCompletionResponse.Usage> onUsage, |
| | | String scene) { |
| | | String traceId = nextTraceId(); |
| | | List<ResolvedRoute> routes = resolveRoutes(); |
| | | if (routes.isEmpty()) { |
| | | recordCall(traceId, scene, true, 1, null, false, null, 0L, req, null, "none", |
| | | recordCall(traceId, scene, true, 1, null, false, null, 0L, req, null, null, "none", |
| | | new RuntimeException("未配置可用 LLM 路由"), "no_route"); |
| | | if (onError != null) onError.accept(new RuntimeException("未配置可用 LLM 路由")); |
| | | return; |
| | | } |
| | | attemptStream(routes, 0, req, onChunk, onComplete, onError, traceId, scene); |
| | | attemptStream(routes, 0, req, onChunk, onComplete, onError, onUsage, traceId, scene); |
| | | } |
| | | |
| | | private void attemptStream(List<ResolvedRoute> routes, |
| | |
| | | Consumer<String> onChunk, |
| | | Runnable onComplete, |
| | | Consumer<Throwable> onError, |
| | | Consumer<ChatCompletionResponse.Usage> onUsage, |
| | | String traceId, |
| | | String scene) { |
| | | if (index >= routes.size()) { |
| | |
| | | AtomicBoolean doneSeen = new AtomicBoolean(false); |
| | | AtomicBoolean errorSeen = new AtomicBoolean(false); |
| | | AtomicBoolean emitted = new AtomicBoolean(false); |
| | | AtomicReference<ChatCompletionResponse.Usage> usageRef = new AtomicReference<>(); |
| | | LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); |
| | | |
| | | Thread drain = new Thread(() -> { |
| | |
| | | drain.setDaemon(true); |
| | | drain.start(); |
| | | |
| | | Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq); |
| | | Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq, usageRef::set); |
| | | streamSource.subscribe(payload -> { |
| | | if (payload == null || payload.isEmpty()) return; |
| | | queue.offer(payload); |
| | |
| | | boolean canSwitch = shouldSwitch(route, quota); |
| | | markFailure(route, err, canSwitch); |
| | | recordCall(traceId, scene, true, index + 1, route, false, statusCodeOf(err), |
| | | System.currentTimeMillis() - start, routeReq, outputBuffer.toString(), |
| | | System.currentTimeMillis() - start, routeReq, usageResponse(usageRef.get()), outputBuffer.toString(), |
| | | quota ? "quota" : "error", err, "emitted=" + emitted.get()); |
| | | if (!emitted.get() && canSwitch && index < routes.size() - 1) { |
| | | log.warn("LLM 路由失败,自动切换,current={}, reason={}", route.tag(), errorText(err)); |
| | | attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene); |
| | | attemptStream(routes, index + 1, req, onChunk, onComplete, onError, onUsage, traceId, scene); |
| | | return; |
| | | } |
| | | if (onError != null) onError.accept(err); |
| | | }, () -> { |
| | | markSuccess(route); |
| | | if (onUsage != null && usageRef.get() != null) { |
| | | try { |
| | | onUsage.accept(usageRef.get()); |
| | | } catch (Exception ignore) { |
| | | } |
| | | } |
| | | recordCall(traceId, scene, true, index + 1, route, true, 200, |
| | | System.currentTimeMillis() - start, routeReq, outputBuffer.toString(), |
| | | System.currentTimeMillis() - start, routeReq, usageResponse(usageRef.get()), outputBuffer.toString(), |
| | | "none", null, null); |
| | | doneSeen.set(true); |
| | | }); |
| | | } |
| | | |
| | | private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) { |
| | | return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req) |
| | | private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, |
| | | ChatCompletionRequest req, |
| | | Consumer<ChatCompletionResponse.Usage> usageConsumer) { |
| | | return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req, usageConsumer) |
| | | .doOnError(ex -> log.error("调用 Spring AI 流式失败, route={}", route.tag(), ex)); |
| | | } |
| | | |
| | |
| | | Integer httpStatus, |
| | | long latencyMs, |
| | | ChatCompletionRequest req, |
| | | ChatCompletionResponse responseObj, |
| | | String response, |
| | | String switchMode, |
| | | Throwable err, |
| | |
| | | item.setResponseContent(cut(response, LOG_TEXT_LIMIT)); |
| | | item.setErrorType(cut(safeName(err), 128)); |
| | | item.setErrorMessage(err == null ? null : cut(errorText(err), 1024)); |
| | | item.setExtra(cut(extra, 512)); |
| | | item.setExtra(cut(buildExtraPayload(responseObj == null ? null : responseObj.getUsage(), extra), 512)); |
| | | item.setCreateTime(new Date()); |
| | | llmCallLogService.saveIgnoreError(item); |
| | | } |
| | | |
| | | private ChatCompletionResponse usageResponse(ChatCompletionResponse.Usage usage) { |
| | | if (usage == null) { |
| | | return null; |
| | | } |
| | | ChatCompletionResponse response = new ChatCompletionResponse(); |
| | | response.setUsage(usage); |
| | | return response; |
| | | } |
| | | |
| | | private String buildExtraPayload(ChatCompletionResponse.Usage usage, String extra) { |
| | | if (usage == null && isBlank(extra)) { |
| | | return null; |
| | | } |
| | | HashMap<String, Object> payload = new HashMap<>(); |
| | | if (usage != null) { |
| | | if (usage.getPromptTokens() != null) { |
| | | payload.put("promptTokens", usage.getPromptTokens()); |
| | | } |
| | | if (usage.getCompletionTokens() != null) { |
| | | payload.put("completionTokens", usage.getCompletionTokens()); |
| | | } |
| | | if (usage.getTotalTokens() != null) { |
| | | payload.put("totalTokens", usage.getTotalTokens()); |
| | | } |
| | | } |
| | | if (!isBlank(extra)) { |
| | | payload.put("note", extra); |
| | | } |
| | | return payload.isEmpty() ? null : JSON.toJSONString(payload); |
| | | } |
| | | |
| | | private static class CompletionCallResult { |
| | | private final int statusCode; |
| | | private final String payload; |