package com.zy.asrs.service.impl;
|
|
import com.alibaba.fastjson.JSON;
|
import com.zy.asrs.domain.replay.DeviceReplayChunkMeta;
|
import com.zy.asrs.domain.replay.DeviceReplayManifest;
|
import com.zy.asrs.domain.replay.DeviceReplayState;
|
import com.zy.asrs.domain.replay.DeviceReplayStreamKey;
|
import com.zy.asrs.domain.replay.ReplayChunk;
|
import com.zy.asrs.domain.replay.ReplayFrameSummary;
|
import com.zy.asrs.domain.replay.ReplaySeekResult;
|
import com.zy.asrs.domain.replay.ReplaySessionContext;
|
import com.zy.asrs.entity.DeviceDataLog;
|
import com.zy.asrs.service.DeviceLogReplayManifestService;
|
import com.zy.asrs.service.DeviceLogReplayNormalizer;
|
import com.zy.asrs.service.DeviceLogReplayService;
|
import jakarta.annotation.PreDestroy;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Service;
|
|
import java.io.BufferedReader;
|
import java.nio.charset.StandardCharsets;
|
import java.nio.file.Files;
|
import java.nio.file.Path;
|
import java.nio.file.Paths;
|
import java.time.LocalDate;
|
import java.time.ZoneId;
|
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeParseException;
|
import java.util.ArrayList;
|
import java.util.Collections;
|
import java.util.Comparator;
|
import java.util.LinkedHashSet;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.UUID;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
@Slf4j
|
@Service
|
public class DeviceLogReplayServiceImpl implements DeviceLogReplayService {
|
|
private static final long CURRENT_DAY_MANIFEST_REFRESH_INTERVAL_MS = 2_000L;
|
private static final long DEFAULT_REPLAY_WINDOW_HALF_MS = 15L * 60L * 1000L;
|
private static final long DAY_END_OFFSET_MS = 24L * 60L * 60L * 1000L - 1L;
|
private static final String REPLAY_WINDOW_MISSING = "REPLAY_WINDOW_MISSING";
|
private static final String REPLAY_WINDOW_INVALID = "REPLAY_WINDOW_INVALID";
|
private static final String REPLAY_WINDOW_CROSS_DAY = "REPLAY_WINDOW_CROSS_DAY";
|
private static final String REPLAY_WINDOW_TOO_LARGE = "REPLAY_WINDOW_TOO_LARGE";
|
private static final String REPLAY_SEEK_OUT_OF_WINDOW = "REPLAY_SEEK_OUT_OF_WINDOW";
|
|
private final DeviceLogReplayManifestService manifestService;
|
private final DeviceLogReplayNormalizer replayNormalizer;
|
|
private final ConcurrentHashMap<String, ReplaySessionContext> sessions = new ConcurrentHashMap<>();
|
private final ExecutorService replayWarmupExecutor = Executors.newSingleThreadExecutor(runnable -> {
|
Thread thread = new Thread(runnable, "replay-tail-checkpoint-warmup");
|
thread.setDaemon(true);
|
return thread;
|
});
|
|
@Value("${deviceLogStorage.type}")
|
private String storageType;
|
@Value("${deviceLogStorage.loggingPath}")
|
private String loggingPath;
|
@Value("${deviceReplay.chunkSampleSize:400}")
|
private Integer chunkSampleSize;
|
@Value("${deviceReplay.maxChunkFrames:32}")
|
private Integer maxChunkFrames;
|
@Value("${deviceReplay.timelineBucketSeconds:20}")
|
private Integer timelineBucketSeconds;
|
@Value("${deviceReplay.maxWindowMinutes:240}")
|
private Integer maxWindowMinutes;
|
@Value("${deviceReplay.maxConcurrentSessions:4}")
|
private Integer maxConcurrentSessions;
|
@Value("${deviceReplay.maxPrefetchChunks:3}")
|
private Integer maxPrefetchChunks;
|
@Value("${deviceReplay.maxChunkCacheEntries:128}")
|
private Integer maxChunkCacheEntries;
|
@Value("${deviceReplay.sessionTtlSeconds:1800}")
|
private Integer sessionTtlSeconds;
|
|
/**
 * Creates the service with its two collaborators.
 *
 * @param manifestService  manifest collaborator stored as-is
 * @param replayNormalizer frame builder stored as-is
 */
public DeviceLogReplayServiceImpl(DeviceLogReplayManifestService manifestService,
        DeviceLogReplayNormalizer replayNormalizer) {
    this.replayNormalizer = replayNormalizer;
    this.manifestService = manifestService;
}
|
|
@Override
|
public Map<String, Object> createSession(String day,
|
String type,
|
Integer deviceNo,
|
Integer stationId,
|
Long targetTimestamp,
|
Long windowStartTimeMs,
|
Long windowEndTimeMs) {
|
ensureFileMode();
|
ReplayWindow replayWindow = resolveReplayWindow(day, targetTimestamp, windowStartTimeMs, windowEndTimeMs);
|
cleanupExpiredSessions();
|
if (sessions.size() >= safePositive(maxConcurrentSessions, 4)) {
|
throw new IllegalStateException("历史回放会话已满,请先关闭未使用的回放页面");
|
}
|
boolean currentReplayDay = isCurrentReplayDay(day);
|
if (type != null && !type.isBlank() && deviceNo != null) {
|
warmupEntryManifest(day, type, deviceNo, stationId);
|
}
|
List<DeviceReplayManifest> manifests = loadSessionManifests(day, currentReplayDay, true);
|
if (manifests.isEmpty()) {
|
throw new IllegalArgumentException("当天没有可回放清单,请确认该日期日志已生成 replay manifest");
|
}
|
TimelineChunkSnapshotData timelineSnapshot = buildTimelineChunkSnapshot(manifests, replayWindow.startTimeMs, replayWindow.endTimeMs);
|
ReplaySessionContext session = new ReplaySessionContext();
|
session.setSessionId(UUID.randomUUID().toString().replace("-", ""));
|
session.setDay(day);
|
session.setCreatedAtMs(System.currentTimeMillis());
|
session.setLastAccessAtMs(session.getCreatedAtMs());
|
session.setWindowStartTimeMs(replayWindow.startTimeMs);
|
session.setWindowEndTimeMs(replayWindow.endTimeMs);
|
session.setStartTimeMs(replayWindow.startTimeMs);
|
session.setEndTimeMs(replayWindow.endTimeMs);
|
session.setTimelineChunks(timelineSnapshot.timelineChunks);
|
session.setChunkStreamIdsByChunk(timelineSnapshot.chunkStreamIdsByChunk);
|
session.setLastManifestRefreshAtMs(System.currentTimeMillis());
|
session.setManifestSnapshotKey(buildManifestSnapshotKey(manifests));
|
session.setManifestByStream(buildManifestSnapshot(manifests));
|
sessions.put(session.getSessionId(), session);
|
if (currentReplayDay) {
|
manifestService.scheduleDayManifestRefresh(day);
|
scheduleTailCheckpointWarmup(session.getSessionId());
|
}
|
|
Map<String, Object> data = new LinkedHashMap<>();
|
data.put("sessionId", session.getSessionId());
|
data.put("day", day);
|
data.put("startTimeMs", session.getStartTimeMs());
|
data.put("endTimeMs", session.getEndTimeMs());
|
data.put("windowStartTimeMs", session.getWindowStartTimeMs());
|
data.put("windowEndTimeMs", session.getWindowEndTimeMs());
|
data.put("chunkCount", session.getTimelineChunks().size());
|
data.put("streamCount", session.getManifestByStream().size());
|
data.put("entryType", type);
|
data.put("entryDeviceNo", deviceNo);
|
data.put("entryStationId", stationId);
|
data.put("maxPrefetchChunks", safePositive(maxPrefetchChunks, 3));
|
return data;
|
}
|
|
@Override
|
public Map<String, Object> loadTimeline(String sessionId) {
|
ReplaySessionContext session = requireSession(sessionId);
|
Map<String, Object> data = new LinkedHashMap<>();
|
data.put("sessionId", session.getSessionId());
|
data.put("day", session.getDay());
|
data.put("startTimeMs", session.getStartTimeMs());
|
data.put("endTimeMs", session.getEndTimeMs());
|
data.put("windowStartTimeMs", session.getWindowStartTimeMs());
|
data.put("windowEndTimeMs", session.getWindowEndTimeMs());
|
data.put("chunks", session.getTimelineChunks());
|
data.put("maxPrefetchChunks", safePositive(maxPrefetchChunks, 3));
|
return data;
|
}
|
|
@Override
|
public Map<String, Object> loadTimelineSummary(String sessionId, Integer bucketCount) {
|
ReplaySessionContext session = requireSession(sessionId);
|
int effectiveBucketCount = resolveSummaryBucketCount(session, bucketCount);
|
synchronized (session.getTimelineSummaryCache()) {
|
Map<String, Object> cachedSummary = session.getTimelineSummaryCache().get(effectiveBucketCount);
|
if (cachedSummary != null) {
|
return cachedSummary;
|
}
|
}
|
TimelineSummarySnapshot summarySnapshot = buildTimelineSummarySnapshot(session);
|
Map<String, Object> summary = buildTimelineSummary(summarySnapshot, effectiveBucketCount);
|
synchronized (session.getTimelineSummaryCache()) {
|
Map<String, Object> cachedSummary = session.getTimelineSummaryCache().get(effectiveBucketCount);
|
if (cachedSummary != null) {
|
return cachedSummary;
|
}
|
session.getTimelineSummaryCache().put(effectiveBucketCount, summary);
|
return summary;
|
}
|
}
|
|
private TimelineSummarySnapshot buildTimelineSummarySnapshot(ReplaySessionContext session) {
|
TimelineSummarySnapshot snapshot = new TimelineSummarySnapshot();
|
synchronized (session) {
|
snapshot.sessionId = session.getSessionId();
|
snapshot.day = session.getDay();
|
snapshot.startTimeMs = safeLong(session.getStartTimeMs());
|
snapshot.endTimeMs = safeLong(session.getEndTimeMs());
|
snapshot.windowStartTimeMs = safeLong(session.getWindowStartTimeMs());
|
snapshot.windowEndTimeMs = safeLong(session.getWindowEndTimeMs());
|
snapshot.timelineChunks = new ArrayList<>(session.getTimelineChunks());
|
snapshot.manifests = new ArrayList<>(session.getManifestByStream().values());
|
}
|
return snapshot;
|
}
|
|
private Map<String, Object> buildTimelineSummary(TimelineSummarySnapshot snapshot, int bucketCount) {
|
Map<String, Object> summary = new LinkedHashMap<>();
|
List<Map<String, Object>> buckets = new ArrayList<>();
|
if (bucketCount <= 0 || snapshot.timelineChunks.isEmpty()) {
|
summary.put("sessionId", snapshot.sessionId);
|
summary.put("day", snapshot.day);
|
summary.put("bucketCount", 0);
|
summary.put("startTimeMs", snapshot.startTimeMs);
|
summary.put("endTimeMs", snapshot.endTimeMs);
|
summary.put("buckets", buckets);
|
return summary;
|
}
|
|
long totalDurationMs = Math.max(1L, snapshot.endTimeMs - snapshot.startTimeMs + 1L);
|
long bucketDurationMs = Math.max(1L, (long) Math.ceil(totalDurationMs / (double) bucketCount));
|
for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) {
|
long bucketStartTime = snapshot.startTimeMs + bucketIndex * bucketDurationMs;
|
if (bucketStartTime > snapshot.endTimeMs) {
|
bucketStartTime = snapshot.endTimeMs;
|
}
|
long bucketEndTime = Math.min(snapshot.endTimeMs, bucketStartTime + bucketDurationMs - 1L);
|
int startChunkIndex = resolveChunkIndex(snapshot.timelineChunks, bucketStartTime, 0L);
|
int endChunkIndex = resolveChunkIndex(snapshot.timelineChunks, bucketEndTime, 0L);
|
if (endChunkIndex < startChunkIndex) {
|
endChunkIndex = startChunkIndex;
|
}
|
buckets.add(buildSummaryBucket(snapshot, bucketIndex, bucketStartTime, bucketEndTime, startChunkIndex, endChunkIndex));
|
}
|
summary.put("sessionId", snapshot.sessionId);
|
summary.put("day", snapshot.day);
|
summary.put("bucketCount", bucketCount);
|
summary.put("startTimeMs", snapshot.startTimeMs);
|
summary.put("endTimeMs", snapshot.endTimeMs);
|
summary.put("buckets", buckets);
|
return summary;
|
}
|
|
private Map<String, Object> buildSummaryBucket(TimelineSummarySnapshot snapshot,
|
int bucketIndex,
|
long bucketStartTime,
|
long bucketEndTime,
|
int startChunkIndex,
|
int endChunkIndex) {
|
Map<String, Object> bucket = new LinkedHashMap<>();
|
bucket.put("bucketIndex", bucketIndex);
|
bucket.put("startTimeMs", clampTimestampToWindow(snapshot, bucketStartTime));
|
bucket.put("endTimeMs", clampTimestampToWindow(snapshot, bucketEndTime));
|
|
int firstDataChunkIndex = -1;
|
int anchorChunkIndex = -1;
|
int anchorFrameIndex = -1;
|
int firstAbnormalChunkIndex = -1;
|
int firstAbnormalFrameIndex = -1;
|
String topEventType = "";
|
int bucketEstimatedSamples = 0;
|
|
for (int chunkIndex = startChunkIndex; chunkIndex <= endChunkIndex; chunkIndex++) {
|
Map<String, Object> chunkMeta = resolveTimelineChunk(snapshot.timelineChunks, chunkIndex);
|
int estimatedSamples = intValue(chunkMeta.get("estimatedSamples"));
|
bucketEstimatedSamples += Math.max(0, estimatedSamples);
|
if (firstDataChunkIndex >= 0 || estimatedSamples <= 0) {
|
continue;
|
}
|
firstDataChunkIndex = chunkIndex;
|
}
|
|
boolean hasReplayData = bucketEstimatedSamples > 0 && firstDataChunkIndex >= 0;
|
int activityScore = hasReplayData ? 100 : 0;
|
int abnormalCount = 0;
|
int errorCount = 0;
|
int blockCount = 0;
|
boolean hasCrnData = false;
|
boolean hasStationData = false;
|
boolean hasRgvData = false;
|
|
if (hasReplayData) {
|
long visibleBucketStartTime = clampTimestampToWindow(snapshot, bucketStartTime);
|
long visibleBucketEndTime = clampTimestampToWindow(snapshot, bucketEndTime);
|
for (DeviceReplayManifest manifest : snapshot.manifests) {
|
if (manifest == null) {
|
continue;
|
}
|
long manifestStartTime = safeLong(manifest.getFirstSampleTimeMs());
|
long manifestEndTime = safeLong(manifest.getLastSampleTimeMs());
|
if (manifestEndTime < visibleBucketStartTime || manifestStartTime > visibleBucketEndTime) {
|
continue;
|
}
|
if ("Devp".equalsIgnoreCase(manifest.getType())) {
|
hasStationData = true;
|
} else if ("Rgv".equalsIgnoreCase(manifest.getType())) {
|
hasRgvData = true;
|
} else if ("Crn".equalsIgnoreCase(manifest.getType()) || "DualCrn".equalsIgnoreCase(manifest.getType())) {
|
hasCrnData = true;
|
}
|
}
|
}
|
|
if (hasReplayData && anchorChunkIndex < 0) {
|
anchorChunkIndex = firstDataChunkIndex;
|
anchorFrameIndex = 0;
|
}
|
|
bucket.put("activityScore", activityScore);
|
bucket.put("abnormalCount", abnormalCount);
|
bucket.put("errorCount", errorCount);
|
bucket.put("blockCount", blockCount);
|
bucket.put("hasReplayData", hasReplayData);
|
bucket.put("hasCrnData", hasCrnData);
|
bucket.put("hasStationData", hasStationData);
|
bucket.put("hasRgvData", hasRgvData);
|
bucket.put("anchorChunkIndex", anchorChunkIndex);
|
bucket.put("anchorFrameIndex", anchorFrameIndex);
|
bucket.put("firstAbnormalChunkIndex", firstAbnormalChunkIndex);
|
bucket.put("firstAbnormalFrameIndex", firstAbnormalFrameIndex);
|
bucket.put("topEventType", topEventType);
|
return bucket;
|
}
|
|
@Override
|
public ReplayChunk loadChunk(String sessionId, Integer chunkIndex) {
|
ReplaySessionContext session = requireSession(sessionId);
|
return projectChunkForClient(session, loadFullChunk(session, chunkIndex));
|
}
|
|
private ReplayChunk loadFullChunk(ReplaySessionContext session, Integer chunkIndex) {
|
ReplayChunk cachedChunk = getCachedChunk(session, chunkIndex, true);
|
if (cachedChunk != null) {
|
return cachedChunk;
|
}
|
String manifestSnapshotKey = session.getManifestSnapshotKey();
|
ReplayChunk chunk = buildChunk(session, chunkIndex, true);
|
if (isSameManifestSnapshot(session, manifestSnapshotKey)) {
|
cacheChunk(session, chunkIndex, chunk, true);
|
} else {
|
chunk = buildChunk(session, chunkIndex, true);
|
}
|
return chunk;
|
}
|
|
@Override
|
public ReplayChunk loadChunk(String sessionId, Integer chunkIndex, boolean fullFrames) {
|
ReplaySessionContext session = requireSession(sessionId);
|
ReplayChunk cachedChunk = getCachedChunk(session, chunkIndex, fullFrames);
|
if (cachedChunk != null) {
|
return projectChunkForClient(session, cachedChunk);
|
}
|
String manifestSnapshotKey = session.getManifestSnapshotKey();
|
ReplayChunk chunk = fullFrames
|
? buildChunk(session, chunkIndex, true)
|
: buildCompactChunk(session, chunkIndex);
|
if (isSameManifestSnapshot(session, manifestSnapshotKey)) {
|
cacheChunk(session, chunkIndex, chunk, fullFrames);
|
} else {
|
chunk = fullFrames
|
? buildChunk(session, chunkIndex, true)
|
: buildCompactChunk(session, chunkIndex);
|
}
|
return projectChunkForClient(session, chunk);
|
}
|
|
private ReplayChunk buildCompactChunk(ReplaySessionContext session, Integer chunkIndex) {
|
Map<String, Object> chunkMeta = resolveTimelineChunk(session, chunkIndex);
|
long startTime = numberValue(chunkMeta.get("startTimeMs"));
|
long endTime = numberValue(chunkMeta.get("endTimeMs"));
|
long sampleSeq = numberValue(chunkMeta.get("lastSampleSeq"));
|
|
ReplayChunk chunk = new ReplayChunk();
|
chunk.setChunkIndex(chunkIndex);
|
chunk.setStartTimeMs(startTime);
|
chunk.setEndTimeMs(endTime);
|
|
Map<String, Object> summary = new LinkedHashMap<>();
|
summary.put("stationCount", 0);
|
summary.put("crnCount", 0);
|
summary.put("dualCrnCount", 0);
|
summary.put("rgvCount", 0);
|
summary.put("abnormalCount", 0);
|
summary.put("errorCount", 0);
|
summary.put("blockCount", 0);
|
summary.put("estimatedSamples", intValue(chunkMeta.get("estimatedSamples")));
|
|
Map<String, Object> frame = new LinkedHashMap<>();
|
frame.put("timestamp", endTime > 0 ? endTime : startTime);
|
frame.put("sampleSeq", sampleSeq > 0 ? sampleSeq : 0L);
|
frame.put("abnormalList", new ArrayList<>());
|
frame.put("summary", summary);
|
appendFrame(chunk, frame);
|
chunk.setFrameCount(chunk.getFrames().size());
|
return chunk;
|
}
|
|
private ReplayChunk buildChunk(ReplaySessionContext session, Integer chunkIndex, boolean fullFrames) {
|
String manifestSnapshotKey = session.getManifestSnapshotKey();
|
Map<String, Object> chunkMeta = resolveTimelineChunk(session, chunkIndex);
|
long startTime = numberValue(chunkMeta.get("startTimeMs"));
|
long endTime = numberValue(chunkMeta.get("endTimeMs"));
|
|
Map<String, Object> currentStateByStream = new LinkedHashMap<>();
|
List<DeviceReplayState> eventList = new ArrayList<>();
|
ChunkCheckpointSeed checkpointSeed = resolveCheckpointSeed(session, chunkIndex);
|
boolean enableBaselineRecovery = fullFrames || checkpointSeed != null;
|
if (checkpointSeed != null && checkpointSeed.stateByStream != null) {
|
currentStateByStream.putAll(checkpointSeed.stateByStream);
|
}
|
long scanStartExclusiveTimeMs = checkpointSeed == null ? 0L : checkpointSeed.scanStartExclusiveTimeMs;
|
boolean allowBaselineLookup = fullFrames && checkpointSeed == null;
|
|
for (DeviceReplayManifest manifest : resolveChunkManifests(session, chunkIndex)) {
|
StreamReplayScanResult scanResult = scanManifestRange(
|
manifest,
|
startTime,
|
endTime,
|
allowBaselineLookup,
|
scanStartExclusiveTimeMs);
|
if (enableBaselineRecovery && scanResult.baselineState != null) {
|
currentStateByStream.put(manifest.getStreamId(), scanResult.baselineState);
|
}
|
eventList.addAll(scanResult.eventList);
|
}
|
eventList.sort(Comparator
|
.comparing(DeviceReplayState::getTimestamp, Comparator.nullsLast(Long::compareTo))
|
.thenComparing(DeviceReplayState::getSampleSeq, Comparator.nullsLast(Long::compareTo))
|
.thenComparing(DeviceReplayState::getType, Comparator.nullsLast(String::compareTo))
|
.thenComparing(DeviceReplayState::getDeviceNo, Comparator.nullsLast(Integer::compareTo))
|
.thenComparing(DeviceReplayState::getStationId, Comparator.nullsLast(Integer::compareTo)));
|
|
ReplayChunk chunk = new ReplayChunk();
|
chunk.setChunkIndex(chunkIndex);
|
chunk.setStartTimeMs(startTime);
|
chunk.setEndTimeMs(endTime);
|
|
if (fullFrames && !currentStateByStream.isEmpty()) {
|
Map<String, Object> baseState = replayNormalizer.buildFrame(startTime, 0L, currentStateByStream);
|
chunk.setBaseState(baseState);
|
appendFrame(chunk, baseState);
|
}
|
|
List<Integer> keptGroupIndexes = fullFrames ? null : resolveCompactGroupIndexes(eventList);
|
int groupIndex = -1;
|
int cursor = 0;
|
while (cursor < eventList.size()) {
|
groupIndex++;
|
DeviceReplayState currentEvent = eventList.get(cursor);
|
Long frameTime = currentEvent.getTimestamp();
|
Long frameSeq = currentEvent.getSampleSeq();
|
while (cursor < eventList.size()) {
|
DeviceReplayState nextEvent = eventList.get(cursor);
|
if (!equalsLong(frameTime, nextEvent.getTimestamp())
|
|| !equalsLong(frameSeq, nextEvent.getSampleSeq())) {
|
break;
|
}
|
String streamId = buildStreamId(session.getDay(), nextEvent.getType(), nextEvent.getDeviceNo(), nextEvent.getStationId());
|
currentStateByStream.put(streamId, nextEvent.getPayload());
|
frameSeq = nextEvent.getSampleSeq();
|
cursor++;
|
}
|
if (fullFrames || keptGroupIndexes.contains(groupIndex)) {
|
Map<String, Object> frame = fullFrames
|
? replayNormalizer.buildFrame(frameTime, frameSeq, currentStateByStream)
|
: replayNormalizer.buildCompactFrame(frameTime, frameSeq, currentStateByStream);
|
appendFrame(chunk, frame);
|
}
|
}
|
|
if (chunk.getFrames().isEmpty()) {
|
Map<String, Object> emptyFrame = fullFrames
|
? replayNormalizer.buildFrame(startTime, 0L, currentStateByStream)
|
: replayNormalizer.buildCompactFrame(startTime, 0L, currentStateByStream);
|
appendFrame(chunk, emptyFrame);
|
}
|
if (fullFrames) {
|
chunk.setFullFrames(new ArrayList<>(chunk.getFrames()));
|
chunk.setFullFrameSummaryList(new ArrayList<>(chunk.getFrameSummaryList()));
|
shrinkChunkFrames(chunk);
|
}
|
chunk.setFrameCount(chunk.getFrames().size());
|
if (fullFrames && !currentStateByStream.isEmpty() && isSameManifestSnapshot(session, manifestSnapshotKey)) {
|
cacheChunkCheckpoint(session, chunkIndex, currentStateByStream);
|
}
|
return chunk;
|
}
|
|
private ReplayChunk projectChunkForClient(ReplaySessionContext session, ReplayChunk sourceChunk) {
|
ReplayChunk chunk = new ReplayChunk();
|
chunk.setChunkIndex(sourceChunk.getChunkIndex());
|
chunk.setStartTimeMs(clampTimestampToWindow(session, safeLong(sourceChunk.getStartTimeMs())));
|
chunk.setEndTimeMs(clampTimestampToWindow(session, safeLong(sourceChunk.getEndTimeMs())));
|
chunk.setBaseState(copyObjectForClient(sourceChunk.getBaseState()));
|
copyVisibleFrames(sourceChunk, chunk);
|
chunk.setFrameCount(chunk.getFrames().size());
|
return chunk;
|
}
|
|
@SuppressWarnings("unchecked")
|
private Map<String, Object> copyObjectForClient(Map<String, Object> sourceMap) {
|
if (sourceMap == null) {
|
return null;
|
}
|
return (Map<String, Object>) copyValueForClient(sourceMap);
|
}
|
|
@SuppressWarnings("unchecked")
|
private Object copyValueForClient(Object sourceValue) {
|
if (sourceValue instanceof Map<?, ?> sourceMap) {
|
Map<String, Object> copiedMap = new LinkedHashMap<>();
|
for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
|
copiedMap.put(String.valueOf(entry.getKey()), copyValueForClient(entry.getValue()));
|
}
|
return copiedMap;
|
}
|
if (sourceValue instanceof List<?> sourceList) {
|
List<Object> copiedList = new ArrayList<>(sourceList.size());
|
for (Object item : sourceList) {
|
copiedList.add(copyValueForClient(item));
|
}
|
return copiedList;
|
}
|
return sourceValue;
|
}
|
|
private void copyVisibleFrames(ReplayChunk sourceChunk, ReplayChunk targetChunk) {
|
List<Map<String, Object>> sourceFrames = resolveFullFrameList(sourceChunk);
|
List<ReplayFrameSummary> sourceSummaries = resolveFullFrameSummaryList(sourceChunk);
|
long startTime = safeLong(targetChunk.getStartTimeMs());
|
long endTime = safeLong(targetChunk.getEndTimeMs());
|
for (int frameIndex = 0; frameIndex < sourceFrames.size(); frameIndex++) {
|
Map<String, Object> frame = sourceFrames.get(frameIndex);
|
long frameTime = numberValue(frame.get("timestamp"));
|
if (frameTime < startTime || frameTime > endTime) {
|
continue;
|
}
|
targetChunk.getFrames().add(copyObjectForClient(frame));
|
ReplayFrameSummary summary = frameIndex < sourceSummaries.size() ? sourceSummaries.get(frameIndex) : null;
|
targetChunk.getFrameSummaryList().add(copyFrameSummary(summary, targetChunk.getFrameSummaryList().size(), frameTime));
|
}
|
}
|
|
private List<ReplayFrameSummary> resolveFullFrameSummaryList(ReplayChunk replayChunk) {
|
if (replayChunk == null) {
|
return new ArrayList<>();
|
}
|
if (replayChunk.getFullFrameSummaryList() != null && !replayChunk.getFullFrameSummaryList().isEmpty()) {
|
return replayChunk.getFullFrameSummaryList();
|
}
|
return replayChunk.getFrameSummaryList();
|
}
|
|
private ReplayFrameSummary copyFrameSummary(ReplayFrameSummary sourceSummary, int frameIndex, long frameTime) {
|
ReplayFrameSummary summary = new ReplayFrameSummary();
|
summary.setFrameIndex(frameIndex);
|
summary.setTimestamp(frameTime);
|
summary.setAbnormalCount(sourceSummary == null ? 0 : sourceSummary.getAbnormalCount());
|
summary.setErrorCount(sourceSummary == null ? 0 : sourceSummary.getErrorCount());
|
summary.setBlockCount(sourceSummary == null ? 0 : sourceSummary.getBlockCount());
|
summary.setManualCount(sourceSummary == null ? 0 : sourceSummary.getManualCount());
|
return summary;
|
}
|
|
private List<Map<String, Object>> resolveFullFrameList(ReplayChunk replayChunk) {
|
if (replayChunk == null) {
|
return new ArrayList<>();
|
}
|
if (replayChunk.getFullFrames() != null && !replayChunk.getFullFrames().isEmpty()) {
|
return replayChunk.getFullFrames();
|
}
|
return replayChunk.getFrames();
|
}
|
|
private void shrinkChunkFrames(ReplayChunk chunk) {
|
if (chunk == null || chunk.getFrames() == null || chunk.getFrames().isEmpty()) {
|
return;
|
}
|
int frameLimit = Math.max(16, safePositive(maxChunkFrames, 32));
|
if (chunk.getFrames().size() <= frameLimit) {
|
return;
|
}
|
|
LinkedHashSet<Integer> keptIndexes = new LinkedHashSet<>();
|
keptIndexes.add(0);
|
keptIndexes.add(chunk.getFrames().size() - 1);
|
|
for (int index = 0; index < chunk.getFrameSummaryList().size(); index++) {
|
ReplayFrameSummary frameSummary = chunk.getFrameSummaryList().get(index);
|
if (frameSummary != null && safeSummaryActivity(frameSummary) > 0) {
|
keptIndexes.add(index);
|
}
|
}
|
|
int remainingSlots = Math.max(0, frameLimit - keptIndexes.size());
|
if (remainingSlots > 0) {
|
double step = (chunk.getFrames().size() - 1D) / Math.max(1D, remainingSlots);
|
for (int slot = 1; slot <= remainingSlots; slot++) {
|
int sampledIndex = (int) Math.round(slot * step);
|
sampledIndex = Math.max(0, Math.min(chunk.getFrames().size() - 1, sampledIndex));
|
keptIndexes.add(sampledIndex);
|
if (keptIndexes.size() >= frameLimit) {
|
break;
|
}
|
}
|
}
|
|
List<Integer> orderedIndexes = new ArrayList<>(keptIndexes);
|
orderedIndexes.sort(Integer::compareTo);
|
|
List<Map<String, Object>> compactFrames = new ArrayList<>(orderedIndexes.size());
|
List<ReplayFrameSummary> compactSummaryList = new ArrayList<>(orderedIndexes.size());
|
for (int compactIndex = 0; compactIndex < orderedIndexes.size(); compactIndex++) {
|
int originalIndex = orderedIndexes.get(compactIndex);
|
compactFrames.add(chunk.getFrames().get(originalIndex));
|
ReplayFrameSummary originalSummary = originalIndex < chunk.getFrameSummaryList().size()
|
? chunk.getFrameSummaryList().get(originalIndex)
|
: null;
|
if (originalSummary == null) {
|
originalSummary = new ReplayFrameSummary();
|
}
|
ReplayFrameSummary compactSummary = new ReplayFrameSummary();
|
compactSummary.setFrameIndex(compactIndex);
|
compactSummary.setTimestamp(originalSummary.getTimestamp());
|
compactSummary.setAbnormalCount(originalSummary.getAbnormalCount());
|
compactSummary.setErrorCount(originalSummary.getErrorCount());
|
compactSummary.setBlockCount(originalSummary.getBlockCount());
|
compactSummary.setManualCount(originalSummary.getManualCount());
|
compactSummaryList.add(compactSummary);
|
}
|
chunk.setFrames(compactFrames);
|
chunk.setFrameSummaryList(compactSummaryList);
|
}
|
|
private List<Integer> resolveCompactGroupIndexes(List<DeviceReplayState> eventList) {
|
List<Integer> keptIndexes = new ArrayList<>();
|
if (eventList == null || eventList.isEmpty()) {
|
return keptIndexes;
|
}
|
int groupCount = 0;
|
int cursor = 0;
|
while (cursor < eventList.size()) {
|
DeviceReplayState currentEvent = eventList.get(cursor);
|
Long frameTime = currentEvent.getTimestamp();
|
Long frameSeq = currentEvent.getSampleSeq();
|
while (cursor < eventList.size()) {
|
DeviceReplayState nextEvent = eventList.get(cursor);
|
if (!equalsLong(frameTime, nextEvent.getTimestamp())
|
|| !equalsLong(frameSeq, nextEvent.getSampleSeq())) {
|
break;
|
}
|
cursor++;
|
}
|
groupCount++;
|
}
|
int compactFrameLimit = Math.max(8, safePositive(maxChunkFrames, 32));
|
if (groupCount <= compactFrameLimit) {
|
for (int index = 0; index < groupCount; index++) {
|
keptIndexes.add(index);
|
}
|
return keptIndexes;
|
}
|
keptIndexes.add(0);
|
keptIndexes.add(groupCount - 1);
|
int remainingSlots = Math.max(0, compactFrameLimit - keptIndexes.size());
|
double step = (groupCount - 1D) / Math.max(1D, remainingSlots);
|
for (int slot = 1; slot <= remainingSlots; slot++) {
|
int sampledIndex = (int) Math.round(slot * step);
|
sampledIndex = Math.max(0, Math.min(groupCount - 1, sampledIndex));
|
if (!keptIndexes.contains(sampledIndex)) {
|
keptIndexes.add(sampledIndex);
|
}
|
}
|
keptIndexes.sort(Integer::compareTo);
|
return keptIndexes;
|
}
|
|
private int safeSummaryActivity(ReplayFrameSummary frameSummary) {
|
if (frameSummary == null) {
|
return 0;
|
}
|
return intValue(frameSummary.getAbnormalCount())
|
+ intValue(frameSummary.getErrorCount())
|
+ intValue(frameSummary.getBlockCount());
|
}
|
|
@Override
|
public ReplaySeekResult seek(String sessionId, Long timestamp, Long sampleSeq) {
|
ReplaySessionContext session = requireSession(sessionId);
|
long requestTime = timestamp == null || timestamp <= 0 ? session.getStartTimeMs() : timestamp;
|
rejectTimestampOutsideWindow(session, requestTime, "seek 目标时间不在当前回放窗口内");
|
long requestSeq = sampleSeq == null || sampleSeq <= 0 ? 0L : sampleSeq;
|
rejectSampleSeqOutsideWindow(session, requestSeq);
|
int initialChunkIndex = resolveChunkIndex(session, requestTime, requestSeq);
|
if (requestSeq <= 0) {
|
ReplaySeekResult result = new ReplaySeekResult();
|
result.setChunkIndex(initialChunkIndex);
|
result.setFrameIndex(0);
|
result.setRequestedTimestamp(requestTime);
|
result.setResolvedTimestamp(requestTime);
|
result.setRequestedSampleSeq(null);
|
result.setResolvedSampleSeq(null);
|
result.setDriftMs(0L);
|
result.setResolvedFrame(null);
|
return result;
|
}
|
ResolvedSeekFrame resolvedSeekFrame = findNearestSeekFrame(session, sessionId, initialChunkIndex, requestTime, requestSeq);
|
int chunkIndex = resolvedSeekFrame == null ? initialChunkIndex : resolvedSeekFrame.chunkIndex;
|
ReplayChunk chunk = resolvedSeekFrame == null ? loadFullChunk(session, chunkIndex) : resolvedSeekFrame.chunk;
|
long resolvedTime = resolvedSeekFrame == null ? requestTime : numberValue(resolvedSeekFrame.frame.get("timestamp"));
|
long resolvedSeq = resolvedSeekFrame == null ? requestSeq : numberValue(resolvedSeekFrame.frame.get("sampleSeq"));
|
int displayFrameIndex = findNearestDisplayFrameIndex(chunk, resolvedTime);
|
ReplaySeekResult result = new ReplaySeekResult();
|
result.setChunkIndex(chunkIndex);
|
result.setFrameIndex(displayFrameIndex);
|
result.setRequestedTimestamp(requestTime);
|
result.setResolvedTimestamp(resolvedTime);
|
result.setRequestedSampleSeq(requestSeq > 0 ? requestSeq : null);
|
result.setResolvedSampleSeq(resolvedSeq > 0 ? resolvedSeq : null);
|
result.setDriftMs(requestSeq > 0
|
? Math.abs(resolvedSeq - requestSeq)
|
: Math.abs(resolvedTime - requestTime));
|
result.setResolvedFrame(resolvedSeekFrame == null ? null : resolvedSeekFrame.frame);
|
return result;
|
}
|
|
private ResolvedSeekFrame findNearestSeekFrame(ReplaySessionContext session,
|
String sessionId,
|
int initialChunkIndex,
|
long requestTime,
|
long requestSeq) {
|
ResolvedSeekFrame bestMatch = evaluateSeekChunk(session, initialChunkIndex, requestTime, requestSeq);
|
long bestDrift = bestMatch == null ? Long.MAX_VALUE : bestMatch.drift;
|
int maxSearchDistance = Math.min(session.getTimelineChunks().size(), 512);
|
|
for (int distance = 1; distance < maxSearchDistance; distance++) {
|
boolean checkedAny = false;
|
|
int leftChunkIndex = initialChunkIndex - distance;
|
if (leftChunkIndex >= 0) {
|
long lowerBoundDrift = resolveChunkBoundaryDrift(session, leftChunkIndex, requestTime);
|
if (lowerBoundDrift <= bestDrift) {
|
ResolvedSeekFrame leftMatch = evaluateSeekChunk(session, leftChunkIndex, requestTime, requestSeq);
|
bestMatch = chooseBetterSeekFrame(bestMatch, leftMatch);
|
bestDrift = bestMatch == null ? Long.MAX_VALUE : bestMatch.drift;
|
checkedAny = true;
|
}
|
}
|
|
int rightChunkIndex = initialChunkIndex + distance;
|
if (rightChunkIndex < session.getTimelineChunks().size()) {
|
long lowerBoundDrift = resolveChunkBoundaryDrift(session, rightChunkIndex, requestTime);
|
if (lowerBoundDrift <= bestDrift) {
|
ResolvedSeekFrame rightMatch = evaluateSeekChunk(session, rightChunkIndex, requestTime, requestSeq);
|
bestMatch = chooseBetterSeekFrame(bestMatch, rightMatch);
|
bestDrift = bestMatch == null ? Long.MAX_VALUE : bestMatch.drift;
|
checkedAny = true;
|
}
|
}
|
|
if (!checkedAny && bestMatch != null) {
|
break;
|
}
|
}
|
return bestMatch;
|
}
|
|
private ResolvedSeekFrame evaluateSeekChunk(ReplaySessionContext session, int chunkIndex, long requestTime, long requestSeq) {
|
if (chunkIndex < 0) {
|
return null;
|
}
|
ReplayChunk chunk = loadFullChunk(session, chunkIndex);
|
List<Map<String, Object>> seekFrames = chunk.getFullFrames() == null || chunk.getFullFrames().isEmpty()
|
? chunk.getFrames()
|
: chunk.getFullFrames();
|
if (seekFrames == null || seekFrames.isEmpty()) {
|
return null;
|
}
|
|
Map<String, Object> bestFrame = null;
|
long bestDrift = Long.MAX_VALUE;
|
for (Map<String, Object> frame : seekFrames) {
|
long drift = requestSeq > 0
|
? Math.abs(numberValue(frame.get("sampleSeq")) - requestSeq)
|
: Math.abs(numberValue(frame.get("timestamp")) - requestTime);
|
if (drift <= bestDrift) {
|
bestDrift = drift;
|
bestFrame = frame;
|
}
|
}
|
if (bestFrame == null) {
|
return null;
|
}
|
|
ResolvedSeekFrame resolvedSeekFrame = new ResolvedSeekFrame();
|
resolvedSeekFrame.chunkIndex = chunkIndex;
|
resolvedSeekFrame.chunk = chunk;
|
resolvedSeekFrame.frame = bestFrame;
|
resolvedSeekFrame.drift = bestDrift;
|
return resolvedSeekFrame;
|
}
|
|
private ResolvedSeekFrame chooseBetterSeekFrame(ResolvedSeekFrame currentBest, ResolvedSeekFrame challenger) {
|
if (challenger == null) {
|
return currentBest;
|
}
|
if (currentBest == null || challenger.drift <= currentBest.drift) {
|
return challenger;
|
}
|
return currentBest;
|
}
|
|
private long resolveChunkBoundaryDrift(ReplaySessionContext session, int chunkIndex, long requestTime) {
|
Map<String, Object> chunkMeta = resolveTimelineChunk(session, chunkIndex);
|
long chunkStartTime = numberValue(chunkMeta.get("startTimeMs"));
|
long chunkEndTime = numberValue(chunkMeta.get("endTimeMs"));
|
if (requestTime < chunkStartTime) {
|
return chunkStartTime - requestTime;
|
}
|
if (requestTime > chunkEndTime) {
|
return requestTime - chunkEndTime;
|
}
|
return 0L;
|
}
|
|
private int findNearestDisplayFrameIndex(ReplayChunk chunk, long resolvedTime) {
|
List<Map<String, Object>> displayFrames = chunk.getFrames();
|
if (displayFrames == null || displayFrames.isEmpty()) {
|
return 0;
|
}
|
int matchedIndex = 0;
|
long bestDrift = Long.MAX_VALUE;
|
for (int index = 0; index < displayFrames.size(); index++) {
|
Map<String, Object> frame = displayFrames.get(index);
|
long drift = Math.abs(numberValue(frame.get("timestamp")) - resolvedTime);
|
if (drift <= bestDrift) {
|
bestDrift = drift;
|
matchedIndex = index;
|
}
|
}
|
return matchedIndex;
|
}
|
|
@Override
|
public Map<String, Object> resolveDevice(String sessionId, String type, Integer deviceNo, Integer stationId, Long timestamp) {
|
ReplaySessionContext session = requireSession(sessionId);
|
DeviceReplayManifest manifest = resolveDeviceManifest(session, type, deviceNo, stationId);
|
if (manifest == null) {
|
throw new IllegalArgumentException("未找到对应设备的历史日志");
|
}
|
long targetTimestamp = timestamp == null || timestamp <= 0 ? session.getStartTimeMs() : timestamp;
|
rejectTimestampOutsideWindow(session, targetTimestamp, "设备详情时间不在当前回放窗口内");
|
Object resolvedState = resolveDeviceStateAt(manifest, targetTimestamp);
|
if (resolvedState == null) {
|
throw new IllegalArgumentException("当前时间没有该设备的历史详情");
|
}
|
Map<String, Object> data = new LinkedHashMap<>();
|
data.put("day", session.getDay());
|
data.put("type", type);
|
data.put("deviceNo", deviceNo);
|
data.put("stationId", stationId);
|
data.put("timestamp", targetTimestamp);
|
data.put("detail", JSON.parseObject(JSON.toJSONString(resolvedState)));
|
return data;
|
}
|
|
private DeviceReplayManifest resolveDeviceManifest(ReplaySessionContext session, String type, Integer deviceNo, Integer stationId) {
|
String manifestKey = buildStreamId(session.getDay(), type, deviceNo, stationId);
|
DeviceReplayManifest manifest = session.getManifestByStream().get(manifestKey);
|
if (manifest != null) {
|
return manifest;
|
}
|
if (!"Devp".equalsIgnoreCase(type) || stationId == null) {
|
return null;
|
}
|
for (DeviceReplayManifest currentManifest : session.getManifestByStream().values()) {
|
if (currentManifest == null) {
|
continue;
|
}
|
if (!"Devp".equalsIgnoreCase(currentManifest.getType())) {
|
continue;
|
}
|
if (stationId.equals(currentManifest.getStationId())) {
|
return currentManifest;
|
}
|
}
|
return null;
|
}
|
|
@Override
|
public void closeSession(String sessionId) {
|
if (sessionId != null) {
|
sessions.remove(sessionId);
|
}
|
}
|
|
@PreDestroy
|
public void shutdownReplayWarmupExecutor() {
|
replayWarmupExecutor.shutdownNow();
|
}
|
|
private TimelineChunkSnapshotData buildTimelineChunkSnapshot(List<DeviceReplayManifest> manifests,
|
long windowStartTimeMs,
|
long windowEndTimeMs) {
|
List<ChunkWindowMeta> allChunkMetas = collectTimelineChunkMetas(manifests, windowStartTimeMs, windowEndTimeMs);
|
if (allChunkMetas.isEmpty() || windowEndTimeMs < windowStartTimeMs) {
|
return TimelineChunkSnapshotData.empty();
|
}
|
|
List<Map<String, Object>> chunks = new ArrayList<>();
|
Map<Integer, List<String>> chunkStreamIdsByChunk = new LinkedHashMap<>();
|
|
long maxWindowMs = safePositive(maxWindowMinutes, 240) * 60L * 1000L;
|
long bucketWindowMs = safePositive(timelineBucketSeconds, 20) * 1000L;
|
if (maxWindowMs > 0) {
|
bucketWindowMs = Math.min(bucketWindowMs, maxWindowMs);
|
}
|
bucketWindowMs = Math.max(1_000L, bucketWindowMs);
|
|
int chunkIndex = 0;
|
for (long windowStart = windowStartTimeMs; windowStart <= windowEndTimeMs; windowStart += bucketWindowMs) {
|
long windowEnd = Math.min(windowEndTimeMs, windowStart + bucketWindowMs - 1);
|
int estimatedSamples = estimateWindowSamples(allChunkMetas, windowStart, windowEnd);
|
int targetChunkSampleSize = Math.max(1, safePositive(chunkSampleSize, 400));
|
int desiredSplitCount = Math.max(1, (int) Math.ceil(estimatedSamples / (double) targetChunkSampleSize));
|
int splitCount = Math.min(128, desiredSplitCount);
|
long windowDurationMs = Math.max(1L, windowEnd - windowStart + 1L);
|
long splitDurationMs = Math.max(1L, (long) Math.ceil(windowDurationMs / (double) splitCount));
|
|
for (int splitIndex = 0; splitIndex < splitCount; splitIndex++) {
|
long splitStartTime = windowStart + splitIndex * splitDurationMs;
|
if (splitStartTime > windowEnd) {
|
break;
|
}
|
long splitEndTime = Math.min(windowEnd, splitStartTime + splitDurationMs - 1L);
|
int splitEstimatedSamples = estimateWindowSamples(allChunkMetas, splitStartTime, splitEndTime);
|
long firstSampleSeq = resolveWindowFirstSampleSeq(allChunkMetas, splitStartTime, splitEndTime);
|
long lastSampleSeq = resolveWindowLastSampleSeq(allChunkMetas, splitStartTime, splitEndTime);
|
List<String> activeStreamIds = resolveWindowStreamIds(allChunkMetas, splitStartTime, splitEndTime);
|
chunks.add(buildChunkMeta(
|
chunkIndex++,
|
splitStartTime,
|
splitEndTime,
|
splitEstimatedSamples,
|
firstSampleSeq,
|
lastSampleSeq));
|
chunkStreamIdsByChunk.put(chunkIndex - 1, activeStreamIds);
|
}
|
}
|
TimelineChunkSnapshotData snapshotData = new TimelineChunkSnapshotData();
|
snapshotData.timelineChunks = Collections.unmodifiableList(new ArrayList<>(chunks));
|
snapshotData.chunkStreamIdsByChunk = Collections.unmodifiableMap(new LinkedHashMap<>(chunkStreamIdsByChunk));
|
return snapshotData;
|
}
|
|
private Map<String, DeviceReplayManifest> buildManifestSnapshot(List<DeviceReplayManifest> manifests) {
|
Map<String, DeviceReplayManifest> manifestByStream = new LinkedHashMap<>();
|
for (DeviceReplayManifest manifest : manifests) {
|
manifestByStream.put(manifest.getStreamId(), manifest);
|
}
|
return Collections.unmodifiableMap(manifestByStream);
|
}
|
|
private int estimateWindowSamples(List<ChunkWindowMeta> chunkMetas, long windowStart, long windowEnd) {
|
int estimatedSamples = 0;
|
for (ChunkWindowMeta chunkWindowMeta : chunkMetas) {
|
DeviceReplayChunkMeta chunkMeta = chunkWindowMeta.chunkMeta;
|
long chunkStartTime = safeLong(chunkMeta.getFirstSampleTimeMs());
|
long chunkEndTime = safeLong(chunkMeta.getLastSampleTimeMs());
|
if (chunkEndTime < windowStart || chunkStartTime > windowEnd) {
|
continue;
|
}
|
estimatedSamples += Math.max(1, chunkMeta.getSampleCount() == null ? 1 : chunkMeta.getSampleCount());
|
}
|
return estimatedSamples;
|
}
|
|
private StreamReplayScanResult scanManifestRange(DeviceReplayManifest manifest,
|
long startTime,
|
long endTime,
|
boolean allowBaselineLookup,
|
long scanStartExclusiveTimeMs) {
|
StreamReplayScanResult result = new StreamReplayScanResult();
|
List<DeviceReplayChunkMeta> candidates = new ArrayList<>();
|
DeviceReplayChunkMeta baselineChunk = null;
|
for (DeviceReplayChunkMeta chunkMeta : manifest.getChunks()) {
|
long chunkStart = safeLong(chunkMeta.getFirstSampleTimeMs());
|
long chunkEnd = safeLong(chunkMeta.getLastSampleTimeMs());
|
if (allowBaselineLookup && chunkEnd < startTime) {
|
baselineChunk = chunkMeta;
|
continue;
|
}
|
if (chunkEnd <= scanStartExclusiveTimeMs) {
|
continue;
|
}
|
if (chunkStart > endTime) {
|
break;
|
}
|
candidates.add(chunkMeta);
|
}
|
if (allowBaselineLookup && baselineChunk != null) {
|
candidates.add(0, baselineChunk);
|
}
|
for (DeviceReplayChunkMeta chunkMeta : candidates) {
|
Path logPath = resolveLogPath(manifest, chunkMeta);
|
if (!Files.exists(logPath)) {
|
continue;
|
}
|
try (BufferedReader reader = Files.newBufferedReader(logPath, StandardCharsets.UTF_8)) {
|
String line;
|
while ((line = reader.readLine()) != null) {
|
if (line == null || line.isBlank()) {
|
continue;
|
}
|
DeviceDataLog logItem;
|
try {
|
logItem = JSON.parseObject(line, DeviceDataLog.class);
|
} catch (Exception parseError) {
|
log.debug("跳过损坏的 replay 日志行, path={}", logPath, parseError);
|
continue;
|
}
|
if (logItem == null) {
|
continue;
|
}
|
long sampleTime = resolveSampleTime(logItem);
|
Object normalizedState = replayNormalizer.normalizeState(logItem);
|
if (normalizedState == null) {
|
continue;
|
}
|
if (allowBaselineLookup && sampleTime < startTime) {
|
result.baselineState = normalizedState;
|
continue;
|
}
|
if (sampleTime <= scanStartExclusiveTimeMs) {
|
continue;
|
}
|
if (sampleTime > endTime) {
|
break;
|
}
|
DeviceReplayState replayState = new DeviceReplayState();
|
replayState.setType(manifest.getType());
|
replayState.setDeviceNo(manifest.getDeviceNo());
|
replayState.setStationId(manifest.getStationId());
|
replayState.setTimestamp(sampleTime);
|
replayState.setSampleSeq(resolveSampleSeq(logItem));
|
replayState.setPayload(normalizedState);
|
result.eventList.add(replayState);
|
}
|
} catch (Exception e) {
|
log.warn("读取 replay chunk 失败, path={}", logPath, e);
|
}
|
}
|
return result;
|
}
|
|
private Object resolveDeviceStateAt(DeviceReplayManifest manifest, long targetTimestamp) {
|
Object latestState = null;
|
for (DeviceReplayChunkMeta chunkMeta : manifest.getChunks()) {
|
long chunkStart = safeLong(chunkMeta.getFirstSampleTimeMs());
|
if (chunkStart > targetTimestamp) {
|
break;
|
}
|
Path logPath = resolveLogPath(manifest, chunkMeta);
|
if (!Files.exists(logPath)) {
|
continue;
|
}
|
try (BufferedReader reader = Files.newBufferedReader(logPath, StandardCharsets.UTF_8)) {
|
String line;
|
while ((line = reader.readLine()) != null) {
|
if (line == null || line.isBlank()) {
|
continue;
|
}
|
DeviceDataLog logItem;
|
try {
|
logItem = JSON.parseObject(line, DeviceDataLog.class);
|
} catch (Exception parseError) {
|
log.debug("跳过损坏的回放设备详情日志行, path={}", logPath, parseError);
|
continue;
|
}
|
if (logItem == null) {
|
continue;
|
}
|
long sampleTime = resolveSampleTime(logItem);
|
if (sampleTime > targetTimestamp) {
|
return latestState;
|
}
|
Object normalizedState = replayNormalizer.normalizeState(logItem);
|
if (normalizedState != null) {
|
latestState = normalizedState;
|
}
|
}
|
} catch (Exception e) {
|
log.warn("读取回放设备详情失败, path={}", logPath, e);
|
}
|
}
|
return latestState;
|
}
|
|
private ChunkCheckpointSeed resolveCheckpointSeed(ReplaySessionContext session, Integer chunkIndex) {
|
if (session == null || chunkIndex == null || chunkIndex <= 0) {
|
return null;
|
}
|
ChunkCheckpointSeed hotCheckpoint = resolveCheckpointSeed(session.getChunkStateCheckpointCache(), session, chunkIndex);
|
if (hotCheckpoint != null) {
|
return hotCheckpoint;
|
}
|
return resolveCheckpointSeed(session.getSparseChunkStateCheckpointCache(), session, chunkIndex);
|
}
|
|
private ChunkCheckpointSeed resolveCheckpointSeed(Map<Integer, Map<String, Object>> checkpointCache,
|
ReplaySessionContext session,
|
Integer chunkIndex) {
|
synchronized (checkpointCache) {
|
for (int checkpointChunkIndex = chunkIndex - 1; checkpointChunkIndex >= 0; checkpointChunkIndex--) {
|
Map<String, Object> checkpointState = checkpointCache.get(checkpointChunkIndex);
|
if (checkpointState == null || checkpointState.isEmpty()) {
|
continue;
|
}
|
Map<String, Object> checkpointChunkMeta = resolveTimelineChunk(session, checkpointChunkIndex);
|
ChunkCheckpointSeed checkpointSeed = new ChunkCheckpointSeed();
|
checkpointSeed.chunkIndex = checkpointChunkIndex;
|
checkpointSeed.scanStartExclusiveTimeMs = numberValue(checkpointChunkMeta.get("endTimeMs"));
|
checkpointSeed.stateByStream = new LinkedHashMap<>(checkpointState);
|
return checkpointSeed;
|
}
|
}
|
return null;
|
}
|
|
private void appendFrame(ReplayChunk chunk, Map<String, Object> frame) {
|
int frameIndex = chunk.getFrames().size();
|
chunk.getFrames().add(frame);
|
ReplayFrameSummary summary = new ReplayFrameSummary();
|
summary.setFrameIndex(frameIndex);
|
summary.setTimestamp(numberValue(frame.get("timestamp")));
|
Map<String, Object> frameSummary = frame.get("summary") instanceof Map
|
? (Map<String, Object>) frame.get("summary")
|
: new LinkedHashMap<>();
|
summary.setAbnormalCount(intValue(frameSummary.get("abnormalCount")));
|
summary.setErrorCount(intValue(frameSummary.get("errorCount")));
|
summary.setBlockCount(intValue(frameSummary.get("blockCount")));
|
summary.setManualCount(0);
|
chunk.getFrameSummaryList().add(summary);
|
}
|
|
private Map<String, Object> resolveTimelineChunk(ReplaySessionContext session, Integer chunkIndex) {
|
return resolveTimelineChunk(session.getTimelineChunks(), chunkIndex);
|
}
|
|
private Map<String, Object> resolveTimelineChunk(List<Map<String, Object>> timelineChunks, Integer chunkIndex) {
|
if (chunkIndex == null || chunkIndex < 0 || chunkIndex >= timelineChunks.size()) {
|
throw new IllegalArgumentException("回放分片不存在");
|
}
|
return timelineChunks.get(chunkIndex);
|
}
|
|
private int resolveChunkIndex(ReplaySessionContext session, long timestamp, long sampleSeq) {
|
return resolveChunkIndex(session.getTimelineChunks(), timestamp, sampleSeq);
|
}
|
|
private int resolveChunkIndex(List<Map<String, Object>> timelineChunks, long timestamp, long sampleSeq) {
|
if (sampleSeq > 0) {
|
for (int i = 0; i < timelineChunks.size(); i++) {
|
Map<String, Object> chunkMeta = timelineChunks.get(i);
|
long firstSampleSeq = numberValue(chunkMeta.get("firstSampleSeq"));
|
long lastSampleSeq = numberValue(chunkMeta.get("lastSampleSeq"));
|
if (sampleSeq >= firstSampleSeq && sampleSeq <= lastSampleSeq) {
|
return i;
|
}
|
}
|
}
|
for (int i = 0; i < timelineChunks.size(); i++) {
|
Map<String, Object> chunkMeta = timelineChunks.get(i);
|
long startTime = numberValue(chunkMeta.get("startTimeMs"));
|
long endTime = numberValue(chunkMeta.get("endTimeMs"));
|
if (timestamp >= startTime && timestamp <= endTime) {
|
return i;
|
}
|
}
|
return Math.max(0, timelineChunks.size() - 1);
|
}
|
|
private int resolveSummaryBucketCount(ReplaySessionContext session, Integer bucketCount) {
|
int totalChunkCount = session.getTimelineChunks().size();
|
if (totalChunkCount <= 0) {
|
return 0;
|
}
|
int defaultBucketCount = Math.min(totalChunkCount, 96);
|
int safeBucketCount = bucketCount == null || bucketCount <= 0 ? defaultBucketCount : bucketCount;
|
return Math.max(1, Math.min(totalChunkCount, safeBucketCount));
|
}
|
|
private Map<String, Object> buildChunkMeta(int chunkIndex,
|
long startTimeMs,
|
long endTimeMs,
|
int estimatedSamples,
|
long firstSampleSeq,
|
long lastSampleSeq) {
|
Map<String, Object> chunkMeta = new LinkedHashMap<>();
|
chunkMeta.put("chunkIndex", chunkIndex);
|
chunkMeta.put("startTimeMs", startTimeMs);
|
chunkMeta.put("endTimeMs", endTimeMs);
|
chunkMeta.put("estimatedSamples", estimatedSamples);
|
chunkMeta.put("firstSampleSeq", firstSampleSeq);
|
chunkMeta.put("lastSampleSeq", lastSampleSeq);
|
return chunkMeta;
|
}
|
|
private long resolveWindowFirstSampleSeq(List<ChunkWindowMeta> chunkMetas, long windowStart, long windowEnd) {
|
long firstSampleSeq = Long.MAX_VALUE;
|
for (ChunkWindowMeta chunkWindowMeta : chunkMetas) {
|
DeviceReplayChunkMeta chunkMeta = chunkWindowMeta.chunkMeta;
|
long chunkStartTime = safeLong(chunkMeta.getFirstSampleTimeMs());
|
long chunkEndTime = safeLong(chunkMeta.getLastSampleTimeMs());
|
if (chunkEndTime < windowStart || chunkStartTime > windowEnd) {
|
continue;
|
}
|
long candidate = safeLong(chunkMeta.getFirstSampleSeq());
|
if (candidate > 0) {
|
firstSampleSeq = Math.min(firstSampleSeq, candidate);
|
}
|
}
|
return firstSampleSeq == Long.MAX_VALUE ? 0L : firstSampleSeq;
|
}
|
|
private long resolveWindowLastSampleSeq(List<ChunkWindowMeta> chunkMetas, long windowStart, long windowEnd) {
|
long lastSampleSeq = 0L;
|
for (ChunkWindowMeta chunkWindowMeta : chunkMetas) {
|
DeviceReplayChunkMeta chunkMeta = chunkWindowMeta.chunkMeta;
|
long chunkStartTime = safeLong(chunkMeta.getFirstSampleTimeMs());
|
long chunkEndTime = safeLong(chunkMeta.getLastSampleTimeMs());
|
if (chunkEndTime < windowStart || chunkStartTime > windowEnd) {
|
continue;
|
}
|
lastSampleSeq = Math.max(lastSampleSeq, safeLong(chunkMeta.getLastSampleSeq()));
|
}
|
return lastSampleSeq;
|
}
|
|
private List<DeviceReplayManifest> resolveChunkManifests(ReplaySessionContext session, Integer chunkIndex) {
|
if (session == null || chunkIndex == null) {
|
return new ArrayList<>();
|
}
|
List<String> streamIds = session.getChunkStreamIdsByChunk().get(chunkIndex);
|
if (streamIds == null) {
|
return new ArrayList<>(session.getManifestByStream().values());
|
}
|
List<DeviceReplayManifest> manifests = new ArrayList<>(streamIds.size());
|
for (String streamId : streamIds) {
|
DeviceReplayManifest manifest = session.getManifestByStream().get(streamId);
|
if (manifest != null) {
|
manifests.add(manifest);
|
}
|
}
|
return manifests;
|
}
|
|
private List<ChunkWindowMeta> collectTimelineChunkMetas(List<DeviceReplayManifest> manifests,
|
long windowStartTimeMs,
|
long windowEndTimeMs) {
|
List<ChunkWindowMeta> chunkMetas = new ArrayList<>();
|
for (DeviceReplayManifest manifest : manifests) {
|
if (manifest == null || manifest.getChunks() == null || manifest.getChunks().isEmpty()) {
|
continue;
|
}
|
for (DeviceReplayChunkMeta chunkMeta : manifest.getChunks()) {
|
long chunkStartTime = safeLong(chunkMeta.getFirstSampleTimeMs());
|
long chunkEndTime = safeLong(chunkMeta.getLastSampleTimeMs());
|
if (chunkStartTime <= 0 || chunkEndTime < chunkStartTime) {
|
continue;
|
}
|
if (chunkEndTime < windowStartTimeMs || chunkStartTime > windowEndTimeMs) {
|
continue;
|
}
|
ChunkWindowMeta chunkWindowMeta = new ChunkWindowMeta();
|
chunkWindowMeta.streamId = manifest.getStreamId();
|
chunkWindowMeta.chunkMeta = chunkMeta;
|
chunkMetas.add(chunkWindowMeta);
|
}
|
}
|
return chunkMetas;
|
}
|
|
private List<String> resolveWindowStreamIds(List<ChunkWindowMeta> chunkMetas,
|
long windowStartTimeMs,
|
long windowEndTimeMs) {
|
LinkedHashSet<String> streamIds = new LinkedHashSet<>();
|
for (ChunkWindowMeta chunkWindowMeta : chunkMetas) {
|
DeviceReplayChunkMeta chunkMeta = chunkWindowMeta.chunkMeta;
|
long chunkStartTime = safeLong(chunkMeta.getFirstSampleTimeMs());
|
long chunkEndTime = safeLong(chunkMeta.getLastSampleTimeMs());
|
if (chunkEndTime < windowStartTimeMs || chunkStartTime > windowEndTimeMs) {
|
continue;
|
}
|
streamIds.add(chunkWindowMeta.streamId);
|
}
|
return new ArrayList<>(streamIds);
|
}
|
|
private long clampTimestampToWindow(ReplaySessionContext session, long timestamp) {
|
long windowStartTimeMs = safeLong(session.getWindowStartTimeMs());
|
long windowEndTimeMs = safeLong(session.getWindowEndTimeMs());
|
if (timestamp < windowStartTimeMs) {
|
return windowStartTimeMs;
|
}
|
if (timestamp > windowEndTimeMs) {
|
return windowEndTimeMs;
|
}
|
return timestamp;
|
}
|
|
private long clampTimestampToWindow(TimelineSummarySnapshot snapshot, long timestamp) {
|
if (timestamp < snapshot.windowStartTimeMs) {
|
return snapshot.windowStartTimeMs;
|
}
|
if (timestamp > snapshot.windowEndTimeMs) {
|
return snapshot.windowEndTimeMs;
|
}
|
return timestamp;
|
}
|
|
private void rejectTimestampOutsideWindow(ReplaySessionContext session, long timestamp, String message) {
|
long windowStartTimeMs = safeLong(session.getWindowStartTimeMs());
|
long windowEndTimeMs = safeLong(session.getWindowEndTimeMs());
|
if (timestamp < windowStartTimeMs || timestamp > windowEndTimeMs) {
|
throwReplayWindowException(REPLAY_SEEK_OUT_OF_WINDOW, message);
|
}
|
}
|
|
private void rejectSampleSeqOutsideWindow(ReplaySessionContext session, long sampleSeq) {
|
if (sampleSeq <= 0) {
|
return;
|
}
|
long firstWindowSampleSeq = Long.MAX_VALUE;
|
long lastWindowSampleSeq = 0L;
|
for (Map<String, Object> chunkMeta : session.getTimelineChunks()) {
|
long firstSampleSeq = numberValue(chunkMeta.get("firstSampleSeq"));
|
long lastSampleSeq = numberValue(chunkMeta.get("lastSampleSeq"));
|
if (firstSampleSeq <= 0 || lastSampleSeq <= 0) {
|
continue;
|
}
|
firstWindowSampleSeq = Math.min(firstWindowSampleSeq, firstSampleSeq);
|
lastWindowSampleSeq = Math.max(lastWindowSampleSeq, lastSampleSeq);
|
}
|
if (firstWindowSampleSeq == Long.MAX_VALUE || lastWindowSampleSeq <= 0) {
|
return;
|
}
|
if (sampleSeq < firstWindowSampleSeq || sampleSeq > lastWindowSampleSeq) {
|
throwReplayWindowException(REPLAY_SEEK_OUT_OF_WINDOW, "seek 采样序号不在当前回放窗口内");
|
}
|
}
|
|
private boolean isSameManifestSnapshot(ReplaySessionContext session, String manifestSnapshotKey) {
|
if (session == null) {
|
return false;
|
}
|
String currentManifestSnapshotKey = session.getManifestSnapshotKey();
|
return manifestSnapshotKey == null
|
? currentManifestSnapshotKey == null
|
: manifestSnapshotKey.equals(currentManifestSnapshotKey);
|
}
|
|
private ReplaySessionContext requireSession(String sessionId) {
|
cleanupExpiredSessions();
|
ReplaySessionContext session = sessions.get(sessionId);
|
if (session == null) {
|
throw new IllegalArgumentException("回放会话不存在或已过期");
|
}
|
session.setLastAccessAtMs(System.currentTimeMillis());
|
refreshCurrentDaySessionIfNeeded(session);
|
return session;
|
}
|
|
private void refreshCurrentDaySessionIfNeeded(ReplaySessionContext session) {
|
if (session == null || !isCurrentReplayDay(session.getDay())) {
|
return;
|
}
|
long now = System.currentTimeMillis();
|
Long lastRefreshAtMs = session.getLastManifestRefreshAtMs();
|
if (lastRefreshAtMs != null && (now - lastRefreshAtMs) < CURRENT_DAY_MANIFEST_REFRESH_INTERVAL_MS) {
|
return;
|
}
|
|
String sessionDay = session.getDay();
|
long windowStartTimeMs = safeLong(session.getWindowStartTimeMs());
|
long windowEndTimeMs = safeLong(session.getWindowEndTimeMs());
|
List<DeviceReplayManifest> manifests = loadSessionManifests(sessionDay, true, false);
|
if (manifests.isEmpty()) {
|
manifestService.scheduleDayManifestRefresh(sessionDay);
|
synchronized (session) {
|
long currentTime = System.currentTimeMillis();
|
Long currentLastRefreshAtMs = session.getLastManifestRefreshAtMs();
|
if (currentLastRefreshAtMs != null
|
&& (currentTime - currentLastRefreshAtMs) < CURRENT_DAY_MANIFEST_REFRESH_INTERVAL_MS) {
|
return;
|
}
|
session.setLastManifestRefreshAtMs(currentTime);
|
}
|
return;
|
}
|
|
boolean manifestChanged;
|
String manifestSnapshotKey = manifests.isEmpty() ? "" : buildManifestSnapshotKey(manifests);
|
Map<String, DeviceReplayManifest> manifestByStream = manifests.isEmpty()
|
? Collections.emptyMap()
|
: buildManifestSnapshot(manifests);
|
TimelineChunkSnapshotData timelineSnapshot = manifests.isEmpty()
|
? TimelineChunkSnapshotData.empty()
|
: buildTimelineChunkSnapshot(manifests, windowStartTimeMs, windowEndTimeMs);
|
synchronized (session) {
|
long currentTime = System.currentTimeMillis();
|
Long currentLastRefreshAtMs = session.getLastManifestRefreshAtMs();
|
if (currentLastRefreshAtMs != null
|
&& (currentTime - currentLastRefreshAtMs) < CURRENT_DAY_MANIFEST_REFRESH_INTERVAL_MS) {
|
return;
|
}
|
manifestChanged = !manifests.isEmpty() && !manifestSnapshotKey.equals(session.getManifestSnapshotKey());
|
if (manifestChanged) {
|
session.setManifestByStream(manifestByStream);
|
session.setTimelineChunks(timelineSnapshot.timelineChunks);
|
session.setChunkStreamIdsByChunk(timelineSnapshot.chunkStreamIdsByChunk);
|
session.setStartTimeMs(windowStartTimeMs);
|
session.setEndTimeMs(windowEndTimeMs);
|
session.setManifestSnapshotKey(manifestSnapshotKey);
|
session.setTailCheckpointWarmupScheduled(false);
|
clearSessionCaches(session);
|
}
|
session.setLastManifestRefreshAtMs(currentTime);
|
}
|
|
manifestService.scheduleDayManifestRefresh(sessionDay);
|
if (manifestChanged) {
|
scheduleTailCheckpointWarmup(session.getSessionId());
|
}
|
}
|
|
private List<DeviceReplayManifest> loadSessionManifests(String day,
|
boolean currentReplayDay,
|
boolean allowBuildIfMissing) {
|
List<DeviceReplayManifest> manifests = manifestService.loadDayManifests(day, false);
|
if (!manifests.isEmpty()) {
|
return manifests;
|
}
|
if (allowBuildIfMissing) {
|
return manifestService.loadDayManifests(day, true);
|
}
|
if (!currentReplayDay) {
|
return manifests;
|
}
|
return manifests;
|
}
|
|
private void applyManifestSnapshot(ReplaySessionContext session,
|
List<DeviceReplayManifest> manifests,
|
String manifestSnapshotKey) {
|
if (session == null || manifests == null || manifests.isEmpty()) {
|
return;
|
}
|
long windowStartTimeMs = safeLong(session.getWindowStartTimeMs());
|
long windowEndTimeMs = safeLong(session.getWindowEndTimeMs());
|
Map<String, DeviceReplayManifest> manifestByStream = buildManifestSnapshot(manifests);
|
TimelineChunkSnapshotData timelineSnapshot = buildTimelineChunkSnapshot(manifests, windowStartTimeMs, windowEndTimeMs);
|
synchronized (session) {
|
session.setManifestByStream(manifestByStream);
|
session.setTimelineChunks(timelineSnapshot.timelineChunks);
|
session.setChunkStreamIdsByChunk(timelineSnapshot.chunkStreamIdsByChunk);
|
session.setStartTimeMs(windowStartTimeMs);
|
session.setEndTimeMs(windowEndTimeMs);
|
session.setManifestSnapshotKey(manifestSnapshotKey);
|
session.setTailCheckpointWarmupScheduled(false);
|
clearSessionCaches(session);
|
}
|
}
|
|
private void clearSessionCaches(ReplaySessionContext session) {
|
synchronized (session.getChunkCache()) {
|
session.getChunkCache().clear();
|
}
|
synchronized (session.getCompactChunkCache()) {
|
session.getCompactChunkCache().clear();
|
}
|
synchronized (session.getChunkStateCheckpointCache()) {
|
session.getChunkStateCheckpointCache().clear();
|
}
|
synchronized (session.getSparseChunkStateCheckpointCache()) {
|
session.getSparseChunkStateCheckpointCache().clear();
|
}
|
synchronized (session.getTimelineSummaryCache()) {
|
session.getTimelineSummaryCache().clear();
|
}
|
}
|
|
private ReplayChunk getCachedChunk(ReplaySessionContext session, Integer chunkIndex, boolean fullFrames) {
|
if (session == null || chunkIndex == null) {
|
return null;
|
}
|
Map<Integer, ReplayChunk> cache = fullFrames ? session.getChunkCache() : session.getCompactChunkCache();
|
synchronized (cache) {
|
return cache.get(chunkIndex);
|
}
|
}
|
|
private void cacheChunk(ReplaySessionContext session, Integer chunkIndex, ReplayChunk chunk, boolean fullFrames) {
|
if (session == null || chunkIndex == null || chunk == null) {
|
return;
|
}
|
Map<Integer, ReplayChunk> cache = fullFrames ? session.getChunkCache() : session.getCompactChunkCache();
|
int configuredMaxCacheSize = safePositive(maxChunkCacheEntries, 128);
|
int minimumWarmCacheSize = Math.max(fullFrames ? 24 : 48, safePositive(maxPrefetchChunks, 3) + 8);
|
int maxCacheSize = Math.max(minimumWarmCacheSize, configuredMaxCacheSize);
|
synchronized (cache) {
|
cache.remove(chunkIndex);
|
cache.put(chunkIndex, chunk);
|
while (cache.size() > maxCacheSize) {
|
Integer eldestKey = cache.keySet().iterator().next();
|
cache.remove(eldestKey);
|
}
|
}
|
}
|
|
private void cacheChunkCheckpoint(ReplaySessionContext session, Integer chunkIndex, Map<String, Object> currentStateByStream) {
|
if (session == null || chunkIndex == null || currentStateByStream == null || currentStateByStream.isEmpty()) {
|
return;
|
}
|
Map<Integer, Map<String, Object>> cache = session.getChunkStateCheckpointCache();
|
int configuredMaxCacheSize = safePositive(maxChunkCacheEntries, 128);
|
int maxCacheSize = Math.max(32, configuredMaxCacheSize);
|
synchronized (cache) {
|
cache.remove(chunkIndex);
|
cache.put(chunkIndex, new LinkedHashMap<>(currentStateByStream));
|
while (cache.size() > maxCacheSize) {
|
Integer eldestKey = cache.keySet().iterator().next();
|
cache.remove(eldestKey);
|
}
|
}
|
Map<Integer, Map<String, Object>> sparseCache = session.getSparseChunkStateCheckpointCache();
|
int sparseMaxCacheSize = Math.max(512, safePositive(maxChunkCacheEntries, 128) * 8);
|
synchronized (sparseCache) {
|
sparseCache.remove(chunkIndex);
|
sparseCache.put(chunkIndex, new LinkedHashMap<>(currentStateByStream));
|
while (sparseCache.size() > sparseMaxCacheSize) {
|
Integer eldestKey = sparseCache.keySet().iterator().next();
|
sparseCache.remove(eldestKey);
|
}
|
}
|
}
|
|
private boolean isCurrentReplayDay(String day) {
|
if (day == null || day.isBlank()) {
|
return false;
|
}
|
return day.equals(LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE));
|
}
|
|
private ReplayWindow resolveReplayWindow(String day,
|
Long targetTimestamp,
|
Long windowStartTimeMs,
|
Long windowEndTimeMs) {
|
DayBoundary dayBoundary = resolveDayBoundary(day);
|
boolean hasTargetTimestamp = targetTimestamp != null;
|
boolean hasWindowStart = windowStartTimeMs != null;
|
boolean hasWindowEnd = windowEndTimeMs != null;
|
|
if (!hasTargetTimestamp && !hasWindowStart && !hasWindowEnd) {
|
throwReplayWindowException(REPLAY_WINDOW_MISSING, "请选择历史回放目标时间或时间窗口");
|
}
|
if (hasWindowStart != hasWindowEnd) {
|
throwReplayWindowException(REPLAY_WINDOW_INVALID, "历史回放时间窗口开始和结束时间必须同时传入");
|
}
|
if (hasWindowStart) {
|
return validateExplicitReplayWindow(dayBoundary, windowStartTimeMs, windowEndTimeMs);
|
}
|
return resolveTargetReplayWindow(dayBoundary, targetTimestamp);
|
}
|
|
private DayBoundary resolveDayBoundary(String day) {
|
try {
|
LocalDate replayDay = LocalDate.parse(day, DateTimeFormatter.BASIC_ISO_DATE);
|
long dayStartTimeMs = replayDay.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
return new DayBoundary(dayStartTimeMs, dayStartTimeMs + DAY_END_OFFSET_MS);
|
} catch (DateTimeParseException e) {
|
throwReplayWindowException(REPLAY_WINDOW_INVALID, "历史回放日期格式无效");
|
return null;
|
}
|
}
|
|
private ReplayWindow validateExplicitReplayWindow(DayBoundary dayBoundary,
|
Long windowStartTimeMs,
|
Long windowEndTimeMs) {
|
if (windowStartTimeMs >= windowEndTimeMs) {
|
throwReplayWindowException(REPLAY_WINDOW_INVALID, "历史回放时间窗口开始时间必须早于结束时间");
|
}
|
if (!isWithinDay(dayBoundary, windowStartTimeMs) || !isWithinDay(dayBoundary, windowEndTimeMs)) {
|
throwReplayWindowException(REPLAY_WINDOW_CROSS_DAY, "历史回放时间窗口不能跨自然日");
|
}
|
validateReplayWindowSize(windowStartTimeMs, windowEndTimeMs);
|
return new ReplayWindow(windowStartTimeMs, windowEndTimeMs);
|
}
|
|
private ReplayWindow resolveTargetReplayWindow(DayBoundary dayBoundary, Long targetTimestamp) {
|
if (!isWithinDay(dayBoundary, targetTimestamp)) {
|
throwReplayWindowException(REPLAY_WINDOW_CROSS_DAY, "历史回放目标时间不在所选日期内");
|
}
|
long windowStartTimeMs = Math.max(dayBoundary.startTimeMs, targetTimestamp - DEFAULT_REPLAY_WINDOW_HALF_MS);
|
long windowEndTimeMs = Math.min(dayBoundary.endTimeMs, targetTimestamp + DEFAULT_REPLAY_WINDOW_HALF_MS);
|
if (windowStartTimeMs >= windowEndTimeMs) {
|
throwReplayWindowException(REPLAY_WINDOW_INVALID, "历史回放时间窗口无效");
|
}
|
validateReplayWindowSize(windowStartTimeMs, windowEndTimeMs);
|
return new ReplayWindow(windowStartTimeMs, windowEndTimeMs);
|
}
|
|
private boolean isWithinDay(DayBoundary dayBoundary, Long timestamp) {
|
return dayBoundary != null
|
&& timestamp != null
|
&& timestamp >= dayBoundary.startTimeMs
|
&& timestamp <= dayBoundary.endTimeMs;
|
}
|
|
private void validateReplayWindowSize(long windowStartTimeMs, long windowEndTimeMs) {
|
long maxWindowMs = safePositive(maxWindowMinutes, 240) * 60L * 1000L;
|
if ((windowEndTimeMs - windowStartTimeMs) > maxWindowMs) {
|
throwReplayWindowException(REPLAY_WINDOW_TOO_LARGE, "历史回放时间窗口超过最大允许范围");
|
}
|
}
|
|
private void throwReplayWindowException(String errorCode, String message) {
|
throw new DeviceLogReplayService.ReplayWindowException(errorCode, message);
|
}
|
|
private static class DayBoundary {
|
private final long startTimeMs;
|
private final long endTimeMs;
|
|
private DayBoundary(long startTimeMs, long endTimeMs) {
|
this.startTimeMs = startTimeMs;
|
this.endTimeMs = endTimeMs;
|
}
|
}
|
|
private static class ReplayWindow {
|
private final long startTimeMs;
|
private final long endTimeMs;
|
|
private ReplayWindow(long startTimeMs, long endTimeMs) {
|
this.startTimeMs = startTimeMs;
|
this.endTimeMs = endTimeMs;
|
}
|
}
|
|
private void scheduleTailCheckpointWarmup(String sessionId) {
|
ReplaySessionContext session = sessions.get(sessionId);
|
if (session == null) {
|
return;
|
}
|
synchronized (session) {
|
if (session.isTailCheckpointWarmupScheduled()) {
|
return;
|
}
|
session.setTailCheckpointWarmupScheduled(true);
|
}
|
replayWarmupExecutor.submit(() -> warmupTailCheckpoints(sessionId));
|
}
|
|
private void warmupTailCheckpoints(String sessionId) {
|
ReplaySessionContext session = sessions.get(sessionId);
|
if (session == null || session.getTimelineChunks() == null || session.getTimelineChunks().isEmpty()) {
|
return;
|
}
|
int chunkCount = session.getTimelineChunks().size();
|
int warmupSpan = Math.min(chunkCount, 2048);
|
int warmupStep = 32;
|
int startChunkIndex = Math.max(0, chunkCount - warmupSpan);
|
for (int chunkIndex = startChunkIndex; chunkIndex < chunkCount; chunkIndex += warmupStep) {
|
if (!sessions.containsKey(sessionId)) {
|
return;
|
}
|
try {
|
loadFullChunk(session, chunkIndex);
|
} catch (Exception e) {
|
log.debug("预热 replay 尾部 checkpoint 失败, sessionId={}, chunkIndex={}", sessionId, chunkIndex, e);
|
}
|
}
|
}
|
|
private void cleanupExpiredSessions() {
|
long ttlMs = safePositive(sessionTtlSeconds, 1800) * 1000L;
|
long now = System.currentTimeMillis();
|
for (Map.Entry<String, ReplaySessionContext> entry : sessions.entrySet()) {
|
ReplaySessionContext session = entry.getValue();
|
if (session == null || session.getLastAccessAtMs() == null || (now - session.getLastAccessAtMs()) > ttlMs) {
|
sessions.remove(entry.getKey());
|
}
|
}
|
}
|
|
private void ensureFileMode() {
|
if (!"file".equalsIgnoreCase(storageType)) {
|
throw new IllegalStateException("当前仅支持 file 模式历史回放");
|
}
|
}
|
|
private Path resolveLogPath(DeviceReplayManifest manifest, DeviceReplayChunkMeta chunkMeta) {
|
return Paths.get(loggingPath, manifest.getDay(), chunkMeta.getRelativePath());
|
}
|
|
private long resolveSampleTime(DeviceDataLog logItem) {
|
if (logItem.getSampleTimeMs() != null && logItem.getSampleTimeMs() > 0) {
|
return logItem.getSampleTimeMs();
|
}
|
if (logItem.getCreateTime() != null) {
|
return logItem.getCreateTime().getTime();
|
}
|
return 0L;
|
}
|
|
private long resolveSampleSeq(DeviceDataLog logItem) {
|
if (logItem.getSampleSeq() != null && logItem.getSampleSeq() > 0) {
|
return logItem.getSampleSeq();
|
}
|
return resolveSampleTime(logItem);
|
}
|
|
private long safeLong(Long value) {
|
return value == null ? 0L : value;
|
}
|
|
private int safePositive(Integer value, int fallback) {
|
return value == null || value <= 0 ? fallback : value;
|
}
|
|
private long numberValue(Object value) {
|
if (value instanceof Number) {
|
return ((Number) value).longValue();
|
}
|
if (value == null) {
|
return 0L;
|
}
|
try {
|
return Long.parseLong(String.valueOf(value));
|
} catch (Exception e) {
|
return 0L;
|
}
|
}
|
|
private int intValue(Object value) {
|
if (value instanceof Number) {
|
return ((Number) value).intValue();
|
}
|
if (value == null) {
|
return 0;
|
}
|
try {
|
return Integer.parseInt(String.valueOf(value));
|
} catch (Exception e) {
|
return 0;
|
}
|
}
|
|
private String stringValue(Object value) {
|
return value == null ? "" : String.valueOf(value);
|
}
|
|
private int resolveAbnormalPriority(String level) {
|
if ("ERROR".equalsIgnoreCase(level)) {
|
return 0;
|
}
|
if ("BLOCK".equalsIgnoreCase(level)) {
|
return 1;
|
}
|
return 2;
|
}
|
|
private void warmupEntryManifest(String day, String type, Integer deviceNo, Integer stationId) {
|
if (day == null || type == null || type.isBlank() || deviceNo == null) {
|
return;
|
}
|
manifestService.loadManifest(DeviceReplayStreamKey.of(day, type, deviceNo, stationId), true);
|
}
|
|
private String buildManifestSnapshotKey(List<DeviceReplayManifest> manifests) {
|
if (manifests == null || manifests.isEmpty()) {
|
return "";
|
}
|
StringBuilder builder = new StringBuilder();
|
for (DeviceReplayManifest manifest : manifests) {
|
if (manifest == null) {
|
continue;
|
}
|
builder.append(manifest.getStreamId())
|
.append('#')
|
.append(manifest.getChunks() == null ? 0 : manifest.getChunks().size())
|
.append('#')
|
.append(safeLong(manifest.getFirstSampleSeq()))
|
.append('#')
|
.append(safeLong(manifest.getLastSampleSeq()))
|
.append('#')
|
.append(safeLong(manifest.getFirstSampleTimeMs()))
|
.append('#')
|
.append(safeLong(manifest.getLastSampleTimeMs()))
|
.append(';');
|
}
|
return builder.toString();
|
}
|
|
private boolean equalsLong(Long left, Long right) {
|
if (left == null) {
|
return right == null;
|
}
|
return left.equals(right);
|
}
|
|
private String buildStreamId(String day, String type, Integer deviceNo, Integer stationId) {
|
StringBuilder builder = new StringBuilder();
|
if (day != null) {
|
builder.append(day).append('|');
|
}
|
builder.append(type == null ? "" : type)
|
.append('|')
|
.append(deviceNo == null ? "" : deviceNo);
|
if (stationId != null) {
|
builder.append('|').append(stationId);
|
}
|
return builder.toString();
|
}
|
|
private static class StreamReplayScanResult {
|
private Object baselineState;
|
private List<DeviceReplayState> eventList = new ArrayList<>();
|
}
|
|
private static class TimelineSummarySnapshot {
|
private String sessionId;
|
private String day;
|
private long startTimeMs;
|
private long endTimeMs;
|
private long windowStartTimeMs;
|
private long windowEndTimeMs;
|
private List<Map<String, Object>> timelineChunks = new ArrayList<>();
|
private List<DeviceReplayManifest> manifests = new ArrayList<>();
|
}
|
|
private static class ChunkCheckpointSeed {
|
private Integer chunkIndex;
|
private long scanStartExclusiveTimeMs;
|
private Map<String, Object> stateByStream;
|
}
|
|
private static class ChunkWindowMeta {
|
private String streamId;
|
private DeviceReplayChunkMeta chunkMeta;
|
}
|
|
private static class TimelineChunkSnapshotData {
|
private List<Map<String, Object>> timelineChunks = Collections.emptyList();
|
private Map<Integer, List<String>> chunkStreamIdsByChunk = Collections.emptyMap();
|
|
private static TimelineChunkSnapshotData empty() {
|
return new TimelineChunkSnapshotData();
|
}
|
}
|
|
private static class ResolvedSeekFrame {
|
private Integer chunkIndex;
|
private ReplayChunk chunk;
|
private Map<String, Object> frame;
|
private long drift;
|
}
|
}
|