package com.zy.ai.service;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.zy.ai.entity.ChatCompletionRequest;
|
import com.zy.ai.entity.ChatCompletionResponse;
|
import com.zy.ai.entity.LlmRouteConfig;
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.MediaType;
|
import org.springframework.stereotype.Service;
|
import org.springframework.web.reactive.function.client.WebClient;
|
import reactor.core.publisher.Flux;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.function.Consumer;
|
|
@Slf4j
|
@Service
|
@RequiredArgsConstructor
|
public class LlmChatService {
|
|
    // Route bookkeeping service: supplies DB-configured routes and records success/failure/cooldown.
    private final LlmRoutingService llmRoutingService;

    // yml fallback settings, used only when the database yields no available routes
    // (see resolveRoutes). Empty defaults mean "not configured".
    @Value("${llm.base-url:}")
    private String fallbackBaseUrl;

    @Value("${llm.api-key:}")
    private String fallbackApiKey;

    @Value("${llm.model:}")
    private String fallbackModel;

    // Kept as a String so values like "1" or "enable" are accepted;
    // parsed by isFallbackThinkingEnabled().
    @Value("${llm.thinking:false}")
    private String fallbackThinking;
|
|
/**
|
* 通用对话方法:传入 messages,返回大模型文本回复
|
*/
|
public String chat(List<ChatCompletionRequest.Message> messages,
|
Double temperature,
|
Integer maxTokens) {
|
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setMessages(messages);
|
req.setTemperature(temperature != null ? temperature : 0.3);
|
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
|
req.setStream(false);
|
|
ChatCompletionResponse response = complete(req);
|
|
if (response == null ||
|
response.getChoices() == null ||
|
response.getChoices().isEmpty() ||
|
response.getChoices().get(0).getMessage() == null ||
|
response.getChoices().get(0).getMessage().getContent() == null ||
|
response.getChoices().get(0).getMessage().getContent().isEmpty()) {
|
return null;
|
}
|
|
return response.getChoices().get(0).getMessage().getContent();
|
}
|
|
public ChatCompletionResponse chatCompletion(List<ChatCompletionRequest.Message> messages,
|
Double temperature,
|
Integer maxTokens,
|
List<Object> tools) {
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setMessages(messages);
|
req.setTemperature(temperature != null ? temperature : 0.3);
|
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
|
req.setStream(false);
|
if (tools != null && !tools.isEmpty()) {
|
req.setTools(tools);
|
req.setTool_choice("auto");
|
}
|
return complete(req);
|
}
|
|
    /**
     * Blocking chat completion with automatic route failover.
     *
     * Tries each resolved route in order. On failure it moves to the next route only
     * when that route's switch policy permits it (quota errors and generic errors are
     * governed by separate flags). Returns null when no route is configured or every
     * attempted route fails.
     */
    public ChatCompletionResponse complete(ChatCompletionRequest req) {
        List<ResolvedRoute> routes = resolveRoutes();
        if (routes.isEmpty()) {
            log.error("调用 LLM 失败: 未配置可用 LLM 路由");
            return null;
        }

        Throwable last = null;
        for (int i = 0; i < routes.size(); i++) {
            ResolvedRoute route = routes.get(i);
            boolean hasNext = i < routes.size() - 1;
            try {
                // Apply per-route settings to a shallow clone so the caller's request
                // object is never mutated.
                ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, false);
                ChatCompletionResponse resp = callCompletion(route, routeReq);
                if (!isValidCompletion(resp)) {
                    // Treat a contentless response like a transport error so failover applies.
                    throw new RuntimeException("LLM 响应为空");
                }
                markSuccess(route);
                return resp;
            } catch (Throwable ex) {
                last = ex;
                boolean quota = isQuotaExhausted(ex);
                boolean canSwitch = shouldSwitch(route, quota);
                // Record the failure (possibly starting a cooldown) before deciding to switch.
                markFailure(route, ex, canSwitch);
                if (hasNext && canSwitch) {
                    log.warn("LLM 切换到下一路由, current={}, reason={}", route.tag(), errorText(ex));
                    continue;
                }
                log.error("调用 LLM 失败, route={}", route.tag(), ex);
                break;
            }
        }

