From 4227b39d7a4fc6bc3ba012a09b59f653ab92ab23 Mon Sep 17 00:00:00 2001
From: Junjie <DELL@qq.com>
Date: 星期一, 15 十二月 2025 16:40:54 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/ai/service/LlmChatService.java |  180 +++++++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 159 insertions(+), 21 deletions(-)

diff --git a/src/main/java/com/zy/ai/service/LlmChatService.java b/src/main/java/com/zy/ai/service/LlmChatService.java
index 0736951..d599775 100644
--- a/src/main/java/com/zy/ai/service/LlmChatService.java
+++ b/src/main/java/com/zy/ai/service/LlmChatService.java
@@ -14,6 +14,9 @@
 
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
@@ -43,14 +46,20 @@
         req.setMessages(messages);
         req.setTemperature(temperature != null ? temperature : 0.3);
         req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
+        req.setStream(false);
 
         ChatCompletionResponse response = llmWebClient.post()
                 .uri("/chat/completions")
                 .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
                 .contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
                 .bodyValue(req)
-                .retrieve()
-                .bodyToMono(ChatCompletionResponse.class)
+                .exchangeToMono(resp -> resp.bodyToFlux(String.class)
+                        .collectList()
+                        .map(list -> {
+                            String payload = String.join("\n\n", list);
+                            return parseCompletion(payload);
+                        }))
                 .doOnError(ex -> log.error("璋冪敤 LLM 澶辫触", ex))
                 .onErrorResume(ex -> Mono.empty())
                 .block();
@@ -58,9 +67,10 @@
         if (response == null ||
                 response.getChoices() == null ||
                 response.getChoices().isEmpty() ||
-                response.getChoices().get(0).getMessage() == null) {
-
-            return "AI 璇婃柇澶辫触锛氭湭鑾峰彇鍒版湁鏁堝洖澶嶃��";
+                response.getChoices().get(0).getMessage() == null ||
+                response.getChoices().get(0).getMessage().getContent() == null ||
+                response.getChoices().get(0).getMessage().getContent().isEmpty()) {
+            return null;
         }
 
         return response.getChoices().get(0).getMessage().getContent();
@@ -80,7 +90,6 @@
         req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
         req.setStream(true);
 
-        System.out.println(JSON.toJSONString(req));
 
         Flux<String> flux = llmWebClient.post()
                 .uri("/chat/completions")
@@ -92,35 +101,164 @@
                 .bodyToFlux(String.class)
                 .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触", ex));
 
