#
Junjie
1 天以前 8636ff97bffec9f2130628bf09c9d0fbb371e2bc
src/main/java/com/zy/ai/service/LlmChatService.java
@@ -1,8 +1,6 @@
package com.zy.ai.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zy.ai.entity.ChatCompletionRequest;
import com.zy.ai.entity.ChatCompletionResponse;
import com.zy.ai.entity.LlmCallLog;
@@ -10,15 +8,16 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.client.RestClientResponseException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -34,6 +33,7 @@
    private final LlmRoutingService llmRoutingService;
    private final LlmCallLogService llmCallLogService;
    private final LlmSpringAiClientService llmSpringAiClientService;
    @Value("${llm.base-url:}")
    private String fallbackBaseUrl;
@@ -257,38 +257,11 @@
        drain.setDaemon(true);
        drain.start();
        streamFlux(route, routeReq).subscribe(payload -> {
        Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq);
        streamSource.subscribe(payload -> {
            if (payload == null || payload.isEmpty()) return;
            String[] events = payload.split("\\r?\\n\\r?\\n");
            for (String part : events) {
                String s = part;
                if (s == null || s.isEmpty()) continue;
                if (s.startsWith("data:")) {
                    s = s.substring(5);
                    if (s.startsWith(" ")) s = s.substring(1);
                }
                if ("[DONE]".equals(s.trim())) {
                    doneSeen.set(true);
                    continue;
                }
                try {
                    JSONObject obj = JSON.parseObject(s);
                    JSONArray choices = obj.getJSONArray("choices");
                    if (choices != null && !choices.isEmpty()) {
                        JSONObject c0 = choices.getJSONObject(0);
                        JSONObject delta = c0.getJSONObject("delta");
                        if (delta != null) {
                            String content = delta.getString("content");
                            if (content != null) {
                                queue.offer(content);
                                appendLimited(outputBuffer, content);
                            }
                        }
                    }
                } catch (Exception e) {
                    log.warn("解析 LLM stream 片段失败: {}", e.getMessage());
                }
            }
            queue.offer(payload);
            appendLimited(outputBuffer, payload);
        }, err -> {
            errorSeen.set(true);
            doneSeen.set(true);
@@ -305,71 +278,27 @@
            }
            if (onError != null) onError.accept(err);
        }, () -> {
            if (!doneSeen.get()) {
                RuntimeException ex = new RuntimeException("LLM 流意外完成");
                errorSeen.set(true);
                doneSeen.set(true);
                boolean canSwitch = shouldSwitch(route, false);
                markFailure(route, ex, canSwitch);
                recordCall(traceId, scene, true, index + 1, route, false, 200,
                        System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
                        "error", ex, "unexpected_stream_end");
                if (!emitted.get() && canSwitch && index < routes.size() - 1) {
                    log.warn("LLM 路由流异常完成,自动切换,current={}", route.tag());
                    attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
                } else {
                    if (onError != null) onError.accept(ex);
                }
            } else {
                markSuccess(route);
                recordCall(traceId, scene, true, index + 1, route, true, 200,
                        System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
                        "none", null, null);
                doneSeen.set(true);
            }
            markSuccess(route);
            recordCall(traceId, scene, true, index + 1, route, true, 200,
                    System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
                    "none", null, null);
            doneSeen.set(true);
        });
    }
    /**
     * Opens a streaming chat-completion request against a route using a one-off WebClient.
     *
     * <p>POSTs {@code req} to {@code /chat/completions} under {@code route.baseUrl} with a
     * bearer token built from {@code route.apiKey}, asking for an event stream.
     *
     * <p>NOTE(review): no closing brace for this method appears before the next method
     * signature; this looks like a diff rendering that kept removed lines — confirm against
     * the actual file.
     *
     * @param route resolved route supplying base URL, API key and tag
     * @param req   request body sent as JSON
     * @return the response body as a stream of strings on a 2xx status; otherwise a stream
     *         that fails with an {@code LlmRouteException} carrying the status and body
     */
    private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) {
        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
        return client.post()
                .uri("/chat/completions")
                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .bodyValue(req)
                .exchangeToFlux(resp -> {
                    int status = resp.rawStatusCode();
                    // Only 2xx responses are streamed through as-is.
                    if (status >= 200 && status < 300) {
                        return resp.bodyToFlux(String.class);
                    }
                    // Non-2xx: read the whole body (empty string when absent) and surface it as an error.
                    return resp.bodyToMono(String.class)
                            .defaultIfEmpty("")
                            .flatMapMany(body -> Flux.error(new LlmRouteException(status, body)));
                })
                .doOnError(ex -> log.error("调用 LLM 流式失败, route={}", route.tag(), ex));
    private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
        return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req)
                .doOnError(ex -> log.error("调用 Spring AI 流式失败, route={}", route.tag(), ex));
    }
    private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
        RawCompletionResult raw = client.post()
                .uri("/chat/completions")
                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
                .bodyValue(req)
                .exchangeToMono(resp -> resp.bodyToFlux(String.class)
                        .collectList()
                        .map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\\n\\n", list))))
                .block();
        return callCompletionWithSpringAi(route, req);
    }
        if (raw == null) {
            throw new RuntimeException("LLM 返回为空");
        }
        if (raw.statusCode < 200 || raw.statusCode >= 300) {
            throw new LlmRouteException(raw.statusCode, raw.payload);
        }
        return new CompletionCallResult(raw.statusCode, raw.payload, parseCompletion(raw.payload));
    private CompletionCallResult callCompletionWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
        LlmSpringAiClientService.CompletionCallResult result =
                llmSpringAiClientService.callCompletion(route.baseUrl, route.apiKey, req);
        return new CompletionCallResult(result.getStatusCode(), result.getPayload(), result.getResponse());
    }
    private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