        if (last != null) {
            log.error("调用 LLM 全部路由失败: {}", errorText(last));
        }
        return null;
    }
|
|
public void chatStream(List<ChatCompletionRequest.Message> messages,
|
Double temperature,
|
Integer maxTokens,
|
Consumer<String> onChunk,
|
Runnable onComplete,
|
Consumer<Throwable> onError) {
|
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setMessages(messages);
|
req.setTemperature(temperature != null ? temperature : 0.3);
|
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
|
req.setStream(true);
|
|
streamWithFailover(req, onChunk, onComplete, onError);
|
}
|
|
public void chatStreamWithTools(List<ChatCompletionRequest.Message> messages,
|
Double temperature,
|
Integer maxTokens,
|
List<Object> tools,
|
Consumer<String> onChunk,
|
Runnable onComplete,
|
Consumer<Throwable> onError) {
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setMessages(messages);
|
req.setTemperature(temperature != null ? temperature : 0.3);
|
req.setMax_tokens(maxTokens != null ? maxTokens : 1024);
|
req.setStream(true);
|
if (tools != null && !tools.isEmpty()) {
|
req.setTools(tools);
|
req.setTool_choice("auto");
|
}
|
streamWithFailover(req, onChunk, onComplete, onError);
|
}
|
|
private void streamWithFailover(ChatCompletionRequest req,
|
Consumer<String> onChunk,
|
Runnable onComplete,
|
Consumer<Throwable> onError) {
|
List<ResolvedRoute> routes = resolveRoutes();
|
if (routes.isEmpty()) {
|
if (onError != null) onError.accept(new RuntimeException("未配置可用 LLM 路由"));
|
return;
|
}
|
attemptStream(routes, 0, req, onChunk, onComplete, onError);
|
}
|
|
    /**
     * Streams against routes[index], failing over to index+1 when the route errors
     * before any chunk reached the caller and its switch policy allows it.
     *
     * A dedicated daemon "drain" thread decouples SSE parsing from the onChunk callback:
     * parsed content deltas are queued, the drain thread delivers them in order, and it
     * fires onComplete once the stream finished ([DONE] seen) and the queue is empty.
     */
    private void attemptStream(List<ResolvedRoute> routes,
                               int index,
                               ChatCompletionRequest req,
                               Consumer<String> onChunk,
                               Runnable onComplete,
                               Consumer<Throwable> onError) {
        if (index >= routes.size()) {
            if (onError != null) onError.accept(new RuntimeException("LLM 路由全部失败"));
            return;
        }

        ResolvedRoute route = routes.get(index);
        ChatCompletionRequest routeReq = applyRoute(cloneRequest(req), route, true);

        // doneSeen: [DONE] received (or the stream ended/errored) — drain thread may stop.
        // errorSeen: suppresses onComplete when the stream terminated abnormally.
        // emitted: once any chunk reached the caller we must NOT fail over mid-answer.
        AtomicBoolean doneSeen = new AtomicBoolean(false);
        AtomicBoolean errorSeen = new AtomicBoolean(false);
        AtomicBoolean emitted = new AtomicBoolean(false);
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        Thread drain = new Thread(() -> {
            try {
                while (true) {
                    // Poll with a timeout so the loop periodically re-checks doneSeen.
                    String s = queue.poll(2, TimeUnit.SECONDS);
                    if (s != null) {
                        emitted.set(true);
                        try {
                            onChunk.accept(s);
                        } catch (Exception ignore) {
                            // A failing callback must not block delivery of later chunks.
                        }
                    }
                    if (doneSeen.get() && queue.isEmpty()) {
                        if (!errorSeen.get()) {
                            try {
                                if (onComplete != null) onComplete.run();
                            } catch (Exception ignore) {
                            }
                        }
                        break;
                    }
                }
            } catch (InterruptedException ignore) {
            }
        });
        drain.setDaemon(true);
        drain.start();

        streamFlux(route, routeReq).subscribe(payload -> {
            if (payload == null || payload.isEmpty()) return;
            // SSE events are separated by a blank line; one payload may carry several events.
            String[] events = payload.split("\\r?\\n\\r?\\n");
            for (String part : events) {
                String s = part;
                if (s == null || s.isEmpty()) continue;
                if (s.startsWith("data:")) {
                    s = s.substring(5);
                    if (s.startsWith(" ")) s = s.substring(1);
                }
                if ("[DONE]".equals(s.trim())) {
                    doneSeen.set(true);
                    continue;
                }
                try {
                    JSONObject obj = JSON.parseObject(s);
                    JSONArray choices = obj.getJSONArray("choices");
                    if (choices != null && !choices.isEmpty()) {
                        JSONObject c0 = choices.getJSONObject(0);
                        JSONObject delta = c0.getJSONObject("delta");
                        if (delta != null) {
                            String content = delta.getString("content");
                            if (content != null) {
                                queue.offer(content);
                            }
                        }
                    }
                } catch (Exception e) {
                    // A malformed fragment is skipped; the rest of the stream continues.
                    log.warn("解析 LLM stream 片段失败: {}", e.getMessage());
                }
            }
        }, err -> {
            errorSeen.set(true);
            doneSeen.set(true);
            boolean quota = isQuotaExhausted(err);
            boolean canSwitch = shouldSwitch(route, quota);
            markFailure(route, err, canSwitch);
            // Fail over only if nothing was delivered yet, otherwise the caller would
            // receive a second, restarted answer.
            if (!emitted.get() && canSwitch && index < routes.size() - 1) {
                log.warn("LLM 路由失败,自动切换,current={}, reason={}", route.tag(), errorText(err));
                attemptStream(routes, index + 1, req, onChunk, onComplete, onError);
                return;
            }
            if (onError != null) onError.accept(err);
        }, () -> {
            if (!doneSeen.get()) {
                // The Flux completed without a [DONE] marker — treat it as an abnormal end.
                RuntimeException ex = new RuntimeException("LLM 流意外完成");
                errorSeen.set(true);
                doneSeen.set(true);
                boolean canSwitch = shouldSwitch(route, false);
                markFailure(route, ex, canSwitch);
                if (!emitted.get() && canSwitch && index < routes.size() - 1) {
                    log.warn("LLM 路由流异常完成,自动切换,current={}", route.tag());
                    attemptStream(routes, index + 1, req, onChunk, onComplete, onError);
                } else {
                    if (onError != null) onError.accept(ex);
                }
            } else {
                markSuccess(route);
                doneSeen.set(true);
            }
        });
    }
