From 52cbfeb0c93770530965955ca861f3d0a2bedd66 Mon Sep 17 00:00:00 2001
From: dubin <bindu_bean@163.com>
Date: 星期一, 12 一月 2026 11:01:27 +0800
Subject: [PATCH] 添加重量信息
---
src/main/java/com/zy/ai/service/LlmChatService.java | 428 +++++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 411 insertions(+), 17 deletions(-)
diff --git a/src/main/java/com/zy/ai/service/LlmChatService.java b/src/main/java/com/zy/ai/service/LlmChatService.java
index e8c7a77..431896c 100644
--- a/src/main/java/com/zy/ai/service/LlmChatService.java
+++ b/src/main/java/com/zy/ai/service/LlmChatService.java
@@ -12,8 +12,12 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
+import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
@@ -31,6 +35,9 @@
@Value("${llm.model}")
private String model;
+ @Value("${llm.pythonPlatformUrl}")
+ private String pythonPlatformUrl;
+
/**
* 閫氱敤瀵硅瘽鏂规硶锛氫紶鍏� messages锛岃繑鍥炲ぇ妯″瀷鏂囨湰鍥炲
*/
@@ -43,14 +50,20 @@
req.setMessages(messages);
req.setTemperature(temperature != null ? temperature : 0.3);
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
+ req.setStream(false);
ChatCompletionResponse response = llmWebClient.post()
.uri("/chat/completions")
.header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
.contentType(MediaType.APPLICATION_JSON)
- .bodyValue(req) // 2.5.14 宸叉敮鎸� bodyValue
- .retrieve()
- .bodyToMono(ChatCompletionResponse.class)
+ .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
+ .bodyValue(req)
+ .exchangeToMono(resp -> resp.bodyToFlux(String.class)
+ .collectList()
+ .map(list -> {
+ String payload = String.join("\n\n", list);
+ return parseCompletion(payload);
+ }))
.doOnError(ex -> log.error("璋冪敤 LLM 澶辫触", ex))
.onErrorResume(ex -> Mono.empty())
.block();
@@ -58,12 +71,54 @@
if (response == null ||
response.getChoices() == null ||
response.getChoices().isEmpty() ||
- response.getChoices().get(0).getMessage() == null) {
-
- return "AI 璇婃柇澶辫触锛氭湭鑾峰彇鍒版湁鏁堝洖澶嶃��";
+ response.getChoices().get(0).getMessage() == null ||
+ response.getChoices().get(0).getMessage().getContent() == null ||
+ response.getChoices().get(0).getMessage().getContent().isEmpty()) {
+ return null;
}
return response.getChoices().get(0).getMessage().getContent();
+ }
+
+ public ChatCompletionResponse chatCompletion(List<ChatCompletionRequest.Message> messages,
+ Double temperature,
+ Integer maxTokens,
+ List<Object> tools) {
+
+ ChatCompletionRequest req = new ChatCompletionRequest();
+ req.setModel(model);
+ req.setMessages(messages);
+ req.setTemperature(temperature != null ? temperature : 0.3);
+ req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
+ req.setStream(false);
+ if (tools != null && !tools.isEmpty()) {
+ req.setTools(tools);
+ req.setTool_choice("auto");
+ }
+ return complete(req);
+ }
+
+ public ChatCompletionResponse complete(ChatCompletionRequest req) {
+ try {
+ return llmWebClient.post()
+ .uri("/chat/completions")
+ .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
+ .contentType(MediaType.APPLICATION_JSON)
+ .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
+ .bodyValue(req)
+ .exchangeToMono(resp -> resp.bodyToFlux(String.class)
+ .collectList()
+ .map(list -> {
+ String payload = String.join("\n\n", list);
+ return parseCompletion(payload);
+ }))
+ .doOnError(ex -> log.error("璋冪敤 LLM 澶辫触", ex))
+ .onErrorResume(ex -> Mono.empty())
+ .block();
+ } catch (Exception e) {
+ log.error("璋冪敤 LLM 澶辫触", e);
+ return null;
+ }
}
public void chatStream(List<ChatCompletionRequest.Message> messages,
@@ -80,6 +135,7 @@
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
req.setStream(true);
+
Flux<String> flux = llmWebClient.post()
.uri("/chat/completions")
.header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
@@ -90,27 +146,365 @@
.bodyToFlux(String.class)
.doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触", ex));
+ AtomicBoolean doneSeen = new AtomicBoolean(false);
+ AtomicBoolean errorSeen = new AtomicBoolean(false);
+ LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
+
+ Thread drain = new Thread(() -> {
+ try {
+ while (true) {
+ String s = queue.poll(2, TimeUnit.SECONDS);
+ if (s != null) {
+ try { onChunk.accept(s); } catch (Exception ignore) {}
+ }
+ if (doneSeen.get() && queue.isEmpty()) {
+ if (!errorSeen.get()) {
+ try { if (onComplete != null) onComplete.run(); } catch (Exception ignore) {}
+ }
+ break;
+ }
+ }
+ } catch (InterruptedException ignore) {
+ ignore.printStackTrace();
+ }
+ });
+ drain.setDaemon(true);
+ drain.start();
+
flux.subscribe(payload -> {
- String s = payload == null ? null : payload.trim();
- if (s == null || s.isEmpty()) return;
- if (s.startsWith("data:")) s = s.substring(5).trim();
- if ("[DONE]".equals(s)) return;
+ if (payload == null || payload.isEmpty()) return;
+ String[] events = payload.split("\\r?\\n\\r?\\n");
+ for (String part : events) {
+ String s = part;
+ if (s == null || s.isEmpty()) continue;
+ if (s.startsWith("data:")) {
+ s = s.substring(5);
+ if (s.startsWith(" ")) s = s.substring(1);
+ }
+ if ("[DONE]".equals(s.trim())) {
+ doneSeen.set(true);
+ continue;
+ }
+ try {
+ JSONObject obj = JSON.parseObject(s);
+ JSONArray choices = obj.getJSONArray("choices");
+ if (choices != null && !choices.isEmpty()) {
+ JSONObject c0 = choices.getJSONObject(0);
+ JSONObject delta = c0.getJSONObject("delta");
+ if (delta != null) {
+ String content = delta.getString("content");
+ if (content != null) {
+ try { queue.offer(content); } catch (Exception ignore) {}
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, err -> {
+ errorSeen.set(true);
+ doneSeen.set(true);
+ if (onError != null) onError.accept(err);
+ }, () -> {
+ if (!doneSeen.get()) {
+ errorSeen.set(true);
+ doneSeen.set(true);
+ if (onError != null) onError.accept(new RuntimeException("LLM 娴佹剰澶栧畬鎴�"));
+ } else {
+ doneSeen.set(true);
+ }
+ });
+ }
+
+ public void chatStreamWithTools(List<ChatCompletionRequest.Message> messages,
+ Double temperature,
+ Integer maxTokens,
+ List<Object> tools,
+ Consumer<String> onChunk,
+ Runnable onComplete,
+ Consumer<Throwable> onError) {
+
+ ChatCompletionRequest req = new ChatCompletionRequest();
+ req.setModel(model);
+ req.setMessages(messages);
+ req.setTemperature(temperature != null ? temperature : 0.3);
+ req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
+ req.setStream(true);
+ if (tools != null && !tools.isEmpty()) {
+ req.setTools(tools);
+ req.setTool_choice("auto");
+ }
+
+ Flux<String> flux = llmWebClient.post()
+ .uri("/chat/completions")
+ .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
+ .contentType(MediaType.APPLICATION_JSON)
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .bodyValue(req)
+ .retrieve()
+ .bodyToFlux(String.class)
+ .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触", ex));
+
+ AtomicBoolean doneSeen = new AtomicBoolean(false);
+ AtomicBoolean errorSeen = new AtomicBoolean(false);
+ LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
+
+ Thread drain = new Thread(() -> {
+ try {
+ while (true) {
+ String s = queue.poll(5, TimeUnit.SECONDS);
+ if (s != null) {
+ try { onChunk.accept(s); } catch (Exception ignore) {}
+ }
+ if (doneSeen.get() && queue.isEmpty()) {
+ if (!errorSeen.get()) {
+ try { if (onComplete != null) onComplete.run(); } catch (Exception ignore) {}
+ }
+ break;
+ }
+ }
+ } catch (InterruptedException ignore) {
+ ignore.printStackTrace();
+ }
+ });
+ drain.setDaemon(true);
+ drain.start();
+
+ flux.subscribe(payload -> {
+ if (payload == null || payload.isEmpty()) return;
+ String[] events = payload.split("\\r?\\n\\r?\\n");
+ for (String part : events) {
+ String s = part;
+ if (s == null || s.isEmpty()) continue;
+ if (s.startsWith("data:")) {
+ s = s.substring(5);
+ if (s.startsWith(" ")) s = s.substring(1);
+ }
+ if ("[DONE]".equals(s.trim())) {
+ doneSeen.set(true);
+ continue;
+ }
+ try {
+ JSONObject obj = JSON.parseObject(s);
+ JSONArray choices = obj.getJSONArray("choices");
+ if (choices != null && !choices.isEmpty()) {
+ JSONObject c0 = choices.getJSONObject(0);
+ JSONObject delta = c0.getJSONObject("delta");
+ if (delta != null) {
+ String content = delta.getString("content");
+ if (content != null) {
+ try { queue.offer(content); } catch (Exception ignore) {}
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, err -> {
+ errorSeen.set(true);
+ doneSeen.set(true);
+ if (onError != null) onError.accept(err);
+ }, () -> {
+ if (!doneSeen.get()) {
+ errorSeen.set(true);
+ doneSeen.set(true);
+ if (onError != null) onError.accept(new RuntimeException("LLM 娴佹剰澶栧畬鎴�"));
+ } else {
+ doneSeen.set(true);
+ }
+ });
+ }
+
+ public void chatStreamRunPython(String prompt, String chatId, Consumer<String> onChunk,
+ Runnable onComplete,
+ Consumer<Throwable> onError) {
+ HashMap<String, Object> req = new HashMap<>();
+ req.put("prompt", prompt);
+ req.put("chatId", chatId);
+
+ Flux<String> flux = llmWebClient.post()
+ .uri(pythonPlatformUrl)
+ .header(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
+ .contentType(MediaType.APPLICATION_JSON)
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .bodyValue(req)
+ .retrieve()
+ .bodyToFlux(String.class)
+ .doOnError(ex -> log.error("璋冪敤 LLM 娴佸紡澶辫触", ex));
+
+ AtomicBoolean doneSeen = new AtomicBoolean(false);
+ AtomicBoolean errorSeen = new AtomicBoolean(false);
+ LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
+
+ Thread drain = new Thread(() -> {
+ try {
+ while (true) {
+ String s = queue.poll(2, TimeUnit.SECONDS);
+ if (s != null) {
+ try {
+ onChunk.accept(s);
+ } catch (Exception ignore) {
+ }
+ }
+ if (doneSeen.get() && queue.isEmpty()) {
+ if (!errorSeen.get()) {
+ try {
+ if (onComplete != null) onComplete.run();
+ } catch (Exception ignore) {
+ }
+ }
+ break;
+ }
+ }
+ } catch (InterruptedException ignore) {
+ ignore.printStackTrace();
+ }
+ });
+ drain.setDaemon(true);
+ drain.start();
+
+ flux.subscribe(payload -> {
+ if (payload == null || payload.isEmpty()) return;
+ String[] events = payload.split("\\r?\\n\\r?\\n");
+ for (String part : events) {
+ String s = part;
+ if (s == null || s.isEmpty()) continue;
+ if (s.startsWith("data:")) {
+ s = s.substring(5);
+ if (s.startsWith(" ")) s = s.substring(1);
+ }
+ if ("[DONE]".equals(s.trim())) {
+ doneSeen.set(true);
+ continue;
+ }
+ if("<think>".equals(s.trim()) || "</think>".equals(s.trim())) {
+ queue.offer(s.trim());
+ continue;
+ }
+ try {
+ JSONObject obj = JSON.parseObject(s);
+ JSONArray choices = obj.getJSONArray("choices");
+ if (choices != null && !choices.isEmpty()) {
+ JSONObject c0 = choices.getJSONObject(0);
+ JSONObject delta = c0.getJSONObject("delta");
+ if (delta != null) {
+ String content = delta.getString("content");
+ if (content != null) {
+ try {
+ queue.offer(content);
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }, err -> {
+ errorSeen.set(true);
+ doneSeen.set(true);
+ if (onError != null) onError.accept(err);
+ }, () -> {
+ if (!doneSeen.get()) {
+ errorSeen.set(true);
+ doneSeen.set(true);
+ if (onError != null) onError.accept(new RuntimeException("LLM 娴佹剰澶栧畬鎴�"));
+ } else {
+ doneSeen.set(true);
+ }
+ });
+ }
+
+ private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
+ if (payload == null || payload.isEmpty()) return acc;
+ String[] events = payload.split("\\r?\\n\\r?\\n");
+ for (String part : events) {
+ String s = part;
+ if (s == null || s.isEmpty()) continue;
+ if (s.startsWith("data:")) {
+ s = s.substring(5);
+ if (s.startsWith(" ")) s = s.substring(1);
+ }
+ if ("[DONE]".equals(s.trim())) {
+ continue;
+ }
try {
JSONObject obj = JSON.parseObject(s);
+ if (obj == null) continue;
JSONArray choices = obj.getJSONArray("choices");
if (choices != null && !choices.isEmpty()) {
JSONObject c0 = choices.getJSONObject(0);
+ if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
+ ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
+ ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
+ choice.setMessage(msg);
+ java.util.ArrayList<ChatCompletionResponse.Choice> list = new java.util.ArrayList<>();
+ list.add(choice);
+ acc.setChoices(list);
+ }
+ ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
+ ChatCompletionRequest.Message msg = choice.getMessage();
+ if (msg.getRole() == null || msg.getRole().isEmpty()) {
+ msg.setRole("assistant");
+ }
JSONObject delta = c0.getJSONObject("delta");
if (delta != null) {
- String content = delta.getString("content");
- if (content != null) onChunk.accept(content);
+ String c = delta.getString("content");
+ if (c != null) {
+ String prev = msg.getContent();
+ msg.setContent(prev == null ? c : prev + c);
+ }
+ String role = delta.getString("role");
+ if (role != null && !role.isEmpty()) msg.setRole(role);
}
+ JSONObject message = c0.getJSONObject("message");
+ if (message != null) {
+ String c = message.getString("content");
+ if (c != null) {
+ String prev = msg.getContent();
+ msg.setContent(prev == null ? c : prev + c);
+ }
+ String role = message.getString("role");
+ if (role != null && !role.isEmpty()) msg.setRole(role);
+ }
+ String fr = c0.getString("finish_reason");
+ if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
}
+ String id = obj.getString("id");
+ if (id != null && !id.isEmpty()) acc.setId(id);
+ Long created = obj.getLong("created");
+ if (created != null) acc.setCreated(created);
+ String object = obj.getString("object");
+ if (object != null && !object.isEmpty()) acc.setObjectName(object);
} catch (Exception ignore) {}
- }, err -> {
- if (onError != null) onError.accept(err);
- }, () -> {
- if (onComplete != null) onComplete.run();
- });
+ }
+ return acc;
+ }
+
+ private ChatCompletionResponse parseCompletion(String payload) {
+ if (payload == null) return null;
+ try {
+ ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
+ if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
+ return r;
+ }
+ } catch (Exception ignore) {}
+ ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
+ if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
+ return sse;
+ }
+ ChatCompletionResponse r = new ChatCompletionResponse();
+ ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
+ ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
+ msg.setRole("assistant");
+ msg.setContent(payload);
+ choice.setMessage(msg);
+ java.util.ArrayList<ChatCompletionResponse.Choice> list = new java.util.ArrayList<>();
+ list.add(choice);
+ r.setChoices(list);
+ return r;
}
}
--
Gitblit v1.9.1