@@ -430,22 +359,36 @@
    /**
     * Renders a failure as a short text.
     *
     * <p>Returns {@code "unknown"} for a {@code null} throwable. For the response-carrying
     * exception types checked below the text has the form {@code status=<code>, body=<body>}
     * with the body limited to 240 characters; the Spring AI client service is consulted for
     * a status next. Anything else falls back to the exception message, or to
     * {@code toString()} when the message is {@code null}.
     *
     * <p>NOTE(review): this span appears to interleave removed and added lines of a diff
     * rendering (two consecutive {@code return} statements, unbalanced braces) — confirm
     * which branches exist in the actual file.
     *
     * @param ex the failure to describe, may be {@code null}
     * @return a short description of the failure
     */
    private String errorText(Throwable ex) {
        if (ex == null) return "unknown";
        if (ex instanceof LlmRouteException) {
            LlmRouteException e = (LlmRouteException) ex;
            String body = e.body == null ? "" : e.body;
            if (body.length() > 240) {
        if (ex instanceof RestClientResponseException) {
            RestClientResponseException e = (RestClientResponseException) ex;
            String body = e.getResponseBodyAsString();
            if (body != null && body.length() > 240) {
                body = body.substring(0, 240);
            }
            return "status=" + e.statusCode + ", body=" + body;
            return "status=" + e.getStatusCode().value() + ", body=" + body;
        }
        if (ex instanceof WebClientResponseException) {
            WebClientResponseException e = (WebClientResponseException) ex;
            String body = e.getResponseBodyAsString();
            if (body != null && body.length() > 240) {
                body = body.substring(0, 240);
            }
            return "status=" + e.getStatusCode().value() + ", body=" + body;
        }
        Integer springAiStatus = llmSpringAiClientService.statusCodeOf(ex);
        if (springAiStatus != null) {
            return "status=" + springAiStatus + ", body=" + llmSpringAiClientService.responseBodyOf(ex, 240);
        }
        return ex.getMessage() == null ? ex.toString() : ex.getMessage();
    }
    private boolean isQuotaExhausted(Throwable ex) {
        if (!(ex instanceof LlmRouteException)) return false;
        LlmRouteException e = (LlmRouteException) ex;
        if (e.statusCode == 429) return true;
        String text = (e.body == null ? "" : e.body).toLowerCase();
        Integer status = statusCodeOf(ex);
        if (status != null && status == 429) {
            return true;
        }
        String text = responseBodyOf(ex);
        text = text == null ? "" : text.toLowerCase(Locale.ROOT);
        return text.contains("insufficient_quota")
                || text.contains("quota")
                || text.contains("余额")
@@ -476,98 +419,6 @@
        return s == null || s.trim().isEmpty();
    }
    private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
        if (payload == null || payload.isEmpty()) return acc;
        String[] events = payload.split("\\r?\\n\\r?\\n");
        for (String part : events) {
            String s = part;
            if (s == null || s.isEmpty()) continue;
            if (s.startsWith("data:")) {
                s = s.substring(5);
                if (s.startsWith(" ")) s = s.substring(1);
            }
            if ("[DONE]".equals(s.trim())) {
                continue;
            }
            try {
                JSONObject obj = JSON.parseObject(s);
                if (obj == null) continue;
                JSONArray choices = obj.getJSONArray("choices");
                if (choices != null && !choices.isEmpty()) {
                    JSONObject c0 = choices.getJSONObject(0);
                    if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
                        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
                        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
                        choice.setMessage(msg);
                        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
                        list.add(choice);
                        acc.setChoices(list);
                    }
                    ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
                    ChatCompletionRequest.Message msg = choice.getMessage();
                    if (msg.getRole() == null || msg.getRole().isEmpty()) {
                        msg.setRole("assistant");
                    }
                    JSONObject delta = c0.getJSONObject("delta");
                    if (delta != null) {
                        String c = delta.getString("content");
                        if (c != null) {
                            String prev = msg.getContent();
                            msg.setContent(prev == null ? c : prev + c);
                        }
                        String role = delta.getString("role");
                        if (role != null && !role.isEmpty()) msg.setRole(role);
                    }
                    JSONObject message = c0.getJSONObject("message");
                    if (message != null) {
                        String c = message.getString("content");
                        if (c != null) {
                            String prev = msg.getContent();
                            msg.setContent(prev == null ? c : prev + c);
                        }
                        String role = message.getString("role");
                        if (role != null && !role.isEmpty()) msg.setRole(role);
                    }
                    String fr = c0.getString("finish_reason");
                    if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
                }
                String id = obj.getString("id");
                if (id != null && !id.isEmpty()) acc.setId(id);
                Long created = obj.getLong("created");
                if (created != null) acc.setCreated(created);
                String object = obj.getString("object");
                if (object != null && !object.isEmpty()) acc.setObjectName(object);
            } catch (Exception ignore) {
            }
        }
        return acc;
    }
    private ChatCompletionResponse parseCompletion(String payload) {
        if (payload == null) return null;
        try {
            ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
            if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
                return r;
            }
        } catch (Exception ignore) {
        }
        ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
        if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
            return sse;
        }
        ChatCompletionResponse r = new ChatCompletionResponse();
        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
        msg.setRole("assistant");
        msg.setContent(payload);
        choice.setMessage(msg);
        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
        list.add(choice);
        r.setChoices(list);
        return r;
    }
    private String nextTraceId() {
        return UUID.randomUUID().toString().replace("-", "");
    }
