From 8636ff97bffec9f2130628bf09c9d0fbb371e2bc Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 10 三月 2026 16:53:24 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/ai/service/LlmChatService.java |  433 ++---------------------------------------------------
 1 files changed, 20 insertions(+), 413 deletions(-)

diff --git a/src/main/java/com/zy/ai/service/LlmChatService.java b/src/main/java/com/zy/ai/service/LlmChatService.java
index c92ede3..e2eddd6 100644
--- a/src/main/java/com/zy/ai/service/LlmChatService.java
+++ b/src/main/java/com/zy/ai/service/LlmChatService.java
@@ -1,22 +1,15 @@
 package com.zy.ai.service;
 
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
 import com.zy.ai.entity.ChatCompletionRequest;
 import com.zy.ai.entity.ChatCompletionResponse;
 import com.zy.ai.entity.LlmCallLog;
 import com.zy.ai.entity.LlmRouteConfig;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.ai.openai.api.OpenAiApi;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestClientResponseException;
-import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 
@@ -25,7 +18,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +33,7 @@
 
     private final LlmRoutingService llmRoutingService;
     private final LlmCallLogService llmCallLogService;
+    private final LlmSpringAiClientService llmSpringAiClientService;
 
     @Value("${llm.base-url:}")
     private String fallbackBaseUrl;
@@ -264,45 +257,11 @@
         drain.setDaemon(true);
         drain.start();
 
-        boolean springAiStreaming = canUseSpringAi(routeReq);
-        Flux<String> streamSource = springAiStreaming ? streamFluxWithSpringAi(route, routeReq) : streamFlux(route, routeReq);
+        Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq);
         streamSource.subscribe(payload -> {
             if (payload == null || payload.isEmpty()) return;
-            if (springAiStreaming) {
-                queue.offer(payload);
-                appendLimited(outputBuffer, payload);
-                return;
-            }
-            String[] events = payload.split("\\r?\\n\\r?\\n");
-            for (String part : events) {
-                String s = part;
-                if (s == null || s.isEmpty()) continue;
-                if (s.startsWith("data:")) {
-                    s = s.substring(5);
-                    if (s.startsWith(" ")) s = s.substring(1);
-                }
-                if ("[DONE]".equals(s.trim())) {
-                    doneSeen.set(true);
-                    continue;
-                }
-                try {
-                    JSONObject obj = JSON.parseObject(s);
-                    JSONArray choices = obj.getJSONArray("choices");
-                    if (choices != null && !choices.isEmpty()) {
-                        JSONObject c0 = choices.getJSONObject(0);
-                        JSONObject delta = c0.getJSONObject("delta");
-                        if (delta != null) {
-                            String content = delta.getString("content");
-                            if (content != null) {
-                                queue.offer(content);
-                                appendLimited(outputBuffer, content);
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    log.warn("瑙f瀽 LLM stream 鐗囨澶辫触: {}", e.getMessage());
-                }
-            }
+            queue.offer(payload);
+            appendLimited(outputBuffer, payload);
         }, err -> {
             errorSeen.set(true);
             doneSeen.set(true);
@@ -319,100 +278,27 @@
             }
             if (onError != null) onError.accept(err);
         }, () -> {
-            if (!doneSeen.get()) {
-                RuntimeException ex = new RuntimeException("LLM 娴佹剰澶栧畬鎴�");
-                errorSeen.set(true);
-                doneSeen.set(true);
-                boolean canSwitch = shouldSwitch(route, false);
-                markFailure(route, ex, canSwitch);
-                recordCall(traceId, scene, true, index + 1, route, false, 200,
-                        System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
-                        "error", ex, "unexpected_stream_end");
-                if (!emitted.get() && canSwitch && index < routes.size() - 1) {
-                    log.warn("LLM 璺敱娴佸紓甯稿畬鎴愶紝鑷姩鍒囨崲锛宑urrent={}", route.tag());
-                    attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
-                } else {
-                    if (onError != null) onError.accept(ex);
-                }
-            } else {
-                markSuccess(route);
-                recordCall(traceId, scene, true, index + 1, route, true, 200,
-                        System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
-                        "none", null, null);
-                doneSeen.set(true);
-            }
+            markSuccess(route);
+            recordCall(traceId, scene, true, index + 1, route, true, 200,
+                    System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
+                    "none", null, null);
+            doneSeen.set(true);
         });
     }
 
-    private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) {
-        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
-        return client.post()
-                .uri("/chat/completions")
-                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
-                .contentType(MediaType.APPLICATION_JSON)
-                .accept(MediaType.TEXT_EVENT_STREAM)
-                .bodyValue(req)
-                .exchangeToFlux(resp -> {
-                    int status = resp.rawStatusCode();
-                    if (status >= 200 && status < 300) {
-                        return resp.bodyToFlux(String.class);
-                    }
-                    return resp.bodyToMono(String.class)
-                            .defaultIfEmpty("")
-                            .flatMapMany(body -> Flux.error(new LlmRouteException(status, body)));
-                })
-                .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触, route={}", route.tag(), ex));
-    }
-
     private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
