package com.vincent.rsf.server.ai.service;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.vincent.rsf.server.ai.config.AiProperties;
|
import com.vincent.rsf.server.ai.dto.GatewayChatRequest;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.io.BufferedReader;
|
import java.io.IOException;
|
import java.io.InputStream;
|
import java.io.InputStreamReader;
|
import java.io.OutputStream;
|
import java.net.HttpURLConnection;
|
import java.net.URL;
|
import java.nio.charset.StandardCharsets;
|
|
@Component
|
public class AiGatewayClient {
|
|
@Resource
|
private AiProperties aiProperties;
|
@Resource
|
private ObjectMapper objectMapper;
|
|
public interface StreamCallback {
|
/**
|
* 处理网关返回的一条 NDJSON 事件。
|
* 返回 true 表示继续消费后续事件,返回 false 表示主动停止本次流读取。
|
*/
|
boolean handle(JsonNode event) throws Exception;
|
}
|
|
/**
|
* 调用 AI 网关的内部流式接口,并将网关返回的事件逐条回调给上层编排逻辑。
|
* 这里屏蔽了 HTTP 细节,调用方只需要关注 delta / done / error 事件本身。
|
*/
|
public void stream(GatewayChatRequest request, StreamCallback callback) throws Exception {
|
HttpURLConnection connection = null;
|
boolean terminalEventReceived = false;
|
try {
|
String url = aiProperties.getGatewayBaseUrl() + "/internal/chat/stream";
|
connection = (HttpURLConnection) new URL(url).openConnection();
|
connection.setRequestMethod("POST");
|
connection.setDoOutput(true);
|
connection.setConnectTimeout(10000);
|
connection.setReadTimeout(0);
|
connection.setRequestProperty("Content-Type", "application/json");
|
connection.setRequestProperty("Accept", "application/x-ndjson");
|
try (OutputStream outputStream = connection.getOutputStream()) {
|
outputStream.write(objectMapper.writeValueAsBytes(request));
|
outputStream.flush();
|
}
|
InputStream inputStream = connection.getResponseCode() >= 400 ? connection.getErrorStream() : connection.getInputStream();
|
if (inputStream == null) {
|
return;
|
}
|
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
|
String line;
|
while ((line = reader.readLine()) != null) {
|
if (line.trim().isEmpty()) {
|
continue;
|
}
|
JsonNode event = objectMapper.readTree(line);
|
String type = event.path("type").asText();
|
if ("done".equals(type) || "error".equals(type)) {
|
terminalEventReceived = true;
|
}
|
if (!callback.handle(event)) {
|
break;
|
}
|
}
|
}
|
if (!terminalEventReceived) {
|
throw new IOException("AI网关流异常中断");
|
}
|
} finally {
|
if (connection != null) {
|
connection.disconnect();
|
}
|
}
|
}
|
|
}
|