| | |
| | | package com.zy.ai.service; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONArray; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.zy.ai.entity.ChatCompletionRequest; |
| | | import com.zy.ai.entity.ChatCompletionResponse; |
| | | import com.zy.ai.entity.LlmCallLog; |
| | |
| | | import lombok.RequiredArgsConstructor; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.http.HttpHeaders; |
| | | import org.springframework.http.MediaType; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.web.reactive.function.client.WebClient; |
| | | import org.springframework.web.client.RestClientResponseException; |
| | | import org.springframework.web.reactive.function.client.WebClientResponseException; |
| | | import reactor.core.publisher.Flux; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Locale; |
| | | import java.util.UUID; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | |
| | | |
| | | private final LlmRoutingService llmRoutingService; |
| | | private final LlmCallLogService llmCallLogService; |
| | | private final LlmSpringAiClientService llmSpringAiClientService; |
| | | |
| | | @Value("${llm.base-url:}") |
| | | private String fallbackBaseUrl; |
| | |
| | | drain.setDaemon(true); |
| | | drain.start(); |
| | | |
| | | streamFlux(route, routeReq).subscribe(payload -> { |
| | | Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq); |
| | | streamSource.subscribe(payload -> { |
| | | if (payload == null || payload.isEmpty()) return; |
| | | String[] events = payload.split("\\r?\\n\\r?\\n"); |
| | | for (String part : events) { |
| | | String s = part; |
| | | if (s == null || s.isEmpty()) continue; |
| | | if (s.startsWith("data:")) { |
| | | s = s.substring(5); |
| | | if (s.startsWith(" ")) s = s.substring(1); |
| | | } |
| | | if ("[DONE]".equals(s.trim())) { |
| | | doneSeen.set(true); |
| | | continue; |
| | | } |
| | | try { |
| | | JSONObject obj = JSON.parseObject(s); |
| | | JSONArray choices = obj.getJSONArray("choices"); |
| | | if (choices != null && !choices.isEmpty()) { |
| | | JSONObject c0 = choices.getJSONObject(0); |
| | | JSONObject delta = c0.getJSONObject("delta"); |
| | | if (delta != null) { |
| | | String content = delta.getString("content"); |
| | | if (content != null) { |
| | | queue.offer(content); |
| | | appendLimited(outputBuffer, content); |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.warn("解析 LLM stream 片段失败: {}", e.getMessage()); |
| | | } |
| | | } |
| | | queue.offer(payload); |
| | | appendLimited(outputBuffer, payload); |
| | | }, err -> { |
| | | errorSeen.set(true); |
| | | doneSeen.set(true); |
| | |
| | | } |
| | | if (onError != null) onError.accept(err); |
| | | }, () -> { |
| | | if (!doneSeen.get()) { |
| | | RuntimeException ex = new RuntimeException("LLM 流意外完成"); |
| | | errorSeen.set(true); |
| | | doneSeen.set(true); |
| | | boolean canSwitch = shouldSwitch(route, false); |
| | | markFailure(route, ex, canSwitch); |
| | | recordCall(traceId, scene, true, index + 1, route, false, 200, |
| | | System.currentTimeMillis() - start, routeReq, outputBuffer.toString(), |
| | | "error", ex, "unexpected_stream_end"); |
| | | if (!emitted.get() && canSwitch && index < routes.size() - 1) { |
| | | log.warn("LLM 路由流异常完成,自动切换,current={}", route.tag()); |
| | | attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene); |
| | | } else { |
| | | if (onError != null) onError.accept(ex); |
| | | } |
| | | } else { |
| | | markSuccess(route); |
| | | recordCall(traceId, scene, true, index + 1, route, true, 200, |
| | | System.currentTimeMillis() - start, routeReq, outputBuffer.toString(), |
| | | "none", null, null); |
| | | doneSeen.set(true); |
| | | } |
| | | markSuccess(route); |
| | | recordCall(traceId, scene, true, index + 1, route, true, 200, |
| | | System.currentTimeMillis() - start, routeReq, outputBuffer.toString(), |
| | | "none", null, null); |
| | | doneSeen.set(true); |
| | | }); |
| | | } |
| | | |
| | | private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) { |
| | | WebClient client = WebClient.builder().baseUrl(route.baseUrl).build(); |
| | | return client.post() |
| | | .uri("/chat/completions") |
| | | .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey) |
| | | .contentType(MediaType.APPLICATION_JSON) |
| | | .accept(MediaType.TEXT_EVENT_STREAM) |
| | | .bodyValue(req) |
| | | .exchangeToFlux(resp -> { |
| | | int status = resp.rawStatusCode(); |
| | | if (status >= 200 && status < 300) { |
| | | return resp.bodyToFlux(String.class); |
| | | } |
| | | return resp.bodyToMono(String.class) |
| | | .defaultIfEmpty("") |
| | | .flatMapMany(body -> Flux.error(new LlmRouteException(status, body))); |
| | | }) |
| | | .doOnError(ex -> log.error("调用 LLM 流式失败, route={}", route.tag(), ex)); |
| | | private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) { |
| | | return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req) |
| | | .doOnError(ex -> log.error("调用 Spring AI 流式失败, route={}", route.tag(), ex)); |
| | | } |
| | | |
| | | private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) { |
| | | WebClient client = WebClient.builder().baseUrl(route.baseUrl).build(); |
| | | RawCompletionResult raw = client.post() |
| | | .uri("/chat/completions") |
| | | .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey) |
| | | .contentType(MediaType.APPLICATION_JSON) |
| | | .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM) |
| | | .bodyValue(req) |
| | | .exchangeToMono(resp -> resp.bodyToFlux(String.class) |
| | | .collectList() |
| | | .map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\\n\\n", list)))) |
| | | .block(); |
| | | return callCompletionWithSpringAi(route, req); |
| | | } |
| | | |
| | | if (raw == null) { |
| | | throw new RuntimeException("LLM 返回为空"); |
| | | } |
| | | if (raw.statusCode < 200 || raw.statusCode >= 300) { |
| | | throw new LlmRouteException(raw.statusCode, raw.payload); |
| | | } |
| | | return new CompletionCallResult(raw.statusCode, raw.payload, parseCompletion(raw.payload)); |
| | | private CompletionCallResult callCompletionWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) { |
| | | LlmSpringAiClientService.CompletionCallResult result = |
| | | llmSpringAiClientService.callCompletion(route.baseUrl, route.apiKey, req); |
| | | return new CompletionCallResult(result.getStatusCode(), result.getPayload(), result.getResponse()); |
| | | } |
| | | |
| | | private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) { |
| | |
| | | |
| | | private String errorText(Throwable ex) { |
| | | if (ex == null) return "unknown"; |
| | | if (ex instanceof LlmRouteException) { |
| | | LlmRouteException e = (LlmRouteException) ex; |
| | | String body = e.body == null ? "" : e.body; |
| | | if (body.length() > 240) { |
| | | if (ex instanceof RestClientResponseException) { |
| | | RestClientResponseException e = (RestClientResponseException) ex; |
| | | String body = e.getResponseBodyAsString(); |
| | | if (body != null && body.length() > 240) { |
| | | body = body.substring(0, 240); |
| | | } |
| | | return "status=" + e.statusCode + ", body=" + body; |
| | | return "status=" + e.getStatusCode().value() + ", body=" + body; |
| | | } |
| | | if (ex instanceof WebClientResponseException) { |
| | | WebClientResponseException e = (WebClientResponseException) ex; |
| | | String body = e.getResponseBodyAsString(); |
| | | if (body != null && body.length() > 240) { |
| | | body = body.substring(0, 240); |
| | | } |
| | | return "status=" + e.getStatusCode().value() + ", body=" + body; |
| | | } |
| | | Integer springAiStatus = llmSpringAiClientService.statusCodeOf(ex); |
| | | if (springAiStatus != null) { |
| | | return "status=" + springAiStatus + ", body=" + llmSpringAiClientService.responseBodyOf(ex, 240); |
| | | } |
| | | return ex.getMessage() == null ? ex.toString() : ex.getMessage(); |
| | | } |
| | | |
| | | private boolean isQuotaExhausted(Throwable ex) { |
| | | if (!(ex instanceof LlmRouteException)) return false; |
| | | LlmRouteException e = (LlmRouteException) ex; |
| | | if (e.statusCode == 429) return true; |
| | | String text = (e.body == null ? "" : e.body).toLowerCase(); |
| | | Integer status = statusCodeOf(ex); |
| | | if (status != null && status == 429) { |
| | | return true; |
| | | } |
| | | String text = responseBodyOf(ex); |
| | | text = text == null ? "" : text.toLowerCase(Locale.ROOT); |
| | | return text.contains("insufficient_quota") |
| | | || text.contains("quota") |
| | | || text.contains("余额") |
| | |
| | | return s == null || s.trim().isEmpty(); |
| | | } |
| | | |
    /**
     * Folds one raw SSE payload chunk into the accumulating response {@code acc}.
     * Handles the OpenAI-style stream format: events separated by blank lines,
     * each prefixed with "data:", terminated by a "[DONE]" sentinel. Content from
     * "delta" (streaming) and "message" (non-streaming) objects is appended to a
     * single accumulated assistant message; malformed fragments are skipped.
     *
     * @param acc     accumulator mutated in place and returned
     * @param payload raw SSE text, possibly containing several events
     * @return the same {@code acc} instance
     */
    private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
        if (payload == null || payload.isEmpty()) return acc;
        // SSE events are separated by a blank line (accept CRLF or bare LF).
        String[] events = payload.split("\\r?\\n\\r?\\n");
        for (String part : events) {
            String s = part;
            if (s == null || s.isEmpty()) continue;
            // Strip the "data:" field name plus the optional single space after the colon.
            if (s.startsWith("data:")) {
                s = s.substring(5);
                if (s.startsWith(" ")) s = s.substring(1);
            }
            // Stream-end sentinel; carries no JSON body.
            if ("[DONE]".equals(s.trim())) {
                continue;
            }
            try {
                JSONObject obj = JSON.parseObject(s);
                if (obj == null) continue;
                JSONArray choices = obj.getJSONArray("choices");
                if (choices != null && !choices.isEmpty()) {
                    JSONObject c0 = choices.getJSONObject(0);
                    // Lazily create the single choice/message pair that accumulates output.
                    if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
                        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
                        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
                        choice.setMessage(msg);
                        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
                        list.add(choice);
                        acc.setChoices(list);
                    }
                    ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
                    ChatCompletionRequest.Message msg = choice.getMessage();
                    if (msg.getRole() == null || msg.getRole().isEmpty()) {
                        msg.setRole("assistant");
                    }
                    // Streaming chunks carry increments under "delta".
                    JSONObject delta = c0.getJSONObject("delta");
                    if (delta != null) {
                        String c = delta.getString("content");
                        if (c != null) {
                            String prev = msg.getContent();
                            msg.setContent(prev == null ? c : prev + c);
                        }
                        String role = delta.getString("role");
                        if (role != null && !role.isEmpty()) msg.setRole(role);
                    }
                    // Non-streaming (or final) chunks may carry a complete "message" object.
                    JSONObject message = c0.getJSONObject("message");
                    if (message != null) {
                        String c = message.getString("content");
                        if (c != null) {
                            String prev = msg.getContent();
                            msg.setContent(prev == null ? c : prev + c);
                        }
                        String role = message.getString("role");
                        if (role != null && !role.isEmpty()) msg.setRole(role);
                    }
                    String fr = c0.getString("finish_reason");
                    if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
                }
                // Top-level metadata: last non-empty value across chunks wins.
                String id = obj.getString("id");
                if (id != null && !id.isEmpty()) acc.setId(id);
                Long created = obj.getLong("created");
                if (created != null) acc.setCreated(created);
                String object = obj.getString("object");
                if (object != null && !object.isEmpty()) acc.setObjectName(object);
            } catch (Exception ignore) {
                // Best-effort merge: skip fragments that are not valid JSON.
            }
        }
        return acc;
    }
| | | |
| | | private ChatCompletionResponse parseCompletion(String payload) { |
| | | if (payload == null) return null; |
| | | try { |
| | | ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class); |
| | | if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) { |
| | | return r; |
| | | } |
| | | } catch (Exception ignore) { |
| | | } |
| | | ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload); |
| | | if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) { |
| | | return sse; |
| | | } |
| | | ChatCompletionResponse r = new ChatCompletionResponse(); |
| | | ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice(); |
| | | ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message(); |
| | | msg.setRole("assistant"); |
| | | msg.setContent(payload); |
| | | choice.setMessage(msg); |
| | | ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>(); |
| | | list.add(choice); |
| | | r.setChoices(list); |
| | | return r; |
| | | } |
| | | |
| | | private String nextTraceId() { |
| | | return UUID.randomUUID().toString().replace("-", ""); |
| | | } |
| | |
| | | } |
| | | |
| | | private Integer statusCodeOf(Throwable ex) { |
| | | if (ex instanceof LlmRouteException) { |
| | | return ((LlmRouteException) ex).statusCode; |
| | | if (ex instanceof RestClientResponseException) { |
| | | return ((RestClientResponseException) ex).getStatusCode().value(); |
| | | } |
| | | return null; |
| | | if (ex instanceof WebClientResponseException) { |
| | | return ((WebClientResponseException) ex).getStatusCode().value(); |
| | | } |
| | | return llmSpringAiClientService.statusCodeOf(ex); |
| | | } |
| | | |
| | | private String responseBodyOf(Throwable ex) { |
| | | if (ex instanceof LlmRouteException) { |
| | | return cut(((LlmRouteException) ex).body, LOG_TEXT_LIMIT); |
| | | if (ex instanceof RestClientResponseException) { |
| | | return cut(((RestClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT); |
| | | } |
| | | return null; |
| | | if (ex instanceof WebClientResponseException) { |
| | | return cut(((WebClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT); |
| | | } |
| | | return cut(llmSpringAiClientService.responseBodyOf(ex, LOG_TEXT_LIMIT), LOG_TEXT_LIMIT); |
| | | } |
| | | |
| | | private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) { |
| | |
| | | this.statusCode = statusCode; |
| | | this.payload = payload; |
| | | this.response = response; |
| | | } |
| | | } |
| | | |
    /**
     * Raw HTTP outcome of a completion call: status code plus the unparsed
     * response body, before any JSON/SSE interpretation.
     */
    private static class RawCompletionResult {
        private final int statusCode;   // HTTP status returned by the upstream endpoint
        private final String payload;   // raw response body text (JSON or SSE)

        private RawCompletionResult(int statusCode, String payload) {
            this.statusCode = statusCode;
            this.payload = payload;
        }
    }
| | | |
    /**
     * Signals a non-2xx HTTP response from an LLM route, carrying the status
     * code and the raw error body so the router can decide whether to switch
     * routes (e.g. on quota exhaustion) and log a useful excerpt.
     */
    private static class LlmRouteException extends RuntimeException {
        private final int statusCode;   // HTTP status of the failed call
        private final String body;      // raw error body (may be null/empty)

        private LlmRouteException(int statusCode, String body) {
            super("http status=" + statusCode);
            this.statusCode = statusCode;
            this.body = body;
        }
    }
| | | |