+        AtomicBoolean doneSeen = new AtomicBoolean(false);
+        AtomicBoolean errorSeen = new AtomicBoolean(false);
+        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
+
+        Thread drain = new Thread(() -> {
+            try {
+                while (true) {
+                    String s = queue.poll(2, TimeUnit.SECONDS);
+                    if (s != null) {
+                        try { onChunk.accept(s); } catch (Exception ignore) {}
+                    }
+                    if (doneSeen.get() && queue.isEmpty()) {
+                        if (!errorSeen.get()) {
+                            try { if (onComplete != null) onComplete.run(); } catch (Exception ignore) {}
+                        }
+                        break;
+                    }
+                }
+            } catch (InterruptedException ignore) {
+                ignore.printStackTrace();
+            }
+        });
+        drain.setDaemon(true);
+        drain.start();
+
         flux.subscribe(payload -> {
-            String s = payload;
-            if (s == null || s.isEmpty()) return;
+            if (payload == null || payload.isEmpty()) return;
+            String[] events = payload.split("\\r?\\n\\r?\\n");
+            for (String part : events) {
+                String s = part;
+                if (s == null || s.isEmpty()) continue;
+                if (s.startsWith("data:")) {
+                    s = s.substring(5);
+                    if (s.startsWith(" ")) s = s.substring(1);
+                }
+                if ("[DONE]".equals(s.trim())) {
+                    doneSeen.set(true);
+                    continue;
+                }
+                try {
+                    JSONObject obj = JSON.parseObject(s);
+                    JSONArray choices = obj.getJSONArray("choices");
+                    if (choices != null && !choices.isEmpty()) {
+                        JSONObject c0 = choices.getJSONObject(0);
+                        JSONObject delta = c0.getJSONObject("delta");
+                        if (delta != null) {
+                            String content = delta.getString("content");
+                            if (content != null) {
+                                try { queue.offer(content); } catch (Exception ignore) {}
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, err -> {
+            errorSeen.set(true);
+            doneSeen.set(true);
+            if (onError != null) onError.accept(err);
+        }, () -> {
+            if (!doneSeen.get()) {
+                errorSeen.set(true);
+                doneSeen.set(true);
+                if (onError != null) onError.accept(new RuntimeException("LLM 娴佹剰澶栧畬鎴�"));
+            } else {
+                doneSeen.set(true);
+            }
+        });
+    }
+
+    private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
+        if (payload == null || payload.isEmpty()) return acc;
+        String[] events = payload.split("\\r?\\n\\r?\\n");
+        for (String part : events) {
+            String s = part;
+            if (s == null || s.isEmpty()) continue;
             if (s.startsWith("data:")) {
                 s = s.substring(5);
                 if (s.startsWith(" ")) s = s.substring(1);
             }
-            // 淇濈暀妯″瀷杈撳嚭涓殑鎹㈣锛屽彧鍦ㄥ垽鏂粨鏉熸爣璁版椂蹇界暐绌虹櫧
-            if ("[DONE]".equals(s.trim())) return;
+            if ("[DONE]".equals(s.trim())) {
+                continue;
+            }
             try {
                 JSONObject obj = JSON.parseObject(s);
+                if (obj == null) continue;
                 JSONArray choices = obj.getJSONArray("choices");
                 if (choices != null && !choices.isEmpty()) {
                     JSONObject c0 = choices.getJSONObject(0);
+                    if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
+                        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
+                        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
+                        choice.setMessage(msg);
+                        java.util.ArrayList<ChatCompletionResponse.Choice> list = new java.util.ArrayList<>();
+                        list.add(choice);
+                        acc.setChoices(list);
+                    }
+                    ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
+                    ChatCompletionRequest.Message msg = choice.getMessage();
+                    if (msg.getRole() == null || msg.getRole().isEmpty()) {
+                        msg.setRole("assistant");
+                    }
                     JSONObject delta = c0.getJSONObject("delta");
                     if (delta != null) {
-                        String content = delta.getString("content");
-//                        log.info("chunk = [{}] len = {}", content, content.length());
-//                        for (char ch : content.toCharArray()) {
-//                            log.info("char: {} ({})", (int) ch, ch == '\n' ? "\\n" : ch);
-//                        }
-                        if (content != null) onChunk.accept(content);
+                        String c = delta.getString("content");
+                        if (c != null) {
+                            String prev = msg.getContent();
+                            msg.setContent(prev == null ? c : prev + c);
+                        }
+                        String role = delta.getString("role");
+                        if (role != null && !role.isEmpty()) msg.setRole(role);
                     }
+                    JSONObject message = c0.getJSONObject("message");
+                    if (message != null) {
+                        String c = message.getString("content");
+                        if (c != null) {
+                            String prev = msg.getContent();
+                            msg.setContent(prev == null ? c : prev + c);
+                        }
+                        String role = message.getString("role");
+                        if (role != null && !role.isEmpty()) msg.setRole(role);
+                    }
+                    String fr = c0.getString("finish_reason");
+                    if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
                 }
+                String id = obj.getString("id");
+                if (id != null && !id.isEmpty()) acc.setId(id);
+                Long created = obj.getLong("created");
+                if (created != null) acc.setCreated(created);
+                String object = obj.getString("object");
+                if (object != null && !object.isEmpty()) acc.setObjectName(object);
             } catch (Exception ignore) {}
-        }, err -> {
-            if (onError != null) onError.accept(err);
-        }, () -> {
-            if (onComplete != null) onComplete.run();
-        });
+        }
+        return acc;
+    }
+
+    private ChatCompletionResponse parseCompletion(String payload) {
+        if (payload == null) return null;
+        try {
+            ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
+            if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
+                return r;
+            }
+        } catch (Exception ignore) {}
+        ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
+        if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
+            return sse;
+        }
+        ChatCompletionResponse r = new ChatCompletionResponse();
+        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
+        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
+        msg.setRole("assistant");
+        msg.setContent(payload);
+        choice.setMessage(msg);
+        java.util.ArrayList<ChatCompletionResponse.Choice> list = new java.util.ArrayList<>();
+        list.add(choice);
+        r.setChoices(list);
+        return r;
     }
 }

--
Gitblit v1.9.1