From 8636ff97bffec9f2130628bf09c9d0fbb371e2bc Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 10 三月 2026 16:53:24 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/ai/service/LlmChatService.java |  280 +++++++++++--------------------------------------------
 1 files changed, 58 insertions(+), 222 deletions(-)

diff --git a/src/main/java/com/zy/ai/service/LlmChatService.java b/src/main/java/com/zy/ai/service/LlmChatService.java
index 3e25561..e2eddd6 100644
--- a/src/main/java/com/zy/ai/service/LlmChatService.java
+++ b/src/main/java/com/zy/ai/service/LlmChatService.java
@@ -1,8 +1,6 @@
 package com.zy.ai.service;
 
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
 import com.zy.ai.entity.ChatCompletionRequest;
 import com.zy.ai.entity.ChatCompletionResponse;
 import com.zy.ai.entity.LlmCallLog;
@@ -10,15 +8,16 @@
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
 import org.springframework.stereotype.Service;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.client.RestClientResponseException;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +33,7 @@
 
     private final LlmRoutingService llmRoutingService;
     private final LlmCallLogService llmCallLogService;
+    private final LlmSpringAiClientService llmSpringAiClientService;
 
     @Value("${llm.base-url:}")
     private String fallbackBaseUrl;
@@ -257,38 +257,11 @@
         drain.setDaemon(true);
         drain.start();
 