-        OpenAiApi api = buildOpenAiApi(route);
-        OpenAiApi.ChatCompletionRequest springReq = buildSpringAiRequest(route, req, true);
-        return api.chatCompletionStream(springReq)
-                .flatMapIterable(chunk -> chunk == null || chunk.choices() == null ? List.<OpenAiApi.ChatCompletionChunk.ChunkChoice>of() : chunk.choices())
-                .map(OpenAiApi.ChatCompletionChunk.ChunkChoice::delta)
-                .filter(Objects::nonNull)
-                .map(this::extractSpringAiContent)
-                .filter(text -> text != null && !text.isEmpty())
+        return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req)
                 .doOnError(ex -> log.error("璋冪敤 Spring AI 娴佸紡澶辫触, route={}", route.tag(), ex));
     }
 
     private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
-        if (canUseSpringAi(req)) {
-            return callCompletionWithSpringAi(route, req);
-        }
-        return callCompletionWithWebClient(route, req);
-    }
-
-    private CompletionCallResult callCompletionWithWebClient(ResolvedRoute route, ChatCompletionRequest req) {
-        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
-        RawCompletionResult raw = client.post()
-                .uri("/chat/completions")
-                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
-                .contentType(MediaType.APPLICATION_JSON)
-                .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
-                .bodyValue(req)
-                .exchangeToMono(resp -> resp.bodyToFlux(String.class)
-                        .collectList()
-                        .map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\\n\\n", list))))
-                .block();
-
-        if (raw == null) {
-            throw new RuntimeException("LLM 杩斿洖涓虹┖");
-        }
-        if (raw.statusCode < 200 || raw.statusCode >= 300) {
-            throw new LlmRouteException(raw.statusCode, raw.payload);
-        }
-        return new CompletionCallResult(raw.statusCode, raw.payload, parseCompletion(raw.payload));
+        return callCompletionWithSpringAi(route, req);
     }
 
     private CompletionCallResult callCompletionWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
-        OpenAiApi api = buildOpenAiApi(route);
-        OpenAiApi.ChatCompletionRequest springReq = buildSpringAiRequest(route, req, false);
-        ResponseEntity<OpenAiApi.ChatCompletion> entity = api.chatCompletionEntity(springReq);
-        OpenAiApi.ChatCompletion body = entity.getBody();
-        return new CompletionCallResult(entity.getStatusCode().value(),
-                body == null ? null : JSON.toJSONString(body),
-                toLegacyResponse(body));
+        LlmSpringAiClientService.CompletionCallResult result =
+                llmSpringAiClientService.callCompletion(route.baseUrl, route.apiKey, req);
+        return new CompletionCallResult(result.getStatusCode(), result.getPayload(), result.getResponse());
     }
 
     private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
