package com.vincent.rsf.server.ai.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.server.ai.config.AiProperties;
import com.vincent.rsf.server.ai.dto.GatewayChatRequest;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
 * HTTP client for the AI gateway's internal streaming chat endpoint.
 *
 * <p>The request is serialized as JSON and POSTed to the gateway; the response is read line by
 * line, each non-blank line is parsed as one JSON event and handed to a {@link StreamCallback}.
 */
@Component
public class AiGatewayClient {

    /** Path of the streaming chat endpoint, appended to the configured gateway base URL. */
    private static final String STREAM_PATH = "/internal/chat/stream";

    /** Connect timeout for the gateway connection, in milliseconds. */
    private static final int CONNECT_TIMEOUT_MILLIS = 10000;

    /** Event types that mark the end of a stream. */
    private static final String EVENT_TYPE_DONE = "done";
    private static final String EVENT_TYPE_ERROR = "error";

    @Resource
    private AiProperties aiProperties;

    @Resource
    private ObjectMapper objectMapper;

    /**
     * Receives the events of one gateway stream, one call per NDJSON line.
     */
    @FunctionalInterface
    public interface StreamCallback {
        /**
         * Handles a single NDJSON event returned by the gateway.
         *
         * @param event the parsed event
         * @return {@code true} to keep consuming subsequent events, {@code false} to deliberately
         *         stop reading the stream
         * @throws Exception if the event cannot be handled; the exception propagates out of
         *         {@link AiGatewayClient#stream}
         */
        boolean handle(JsonNode event) throws Exception;
    }

    /**
     * Calls the AI gateway's internal streaming endpoint and forwards every returned event to
     * {@code callback}. HTTP details are hidden from the caller, which only deals with the
     * events themselves (for example delta / done / error).
     *
     * <p>Reading ends when the gateway closes the stream or when the callback returns
     * {@code false}. The connection is always disconnected before this method returns or throws.
     *
     * @param request  chat request, serialized to JSON as the POST body
     * @param callback receiver for each parsed event
     * @throws IOException if the gateway responds without any body, if a line is not valid JSON,
     *         if the connection fails, or if the gateway closes the stream before a
     *         {@code done} / {@code error} event was seen and the callback did not stop the
     *         stream itself
     * @throws Exception whatever {@code callback} throws
     */
    public void stream(GatewayChatRequest request, StreamCallback callback) throws Exception {
        HttpURLConnection connection = null;
        try {
            connection = openStreamConnection();

            try (OutputStream outputStream = connection.getOutputStream()) {
                outputStream.write(objectMapper.writeValueAsBytes(request));
                outputStream.flush();
            }

            int statusCode = connection.getResponseCode();
            // For error statuses the body (if any) is only reachable through the error stream;
            // it is still parsed as events so that the callback can see what the gateway sent.
            InputStream inputStream = statusCode >= 400
                    ? connection.getErrorStream()
                    : connection.getInputStream();
            if (inputStream == null) {
                // getErrorStream() returns null when the server sent no error body. Returning
                // silently here would look like a successfully completed, empty stream.
                throw new IOException("AI网关未返回响应内容, HTTP状态码: " + statusCode);
            }

            if (!consumeEvents(inputStream, callback)) {
                throw new IOException("AI网关流异常中断");
            }
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /** Opens and configures (but does not yet send) the POST connection to the stream endpoint. */
    private HttpURLConnection openStreamConnection() throws IOException {
        String url = aiProperties.getGatewayBaseUrl() + STREAM_PATH;
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
        // A read timeout of 0 means "wait indefinitely"; presumably chosen so that slow or
        // long-running generations are not cut off -- NOTE(review): confirm this is intended,
        // since a hung gateway will block the calling thread until the socket is closed.
        connection.setReadTimeout(0);
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestProperty("Accept", "application/x-ndjson");
        return connection;
    }

    /**
     * Reads the response line by line and dispatches every non-blank line to the callback.
     *
     * @return {@code true} if the stream ended legitimately, i.e. a {@code done} / {@code error}
     *         event was seen or the callback deliberately stopped the stream; {@code false} if
     *         the gateway closed the stream without a terminal event
     */
    private boolean consumeEvents(InputStream inputStream, StreamCallback callback) throws Exception {
        boolean terminalEventReceived = false;
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }
                JsonNode event = objectMapper.readTree(line);
                String type = event.path("type").asText();
                if (EVENT_TYPE_DONE.equals(type) || EVENT_TYPE_ERROR.equals(type)) {
                    terminalEventReceived = true;
                }
                if (!callback.handle(event)) {
                    // The callback asked to stop: this is a deliberate end of the stream and
                    // must not be reported as an abnormal interruption.
                    return true;
                }
            }
        }
        return terminalEventReceived;
    }
}