@@ -588,17 +439,23 @@
    }
    /**
     * Extracts a status code from a failure.
     *
     * <p>The visible branches read the code from the exception types checked below and
     * otherwise ask {@code llmSpringAiClientService.statusCodeOf(ex)}.
     *
     * <p>NOTE(review): this span appears to interleave removed and added lines of a diff
     * rendering (a {@code return null;} followed by further branches, unbalanced braces) —
     * confirm which branches exist in the actual file.
     *
     * @param ex the failure to inspect
     * @return the status code; may be {@code null} (callers in this file null-check it)
     */
    private Integer statusCodeOf(Throwable ex) {
        if (ex instanceof LlmRouteException) {
            return ((LlmRouteException) ex).statusCode;
        if (ex instanceof RestClientResponseException) {
            return ((RestClientResponseException) ex).getStatusCode().value();
        }
        return null;
        if (ex instanceof WebClientResponseException) {
            return ((WebClientResponseException) ex).getStatusCode().value();
        }
        return llmSpringAiClientService.statusCodeOf(ex);
    }
    /**
     * Extracts the response body carried by a failure, passed through {@code cut} with
     * {@code LOG_TEXT_LIMIT}.
     *
     * <p>The visible branches read the body from the exception types checked below and
     * otherwise ask {@code llmSpringAiClientService.responseBodyOf(ex, LOG_TEXT_LIMIT)}.
     *
     * <p>NOTE(review): this span appears to interleave removed and added lines of a diff
     * rendering (a {@code return null;} followed by further branches, unbalanced braces) —
     * confirm which branches exist in the actual file.
     *
     * @param ex the failure to inspect
     * @return the body text after {@code cut}; presumably {@code null} when none is
     *         available — verify against {@code cut} and the client service
     */
    private String responseBodyOf(Throwable ex) {
        if (ex instanceof LlmRouteException) {
            return cut(((LlmRouteException) ex).body, LOG_TEXT_LIMIT);
        if (ex instanceof RestClientResponseException) {
            return cut(((RestClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
        }
        return null;
        if (ex instanceof WebClientResponseException) {
            return cut(((WebClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
        }
        return cut(llmSpringAiClientService.responseBodyOf(ex, LOG_TEXT_LIMIT), LOG_TEXT_LIMIT);
    }
    private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) {
@@ -671,27 +528,6 @@
            this.statusCode = statusCode;
            this.payload = payload;
            this.response = response;
        }
    }
    private static class RawCompletionResult {
        private final int statusCode;
        private final String payload;
        private RawCompletionResult(int statusCode, String payload) {
            this.statusCode = statusCode;
            this.payload = payload;
        }
    }
    private static class LlmRouteException extends RuntimeException {
        private final int statusCode;
        private final String body;
        private LlmRouteException(int statusCode, String body) {
            super("http status=" + statusCode);
            this.statusCode = statusCode;
            this.body = body;
        }
    }