@@ -459,10 +345,6 @@
         return quota ? route.switchOnQuota : route.switchOnError;
     }
 
-    private boolean canUseSpringAi(ChatCompletionRequest req) {
-        return req != null && (req.getTools() == null || req.getTools().isEmpty());
-    }
-
     private void markSuccess(ResolvedRoute route) {
         if (route.id != null) {
             llmRoutingService.markSuccess(route.id);
@@ -477,14 +359,6 @@
 
     private String errorText(Throwable ex) {
         if (ex == null) return "unknown";
-        if (ex instanceof LlmRouteException) {
-            LlmRouteException e = (LlmRouteException) ex;
-            String body = e.body == null ? "" : e.body;
-            if (body.length() > 240) {
-                body = body.substring(0, 240);
-            }
-            return "status=" + e.statusCode + ", body=" + body;
-        }
         if (ex instanceof RestClientResponseException) {
             RestClientResponseException e = (RestClientResponseException) ex;
             String body = e.getResponseBodyAsString();
@@ -500,6 +374,10 @@
                 body = body.substring(0, 240);
             }
             return "status=" + e.getStatusCode().value() + ", body=" + body;
+        }
+        Integer springAiStatus = llmSpringAiClientService.statusCodeOf(ex);
+        if (springAiStatus != null) {
+            return "status=" + springAiStatus + ", body=" + llmSpringAiClientService.responseBodyOf(ex, 240);
         }
         return ex.getMessage() == null ? ex.toString() : ex.getMessage();
     }
@@ -541,98 +419,6 @@
         return s == null || s.trim().isEmpty();
     }
 
-    private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
-        if (payload == null || payload.isEmpty()) return acc;
-        String[] events = payload.split("\\r?\\n\\r?\\n");
-        for (String part : events) {
-            String s = part;
-            if (s == null || s.isEmpty()) continue;
-            if (s.startsWith("data:")) {
-                s = s.substring(5);
-                if (s.startsWith(" ")) s = s.substring(1);
-            }
-            if ("[DONE]".equals(s.trim())) {
-                continue;
-            }
-            try {
-                JSONObject obj = JSON.parseObject(s);
-                if (obj == null) continue;
-                JSONArray choices = obj.getJSONArray("choices");
-                if (choices != null && !choices.isEmpty()) {
-                    JSONObject c0 = choices.getJSONObject(0);
-                    if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
-                        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
-                        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
-                        choice.setMessage(msg);
-                        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
-                        list.add(choice);
-                        acc.setChoices(list);
-                    }
-                    ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
-                    ChatCompletionRequest.Message msg = choice.getMessage();
-                    if (msg.getRole() == null || msg.getRole().isEmpty()) {
-                        msg.setRole("assistant");
-                    }
-                    JSONObject delta = c0.getJSONObject("delta");
-                    if (delta != null) {
-                        String c = delta.getString("content");
-                        if (c != null) {
-                            String prev = msg.getContent();
-                            msg.setContent(prev == null ? c : prev + c);
-                        }
-                        String role = delta.getString("role");
-                        if (role != null && !role.isEmpty()) msg.setRole(role);
-                    }
-                    JSONObject message = c0.getJSONObject("message");
-                    if (message != null) {
-                        String c = message.getString("content");
-                        if (c != null) {
-                            String prev = msg.getContent();
-                            msg.setContent(prev == null ? c : prev + c);
-                        }
-                        String role = message.getString("role");
-                        if (role != null && !role.isEmpty()) msg.setRole(role);
-                    }
-                    String fr = c0.getString("finish_reason");
-                    if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
-                }
-                String id = obj.getString("id");
-                if (id != null && !id.isEmpty()) acc.setId(id);
-                Long created = obj.getLong("created");
-                if (created != null) acc.setCreated(created);
-                String object = obj.getString("object");
-                if (object != null && !object.isEmpty()) acc.setObjectName(object);
-            } catch (Exception ignore) {
-            }
-        }
-        return acc;
-    }
-
-    private ChatCompletionResponse parseCompletion(String payload) {
-        if (payload == null) return null;
-        try {
-            ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
-            if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
-                return r;
-            }
-        } catch (Exception ignore) {
-        }
-        ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
-        if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
-            return sse;
-        }
-        ChatCompletionResponse r = new ChatCompletionResponse();
-        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
-        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
-        msg.setRole("assistant");
-        msg.setContent(payload);
-        choice.setMessage(msg);
-        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
-        list.add(choice);
-        r.setChoices(list);
-        return r;
-    }
-
     private String nextTraceId() {
         return UUID.randomUUID().toString().replace("-", "");
     }