-        streamFlux(route, routeReq).subscribe(payload -> {
+        Flux<String> streamSource = streamFluxWithSpringAi(route, routeReq);
+        streamSource.subscribe(payload -> {
             if (payload == null || payload.isEmpty()) return;
-            String[] events = payload.split("\\r?\\n\\r?\\n");
-            for (String part : events) {
-                String s = part;
-                if (s == null || s.isEmpty()) continue;
-                if (s.startsWith("data:")) {
-                    s = s.substring(5);
-                    if (s.startsWith(" ")) s = s.substring(1);
-                }
-                if ("[DONE]".equals(s.trim())) {
-                    doneSeen.set(true);
-                    continue;
-                }
-                try {
-                    JSONObject obj = JSON.parseObject(s);
-                    JSONArray choices = obj.getJSONArray("choices");
-                    if (choices != null && !choices.isEmpty()) {
-                        JSONObject c0 = choices.getJSONObject(0);
-                        JSONObject delta = c0.getJSONObject("delta");
-                        if (delta != null) {
-                            String content = delta.getString("content");
-                            if (content != null) {
-                                queue.offer(content);
-                                appendLimited(outputBuffer, content);
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    log.warn("瑙f瀽 LLM stream 鐗囨澶辫触: {}", e.getMessage());
-                }
-            }
+            queue.offer(payload);
+            appendLimited(outputBuffer, payload);
         }, err -> {
             errorSeen.set(true);
             doneSeen.set(true);
@@ -305,71 +278,27 @@
             }
             if (onError != null) onError.accept(err);
         }, () -> {
-            if (!doneSeen.get()) {
-                RuntimeException ex = new RuntimeException("LLM 娴佹剰澶栧畬鎴�");
-                errorSeen.set(true);
-                doneSeen.set(true);
-                boolean canSwitch = shouldSwitch(route, false);
-                markFailure(route, ex, canSwitch);
-                recordCall(traceId, scene, true, index + 1, route, false, 200,
-                        System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
-                        "error", ex, "unexpected_stream_end");
-                if (!emitted.get() && canSwitch && index < routes.size() - 1) {
-                    log.warn("LLM 璺敱娴佸紓甯稿畬鎴愶紝鑷姩鍒囨崲锛宑urrent={}", route.tag());
-                    attemptStream(routes, index + 1, req, onChunk, onComplete, onError, traceId, scene);
-                } else {
-                    if (onError != null) onError.accept(ex);
-                }
-            } else {
-                markSuccess(route);
-                recordCall(traceId, scene, true, index + 1, route, true, 200,
-                        System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
-                        "none", null, null);
-                doneSeen.set(true);
-            }
+            markSuccess(route);
+            recordCall(traceId, scene, true, index + 1, route, true, 200,
+                    System.currentTimeMillis() - start, routeReq, outputBuffer.toString(),
+                    "none", null, null);
+            doneSeen.set(true);
         });
     }
 
-    private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) {
-        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
-        return client.post()
-                .uri("/chat/completions")
-                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
-                .contentType(MediaType.APPLICATION_JSON)
-                .accept(MediaType.TEXT_EVENT_STREAM)
-                .bodyValue(req)
-                .exchangeToFlux(resp -> {
-                    int status = resp.rawStatusCode();
-                    if (status >= 200 && status < 300) {
-                        return resp.bodyToFlux(String.class);
-                    }
-                    return resp.bodyToMono(String.class)
-                            .defaultIfEmpty("")
-                            .flatMapMany(body -> Flux.error(new LlmRouteException(status, body)));
-                })
-                .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触, route={}", route.tag(), ex));
+    private Flux<String> streamFluxWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
+        return llmSpringAiClientService.streamCompletion(route.baseUrl, route.apiKey, req)
+                .doOnError(ex -> log.error("璋冪敤 Spring AI 娴佸紡澶辫触, route={}", route.tag(), ex));
     }
 
     private CompletionCallResult callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
-        WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
-        RawCompletionResult raw = client.post()
-                .uri("/chat/completions")
-                .header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
-                .contentType(MediaType.APPLICATION_JSON)
-                .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
-                .bodyValue(req)
-                .exchangeToMono(resp -> resp.bodyToFlux(String.class)
-                        .collectList()
-                        .map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\\n\\n", list))))
-                .block();
+        return callCompletionWithSpringAi(route, req);
+    }
 
-        if (raw == null) {
-            throw new RuntimeException("LLM 杩斿洖涓虹┖");
-        }
-        if (raw.statusCode < 200 || raw.statusCode >= 300) {
-            throw new LlmRouteException(raw.statusCode, raw.payload);
-        }
-        return new CompletionCallResult(raw.statusCode, raw.payload, parseCompletion(raw.payload));
+    private CompletionCallResult callCompletionWithSpringAi(ResolvedRoute route, ChatCompletionRequest req) {
+        LlmSpringAiClientService.CompletionCallResult result =
+                llmSpringAiClientService.callCompletion(route.baseUrl, route.apiKey, req);
+        return new CompletionCallResult(result.getStatusCode(), result.getPayload(), result.getResponse());
     }
 
     private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
@@ -430,22 +359,36 @@
 
     private String errorText(Throwable ex) {
         if (ex == null) return "unknown";
-        if (ex instanceof LlmRouteException) {
-            LlmRouteException e = (LlmRouteException) ex;
-            String body = e.body == null ? "" : e.body;
-            if (body.length() > 240) {
+        if (ex instanceof RestClientResponseException) {
+            RestClientResponseException e = (RestClientResponseException) ex;
+            String body = e.getResponseBodyAsString();
+            if (body != null && body.length() > 240) {
                 body = body.substring(0, 240);
             }
-            return "status=" + e.statusCode + ", body=" + body;
+            return "status=" + e.getStatusCode().value() + ", body=" + body;
+        }
+        if (ex instanceof WebClientResponseException) {
+            WebClientResponseException e = (WebClientResponseException) ex;
+            String body = e.getResponseBodyAsString();
+            if (body != null && body.length() > 240) {
+                body = body.substring(0, 240);
+            }
+            return "status=" + e.getStatusCode().value() + ", body=" + body;
+        }
+        Integer springAiStatus = llmSpringAiClientService.statusCodeOf(ex);
+        if (springAiStatus != null) {
+            return "status=" + springAiStatus + ", body=" + llmSpringAiClientService.responseBodyOf(ex, 240);
         }
         return ex.getMessage() == null ? ex.toString() : ex.getMessage();
     }
 
     private boolean isQuotaExhausted(Throwable ex) {
-        if (!(ex instanceof LlmRouteException)) return false;
-        LlmRouteException e = (LlmRouteException) ex;
-        if (e.statusCode == 429) return true;
-        String text = (e.body == null ? "" : e.body).toLowerCase();
+        Integer status = statusCodeOf(ex);
+        if (status != null && status == 429) {
+            return true;
+        }
+        String text = responseBodyOf(ex);
+        text = text == null ? "" : text.toLowerCase(Locale.ROOT);
         return text.contains("insufficient_quota")
                 || text.contains("quota")
                 || text.contains("浣欓")
@@ -476,98 +419,6 @@
         return s == null || s.trim().isEmpty();
     }
 
-    private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
-        if (payload == null || payload.isEmpty()) return acc;
-        String[] events = payload.split("\\r?\\n\\r?\\n");
-        for (String part : events) {
-            String s = part;
-            if (s == null || s.isEmpty()) continue;
-            if (s.startsWith("data:")) {
-                s = s.substring(5);
-                if (s.startsWith(" ")) s = s.substring(1);
-            }
-            if ("[DONE]".equals(s.trim())) {
-                continue;
-            }
-            try {
-                JSONObject obj = JSON.parseObject(s);
-                if (obj == null) continue;
-                JSONArray choices = obj.getJSONArray("choices");
-                if (choices != null && !choices.isEmpty()) {
-                    JSONObject c0 = choices.getJSONObject(0);
-                    if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
-                        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
-                        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
-                        choice.setMessage(msg);
-                        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
-                        list.add(choice);
-                        acc.setChoices(list);
-                    }
-                    ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
-                    ChatCompletionRequest.Message msg = choice.getMessage();
-                    if (msg.getRole() == null || msg.getRole().isEmpty()) {
-                        msg.setRole("assistant");
-                    }
-                    JSONObject delta = c0.getJSONObject("delta");
-                    if (delta != null) {
-                        String c = delta.getString("content");
-                        if (c != null) {
-                            String prev = msg.getContent();
-                            msg.setContent(prev == null ? c : prev + c);
-                        }
-                        String role = delta.getString("role");
-                        if (role != null && !role.isEmpty()) msg.setRole(role);
-                    }
-                    JSONObject message = c0.getJSONObject("message");
-                    if (message != null) {
-                        String c = message.getString("content");
-                        if (c != null) {
-                            String prev = msg.getContent();
-                            msg.setContent(prev == null ? c : prev + c);
-                        }
-                        String role = message.getString("role");
-                        if (role != null && !role.isEmpty()) msg.setRole(role);
-                    }
-                    String fr = c0.getString("finish_reason");
-                    if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
-                }
-                String id = obj.getString("id");
-                if (id != null && !id.isEmpty()) acc.setId(id);
-                Long created = obj.getLong("created");
-                if (created != null) acc.setCreated(created);
-                String object = obj.getString("object");
-                if (object != null && !object.isEmpty()) acc.setObjectName(object);
-            } catch (Exception ignore) {
-            }
-        }
-        return acc;
-    }
-
-    private ChatCompletionResponse parseCompletion(String payload) {
-        if (payload == null) return null;
-        try {
-            ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
-            if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
-                return r;
-            }
-        } catch (Exception ignore) {
-        }
-        ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
-        if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
-            return sse;
-        }
-        ChatCompletionResponse r = new ChatCompletionResponse();
-        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
-        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
-        msg.setRole("assistant");
-        msg.setContent(payload);
-        choice.setMessage(msg);
-        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
-        list.add(choice);
-        r.setChoices(list);
-        return r;
-    }
-
     private String nextTraceId() {
         return UUID.randomUUID().toString().replace("-", "");
     }