|
|
private Flux<String> streamFlux(ResolvedRoute route, ChatCompletionRequest req) {
|
WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
|
return client.post()
|
.uri("/chat/completions")
|
.header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
|
.contentType(MediaType.APPLICATION_JSON)
|
.accept(MediaType.TEXT_EVENT_STREAM)
|
.bodyValue(req)
|
.exchangeToFlux(resp -> {
|
int status = resp.rawStatusCode();
|
if (status >= 200 && status < 300) {
|
return resp.bodyToFlux(String.class);
|
}
|
return resp.bodyToMono(String.class)
|
.defaultIfEmpty("")
|
.flatMapMany(body -> Flux.error(new LlmRouteException(status, body)));
|
})
|
.doOnError(ex -> log.error("调用 LLM 流式失败, route={}", route.tag(), ex));
|
}
|
|
private ChatCompletionResponse callCompletion(ResolvedRoute route, ChatCompletionRequest req) {
|
WebClient client = WebClient.builder().baseUrl(route.baseUrl).build();
|
RawCompletionResult raw = client.post()
|
.uri("/chat/completions")
|
.header(HttpHeaders.AUTHORIZATION, "Bearer " + route.apiKey)
|
.contentType(MediaType.APPLICATION_JSON)
|
.accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
|
.bodyValue(req)
|
.exchangeToMono(resp -> resp.bodyToFlux(String.class)
|
.collectList()
|
.map(list -> new RawCompletionResult(resp.rawStatusCode(), String.join("\\n\\n", list))))
|
.block();
|
|
if (raw == null) {
|
throw new RuntimeException("LLM 返回为空");
|
}
|
if (raw.statusCode < 200 || raw.statusCode >= 300) {
|
throw new LlmRouteException(raw.statusCode, raw.payload);
|
}
|
return parseCompletion(raw.payload);
|
}
|
|
private ChatCompletionRequest applyRoute(ChatCompletionRequest req, ResolvedRoute route, boolean stream) {
|
req.setModel(route.model);
|
req.setStream(stream);
|
if (route.thinkingEnabled) {
|
ChatCompletionRequest.Thinking t = new ChatCompletionRequest.Thinking();
|
t.setType("enable");
|
req.setThinking(t);
|
} else {
|
req.setThinking(null);
|
}
|
return req;
|
}
|
|
private ChatCompletionRequest cloneRequest(ChatCompletionRequest src) {
|
ChatCompletionRequest req = new ChatCompletionRequest();
|
req.setModel(src.getModel());
|
req.setMessages(src.getMessages());
|
req.setTemperature(src.getTemperature());
|
req.setMax_tokens(src.getMax_tokens());
|
req.setStream(src.getStream());
|
req.setTools(src.getTools());
|
req.setTool_choice(src.getTool_choice());
|
req.setThinking(src.getThinking());
|
return req;
|
}
|
|
private boolean isValidCompletion(ChatCompletionResponse response) {
|
if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) {
|
return false;
|
}
|
ChatCompletionRequest.Message message = response.getChoices().get(0).getMessage();
|
if (message == null) {
|
return false;
|
}
|
if (!isBlank(message.getContent())) {
|
return true;
|
}
|
return message.getTool_calls() != null && !message.getTool_calls().isEmpty();
|
}
|
|
private boolean shouldSwitch(ResolvedRoute route, boolean quota) {
|
return quota ? route.switchOnQuota : route.switchOnError;
|
}
|
|
private void markSuccess(ResolvedRoute route) {
|
if (route.id != null) {
|
llmRoutingService.markSuccess(route.id);
|
}
|
}
|
|
private void markFailure(ResolvedRoute route, Throwable ex, boolean enterCooldown) {
|
if (route.id != null) {
|
llmRoutingService.markFailure(route.id, errorText(ex), enterCooldown, route.cooldownSeconds);
|
}
|
}
|
|
private String errorText(Throwable ex) {
|
if (ex == null) return "unknown";
|
if (ex instanceof LlmRouteException) {
|
LlmRouteException e = (LlmRouteException) ex;
|
String body = e.body == null ? "" : e.body;
|
if (body.length() > 240) {
|
body = body.substring(0, 240);
|
}
|
return "status=" + e.statusCode + ", body=" + body;
|
}
|
return ex.getMessage() == null ? ex.toString() : ex.getMessage();
|
}
|
|
private boolean isQuotaExhausted(Throwable ex) {
|
if (!(ex instanceof LlmRouteException)) return false;
|
LlmRouteException e = (LlmRouteException) ex;
|
if (e.statusCode == 429) return true;
|
String text = (e.body == null ? "" : e.body).toLowerCase();
|
return text.contains("insufficient_quota")
|
|| text.contains("quota")
|
|| text.contains("余额")
|
|| text.contains("用量")
|
|| text.contains("超限")
|
|| text.contains("rate limit");
|
}
|
|
private List<ResolvedRoute> resolveRoutes() {
|
List<ResolvedRoute> routes = new ArrayList<>();
|
List<LlmRouteConfig> dbRoutes = llmRoutingService.listAvailableRoutes();
|
for (LlmRouteConfig c : dbRoutes) {
|
routes.add(ResolvedRoute.fromDb(c));
|
}
|
// 兼容:数据库为空时,回退到 yml
|
if (routes.isEmpty() && !isBlank(fallbackBaseUrl) && !isBlank(fallbackApiKey) && !isBlank(fallbackModel)) {
|
routes.add(ResolvedRoute.fromFallback(fallbackBaseUrl, fallbackApiKey, fallbackModel, isFallbackThinkingEnabled()));
|
}
|
return routes;
|
}
|
|
private boolean isFallbackThinkingEnabled() {
|
String x = fallbackThinking == null ? "" : fallbackThinking.trim().toLowerCase();
|
return "true".equals(x) || "1".equals(x) || "enable".equals(x);
|
}
|
|
private boolean isBlank(String s) {
|
return s == null || s.trim().isEmpty();
|
}
|
|
    /**
     * Folds an SSE payload (one or more "data:" events) into an accumulating response:
     * content fragments from "delta" (streaming) or "message" (non-streaming shape) are
     * concatenated into the first choice, and id/created/object/finish_reason are copied
     * when present. Unparseable events are ignored — this is a best-effort merge.
     */
    private ChatCompletionResponse mergeSseChunk(ChatCompletionResponse acc, String payload) {
        if (payload == null || payload.isEmpty()) return acc;
        // SSE events are separated by a blank line (CRLF or LF).
        String[] events = payload.split("\\r?\\n\\r?\\n");
        for (String part : events) {
            String s = part;
            if (s == null || s.isEmpty()) continue;
            if (s.startsWith("data:")) {
                s = s.substring(5);
                if (s.startsWith(" ")) s = s.substring(1);
            }
            if ("[DONE]".equals(s.trim())) {
                continue;
            }
            try {
                JSONObject obj = JSON.parseObject(s);
                if (obj == null) continue;
                JSONArray choices = obj.getJSONArray("choices");
                if (choices != null && !choices.isEmpty()) {
                    JSONObject c0 = choices.getJSONObject(0);
                    // Lazily create the single accumulated choice + message.
                    if (acc.getChoices() == null || acc.getChoices().isEmpty()) {
                        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
                        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
                        choice.setMessage(msg);
                        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
                        list.add(choice);
                        acc.setChoices(list);
                    }
                    ChatCompletionResponse.Choice choice = acc.getChoices().get(0);
                    ChatCompletionRequest.Message msg = choice.getMessage();
                    if (msg.getRole() == null || msg.getRole().isEmpty()) {
                        msg.setRole("assistant");
                    }
                    // Streaming responses carry fragments under "delta"...
                    JSONObject delta = c0.getJSONObject("delta");
                    if (delta != null) {
                        String c = delta.getString("content");
                        if (c != null) {
                            String prev = msg.getContent();
                            msg.setContent(prev == null ? c : prev + c);
                        }
                        String role = delta.getString("role");
                        if (role != null && !role.isEmpty()) msg.setRole(role);
                    }
                    // ...while completion-shaped events use "message".
                    JSONObject message = c0.getJSONObject("message");
                    if (message != null) {
                        String c = message.getString("content");
                        if (c != null) {
                            String prev = msg.getContent();
                            msg.setContent(prev == null ? c : prev + c);
                        }
                        String role = message.getString("role");
                        if (role != null && !role.isEmpty()) msg.setRole(role);
                    }
                    String fr = c0.getString("finish_reason");
                    if (fr != null && !fr.isEmpty()) choice.setFinishReason(fr);
                }
                String id = obj.getString("id");
                if (id != null && !id.isEmpty()) acc.setId(id);
                Long created = obj.getLong("created");
                if (created != null) acc.setCreated(created);
                String object = obj.getString("object");
                if (object != null && !object.isEmpty()) acc.setObjectName(object);
            } catch (Exception ignore) {
                // Best-effort: one malformed event must not discard earlier merged content.
            }
        }
        return acc;
    }