@@ -653,29 +439,23 @@
     }
 
     private Integer statusCodeOf(Throwable ex) {
-        if (ex instanceof LlmRouteException) {
-            return ((LlmRouteException) ex).statusCode;
-        }
         if (ex instanceof RestClientResponseException) {
             return ((RestClientResponseException) ex).getStatusCode().value();
         }
         if (ex instanceof WebClientResponseException) {
             return ((WebClientResponseException) ex).getStatusCode().value();
         }
-        return null;
+        return llmSpringAiClientService.statusCodeOf(ex);
     }
 
     private String responseBodyOf(Throwable ex) {
-        if (ex instanceof LlmRouteException) {
-            return cut(((LlmRouteException) ex).body, LOG_TEXT_LIMIT);
-        }
         if (ex instanceof RestClientResponseException) {
             return cut(((RestClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
         }
         if (ex instanceof WebClientResponseException) {
             return cut(((WebClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
         }
-        return null;
+        return cut(llmSpringAiClientService.responseBodyOf(ex, LOG_TEXT_LIMIT), LOG_TEXT_LIMIT);
     }
 
     private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) {
@@ -694,158 +474,6 @@
 
     private String safeName(Throwable ex) {
         return ex == null ? null : ex.getClass().getSimpleName();
-    }
-
-    private OpenAiApi buildOpenAiApi(ResolvedRoute route) {
-        return OpenAiApi.builder()
-                .baseUrl(route.baseUrl)
-                .apiKey(route.apiKey)
-                .build();
-    }
-
-    private OpenAiApi.ChatCompletionRequest buildSpringAiRequest(ResolvedRoute route,
-                                                                 ChatCompletionRequest req,
-                                                                 boolean stream) {
-        HashMap<String, Object> extraBody = new HashMap<>();
-        if (route.thinkingEnabled || req.getThinking() != null) {
-            HashMap<String, Object> thinking = new HashMap<>();
-            thinking.put("type", req.getThinking() != null && req.getThinking().getType() != null
-                    ? req.getThinking().getType()
-                    : "enable");
-            extraBody.put("thinking", thinking);
-        }
-        return new OpenAiApi.ChatCompletionRequest(
-                toSpringAiMessages(req.getMessages()),
-                route.model,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                req.getMax_tokens(),
-                null,
-                1,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                stream,
-                stream ? OpenAiApi.ChatCompletionRequest.StreamOptions.INCLUDE_USAGE : null,
-                req.getTemperature(),
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                extraBody.isEmpty() ? null : extraBody
-        );
-    }
-
-    private List<OpenAiApi.ChatCompletionMessage> toSpringAiMessages(List<ChatCompletionRequest.Message> messages) {
-        ArrayList<OpenAiApi.ChatCompletionMessage> result = new ArrayList<>();
-        if (messages == null) {
-            return result;
-        }
-        for (ChatCompletionRequest.Message message : messages) {
-            if (message == null) {
-                continue;
-            }
-            result.add(new OpenAiApi.ChatCompletionMessage(
-                    message.getContent(),
-                    toSpringAiRole(message.getRole())
-            ));
-        }
-        return result;
-    }
-
-    private OpenAiApi.ChatCompletionMessage.Role toSpringAiRole(String role) {
-        if (role == null) {
-            return OpenAiApi.ChatCompletionMessage.Role.USER;
-        }
-        switch (role.trim().toLowerCase(Locale.ROOT)) {
-            case "system":
-                return OpenAiApi.ChatCompletionMessage.Role.SYSTEM;
-            case "assistant":
-                return OpenAiApi.ChatCompletionMessage.Role.ASSISTANT;
-            case "tool":
-                return OpenAiApi.ChatCompletionMessage.Role.TOOL;
-            default:
-                return OpenAiApi.ChatCompletionMessage.Role.USER;
-        }
-    }
-
-    private ChatCompletionResponse toLegacyResponse(OpenAiApi.ChatCompletion completion) {
-        if (completion == null) {
-            return null;
-        }
-        ChatCompletionResponse response = new ChatCompletionResponse();
-        response.setId(completion.id());
-        response.setCreated(completion.created());
-        response.setObjectName(completion.object());
-        if (completion.usage() != null) {
-            ChatCompletionResponse.Usage usage = new ChatCompletionResponse.Usage();
-            usage.setPromptTokens(completion.usage().promptTokens());
-            usage.setCompletionTokens(completion.usage().completionTokens());
-            usage.setTotalTokens(completion.usage().totalTokens());
-            response.setUsage(usage);
-        }
-        if (completion.choices() != null) {
-            ArrayList<ChatCompletionResponse.Choice> choices = new ArrayList<>();
-            for (OpenAiApi.ChatCompletion.Choice choice : completion.choices()) {
-                ChatCompletionResponse.Choice item = new ChatCompletionResponse.Choice();
-                item.setIndex(choice.index());
-                if (choice.finishReason() != null) {
-                    item.setFinishReason(choice.finishReason().name().toLowerCase(Locale.ROOT));
-                }
-                item.setMessage(toLegacyMessage(choice.message()));
-                choices.add(item);
-            }
-            response.setChoices(choices);
-        }
-        return response;
-    }
-
-    private ChatCompletionRequest.Message toLegacyMessage(OpenAiApi.ChatCompletionMessage message) {
-        if (message == null) {
-            return null;
-        }
-        ChatCompletionRequest.Message result = new ChatCompletionRequest.Message();
-        result.setContent(extractSpringAiContent(message));
-        if (message.role() != null) {
-            result.setRole(message.role().name().toLowerCase(Locale.ROOT));
-        }
-        result.setName(message.name());
-        result.setTool_call_id(message.toolCallId());
-        return result;
-    }
-
-    private String extractSpringAiContent(OpenAiApi.ChatCompletionMessage message) {
-        if (message == null || message.rawContent() == null) {
-            return null;
-        }
-        Object content = message.rawContent();
-        if (content instanceof String) {
-            return (String) content;
-        }
-        if (content instanceof List) {
-            try {
-                @SuppressWarnings("unchecked")
-                List<OpenAiApi.ChatCompletionMessage.MediaContent> media =
-                        (List<OpenAiApi.ChatCompletionMessage.MediaContent>) content;
-                return OpenAiApi.getTextContent(media);
-            } catch (ClassCastException ignore) {
-            }
-        }
-        return String.valueOf(content);
     }
 
     private String cut(String text, int maxLen) {
@@ -900,27 +528,6 @@
             this.statusCode = statusCode;
             this.payload = payload;
             this.response = response;
-        }
-    }
-
-    private static class RawCompletionResult {
-        private final int statusCode;
-        private final String payload;
-
-        private RawCompletionResult(int statusCode, String payload) {
-            this.statusCode = statusCode;
-            this.payload = payload;
-        }
-    }
-
-    private static class LlmRouteException extends RuntimeException {
-        private final int statusCode;
-        private final String body;
-
-        private LlmRouteException(int statusCode, String body) {
-            super("http status=" + statusCode);
-            this.statusCode = statusCode;
-            this.body = body;
         }
     }
 

--
Gitblit v1.9.1