zhou zhou
8 小时以前 82624affb0251b75b62b35567d3eb260c06efe78
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package com.vincent.rsf.server.ai.service.impl.chat;
 
import com.vincent.rsf.server.ai.dto.AiChatErrorDto;
import com.vincent.rsf.server.ai.enums.AiErrorCategory;
import com.vincent.rsf.server.ai.exception.AiChatException;
import com.vincent.rsf.server.ai.service.AiCallLogService;
import com.vincent.rsf.server.ai.store.AiStreamStateStore;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
import java.time.Instant;
 
@Slf4j
@Component
@RequiredArgsConstructor
public class AiChatFailureHandler {
 
    private final AiSseEventPublisher aiSseEventPublisher;
    private final AiCallLogService aiCallLogService;
    private final AiStreamStateStore aiStreamStateStore;
 
    public AiChatException buildAiException(String code, AiErrorCategory category, String stage, String message, Throwable cause) {
        return new AiChatException(code, category, stage, message, cause);
    }
 
    public void handleStreamFailure(SseEmitter emitter, String requestId, Long sessionId, String model, long startedAt,
                                    Long firstTokenAt, AiChatException exception, Long callLogId,
                                    long toolSuccessCount, long toolFailureCount,
                                    AiThinkingTraceEmitter thinkingTraceEmitter,
                                    Long tenantId, Long userId, String promptCode) {
        if (isClientAbortException(exception)) {
            log.warn("AI chat aborted by client, requestId={}, sessionId={}, stage={}, message={}",
                    requestId, sessionId, exception.getStage(), exception.getMessage());
            if (thinkingTraceEmitter != null) {
                thinkingTraceEmitter.markTerminated("ABORTED");
            }
            aiSseEventPublisher.emitSafely(emitter, "status",
                    aiSseEventPublisher.buildTerminalStatus(requestId, sessionId, "ABORTED", model, startedAt, firstTokenAt));
            aiCallLogService.failCallLog(
                    callLogId,
                    "ABORTED",
                    exception.getCategory().name(),
                    exception.getStage(),
                    exception.getMessage(),
                    System.currentTimeMillis() - startedAt,
                    aiSseEventPublisher.resolveFirstTokenLatency(startedAt, firstTokenAt),
                    toolSuccessCount,
                    toolFailureCount
            );
            aiStreamStateStore.markStreamState(requestId, tenantId, userId, sessionId, promptCode, "ABORTED", exception.getMessage());
            emitter.completeWithError(exception);
            return;
        }
        log.error("AI chat failed, requestId={}, sessionId={}, category={}, stage={}, message={}",
                requestId, sessionId, exception.getCategory(), exception.getStage(), exception.getMessage(), exception);
        if (thinkingTraceEmitter != null) {
            thinkingTraceEmitter.markTerminated("FAILED");
        }
        aiSseEventPublisher.emitSafely(emitter, "status",
                aiSseEventPublisher.buildTerminalStatus(requestId, sessionId, "FAILED", model, startedAt, firstTokenAt));
        aiSseEventPublisher.emitSafely(emitter, "error", AiChatErrorDto.builder()
                .requestId(requestId)
                .sessionId(sessionId)
                .code(exception.getCode())
                .category(exception.getCategory().name())
                .stage(exception.getStage())
                .message(exception.getMessage())
                .timestamp(Instant.now().toEpochMilli())
                .build());
        aiCallLogService.failCallLog(
                callLogId,
                "FAILED",
                exception.getCategory().name(),
                exception.getStage(),
                exception.getMessage(),
                System.currentTimeMillis() - startedAt,
                aiSseEventPublisher.resolveFirstTokenLatency(startedAt, firstTokenAt),
                toolSuccessCount,
                toolFailureCount
        );
        aiStreamStateStore.markStreamState(requestId, tenantId, userId, sessionId, promptCode, "FAILED", exception.getMessage());
        emitter.completeWithError(exception);
    }
 
    private boolean isClientAbortException(Throwable throwable) {
        Throwable current = throwable;
        while (current != null) {
            String message = current.getMessage();
            if (message != null) {
                String normalized = message.toLowerCase();
                if (normalized.contains("broken pipe")
                        || normalized.contains("connection reset")
                        || normalized.contains("forcibly closed")
                        || normalized.contains("abort")) {
                    return true;
                }
            }
            current = current.getCause();
        }
        return false;
    }
}