|
|
    /**
     * Parses a completion payload defensively, in three stages:
     * 1) direct JSON mapping to ChatCompletionResponse;
     * 2) SSE-event merge — the blocking call also accepts text/event-stream, so the
     *    payload may be SSE frames even for a non-stream request;
     * 3) last resort: wrap the raw payload as assistant content so callers still
     *    receive something readable rather than null.
     */
    private ChatCompletionResponse parseCompletion(String payload) {
        if (payload == null) return null;
        try {
            ChatCompletionResponse r = JSON.parseObject(payload, ChatCompletionResponse.class);
            if (r != null && r.getChoices() != null && !r.getChoices().isEmpty() && r.getChoices().get(0).getMessage() != null) {
                return r;
            }
        } catch (Exception ignore) {
            // Not plain completion JSON — fall through to the SSE merge below.
        }
        ChatCompletionResponse sse = mergeSseChunk(new ChatCompletionResponse(), payload);
        if (sse.getChoices() != null && !sse.getChoices().isEmpty() && sse.getChoices().get(0).getMessage() != null && sse.getChoices().get(0).getMessage().getContent() != null) {
            return sse;
        }
        ChatCompletionResponse r = new ChatCompletionResponse();
        ChatCompletionResponse.Choice choice = new ChatCompletionResponse.Choice();
        ChatCompletionRequest.Message msg = new ChatCompletionRequest.Message();
        msg.setRole("assistant");
        msg.setContent(payload);
        choice.setMessage(msg);
        ArrayList<ChatCompletionResponse.Choice> list = new ArrayList<>();
        list.add(choice);
        r.setChoices(list);
        return r;
    }