@@ -588,17 +439,23 @@
     }
 
     private Integer statusCodeOf(Throwable ex) {
-        if (ex instanceof LlmRouteException) {
-            return ((LlmRouteException) ex).statusCode;
+        if (ex instanceof RestClientResponseException) {
+            return ((RestClientResponseException) ex).getStatusCode().value();
         }
-        return null;
+        if (ex instanceof WebClientResponseException) {
+            return ((WebClientResponseException) ex).getStatusCode().value();
+        }
+        return llmSpringAiClientService.statusCodeOf(ex);
     }
 
     private String responseBodyOf(Throwable ex) {
-        if (ex instanceof LlmRouteException) {
-            return cut(((LlmRouteException) ex).body, LOG_TEXT_LIMIT);
+        if (ex instanceof RestClientResponseException) {
+            return cut(((RestClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
         }
-        return null;
+        if (ex instanceof WebClientResponseException) {
+            return cut(((WebClientResponseException) ex).getResponseBodyAsString(), LOG_TEXT_LIMIT);
+        }
+        return cut(llmSpringAiClientService.responseBodyOf(ex, LOG_TEXT_LIMIT), LOG_TEXT_LIMIT);
     }
 
     private String buildResponseText(ChatCompletionResponse resp, String fallbackPayload) {
@@ -671,27 +528,6 @@
             this.statusCode = statusCode;
             this.payload = payload;
             this.response = response;
-        }
-    }
-
-    private static class RawCompletionResult {
-        private final int statusCode;
-        private final String payload;
-
-        private RawCompletionResult(int statusCode, String payload) {
-            this.statusCode = statusCode;
-            this.payload = payload;
-        }
-    }
-
-    private static class LlmRouteException extends RuntimeException {
-        private final int statusCode;
-        private final String body;
-
-        private LlmRouteException(int statusCode, String body) {
-            super("http status=" + statusCode);
-            this.statusCode = statusCode;
-            this.body = body;
         }
     }
 

--
Gitblit v1.9.1