From ba0dae92a0eb14b378fd44c91ed2ccb481c164e3 Mon Sep 17 00:00:00 2001
From: Junjie <DELL@qq.com>
Date: 星期一, 15 十二月 2025 09:53:06 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/ai/service/LlmChatService.java | 91 +++++++++++++++++++++++++++++++++------------
1 files changed, 67 insertions(+), 24 deletions(-)
diff --git a/src/main/java/com/zy/ai/service/LlmChatService.java b/src/main/java/com/zy/ai/service/LlmChatService.java
index 5e709af..5077247 100644
--- a/src/main/java/com/zy/ai/service/LlmChatService.java
+++ b/src/main/java/com/zy/ai/service/LlmChatService.java
@@ -14,6 +14,9 @@
import java.util.List;
import java.util.function.Consumer;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
@@ -48,7 +51,7 @@
.uri("/chat/completions")
.header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
.contentType(MediaType.APPLICATION_JSON)
- .bodyValue(req) // 2.5.14 宸叉敮鎸� bodyValue
+ .bodyValue(req)
.retrieve()
.bodyToMono(ChatCompletionResponse.class)
.doOnError(ex -> log.error("璋冪敤 LLM 澶辫触", ex))
@@ -80,6 +83,7 @@
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
req.setStream(true);
+
Flux<String> flux = llmWebClient.post()
.uri("/chat/completions")
.header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
@@ -90,35 +94,74 @@
.bodyToFlux(String.class)
.doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触", ex));
- flux.subscribe(payload -> {
- String s = payload;
- if (s == null || s.isEmpty()) return;
- if (s.startsWith("data:")) {
- s = s.substring(5);
- if (s.startsWith(" ")) s = s.substring(1);
- }
- // 淇濈暀妯″瀷杈撳嚭涓殑鎹㈣锛屽彧鍦ㄥ垽鏂粨鏉熸爣璁版椂蹇界暐绌虹櫧
- if ("[DONE]".equals(s.trim())) return;
+ AtomicBoolean doneSeen = new AtomicBoolean(false);
+ AtomicBoolean errorSeen = new AtomicBoolean(false);
+ LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
+
+ Thread drain = new Thread(() -> {
try {
- JSONObject obj = JSON.parseObject(s);
- JSONArray choices = obj.getJSONArray("choices");
- if (choices != null && !choices.isEmpty()) {
- JSONObject c0 = choices.getJSONObject(0);
- JSONObject delta = c0.getJSONObject("delta");
- if (delta != null) {
- String content = delta.getString("content");
-// log.info("chunk = [{}] len = {}", content, content.length());
-// for (char ch : content.toCharArray()) {
-// log.info("char: {} ({})", (int) ch, ch == '\n' ? "\\n" : ch);
-// }
- if (content != null) onChunk.accept(content);
+ while (true) {
+ String s = queue.poll(2, TimeUnit.SECONDS);
+ if (s != null) {
+ try { onChunk.accept(s); } catch (Exception ignore) {}
+ }
+ if (doneSeen.get() && queue.isEmpty()) {
+ if (!errorSeen.get()) {
+ try { if (onComplete != null) onComplete.run(); } catch (Exception ignore) {}
+ }
+ break;
}
}
- } catch (Exception ignore) {}
+ } catch (InterruptedException ignore) {
+ ignore.printStackTrace();
+ }
+ });
+ drain.setDaemon(true);
+ drain.start();
+
+ flux.subscribe(payload -> {
+ if (payload == null || payload.isEmpty()) return;
+ String[] events = payload.split("\\r?\\n\\r?\\n");
+ for (String part : events) {
+ String s = part;
+ if (s == null || s.isEmpty()) continue;
+ if (s.startsWith("data:")) {
+ s = s.substring(5);
+ if (s.startsWith(" ")) s = s.substring(1);
+ }
+ if ("[DONE]".equals(s.trim())) {
+ doneSeen.set(true);
+ continue;
+ }
+ try {
+ JSONObject obj = JSON.parseObject(s);
+ JSONArray choices = obj.getJSONArray("choices");
+ if (choices != null && !choices.isEmpty()) {
+ JSONObject c0 = choices.getJSONObject(0);
+ JSONObject delta = c0.getJSONObject("delta");
+ if (delta != null) {
+ String content = delta.getString("content");
+ if (content != null) {
+ try { queue.offer(content); } catch (Exception ignore) {}
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
}, err -> {
+ errorSeen.set(true);
+ doneSeen.set(true);
if (onError != null) onError.accept(err);
}, () -> {
- if (onComplete != null) onComplete.run();
+ if (!doneSeen.get()) {
+ errorSeen.set(true);
+ doneSeen.set(true);
+ if (onError != null) onError.accept(new RuntimeException("LLM 娴佹剰澶栧畬鎴�"));
+ } else {
+ doneSeen.set(true);
+ }
});
}
}
--
Gitblit v1.9.1