|
|
    /** Immutable pair of HTTP status and raw response body from the blocking completion call. */
    private static class RawCompletionResult {
        private final int statusCode;
        private final String payload;

        private RawCompletionResult(int statusCode, String payload) {
            this.statusCode = statusCode;
            this.payload = payload;
        }
    }
|
|
    /**
     * Signals a non-2xx LLM HTTP response; keeps the status and response body so
     * callers can classify quota exhaustion vs. generic errors and log a summary.
     */
    private static class LlmRouteException extends RuntimeException {
        private final int statusCode;
        private final String body;

        private LlmRouteException(int statusCode, String body) {
            super("http status=" + statusCode);
            this.statusCode = statusCode;
            this.body = body;
        }
    }
|
|
    /**
     * A normalized LLM route — sourced from the database or from the yml fallback —
     * together with its failover policy.
     */
    private static class ResolvedRoute {
        // Database id; null for the yml fallback route, which skips success/failure bookkeeping.
        private Long id;
        private String name;
        private String baseUrl;
        private String apiKey;
        private String model;
        private boolean thinkingEnabled;
        // Whether to fail over to the next route on quota exhaustion / on generic errors.
        private boolean switchOnQuota;
        private boolean switchOnError;
        private Integer cooldownSeconds;

        /** Maps a database route row; null switch flags default to "switching allowed". */
        private static ResolvedRoute fromDb(LlmRouteConfig c) {
            ResolvedRoute r = new ResolvedRoute();
            r.id = c.getId();
            r.name = c.getName();
            r.baseUrl = c.getBaseUrl();
            r.apiKey = c.getApiKey();
            r.model = c.getModel();
            r.thinkingEnabled = c.getThinking() != null && c.getThinking() == 1;
            r.switchOnQuota = c.getSwitchOnQuota() == null || c.getSwitchOnQuota() == 1;
            r.switchOnError = c.getSwitchOnError() == null || c.getSwitchOnError() == 1;
            r.cooldownSeconds = c.getCooldownSeconds();
            return r;
        }

        /** Builds the yml-configured fallback route: permissive switching, 300s cooldown. */
        private static ResolvedRoute fromFallback(String baseUrl, String apiKey, String model, boolean thinkingEnabled) {
            ResolvedRoute r = new ResolvedRoute();
            r.name = "fallback-yml";
            r.baseUrl = baseUrl;
            r.apiKey = apiKey;
            r.model = model;
            r.thinkingEnabled = thinkingEnabled;
            r.switchOnQuota = true;
            r.switchOnError = true;
            r.cooldownSeconds = 300;
            return r;
        }

        /** Human-readable label for log lines, e.g. "myRoute model=gpt-x". */
        private String tag() {
            String showName = name == null ? "unnamed" : name;
            String showModel = model == null ? "" : (" model=" + model);
            return showName + showModel;
        }
    }
|
}
|