package com.zy.asrs.service.impl; import com.alibaba.fastjson.JSON; import com.zy.asrs.domain.replay.DeviceReplayChunkMeta; import com.zy.asrs.domain.replay.DeviceReplayManifest; import com.zy.asrs.domain.replay.DeviceReplayState; import com.zy.asrs.domain.replay.DeviceReplayStreamKey; import com.zy.asrs.domain.replay.ReplayChunk; import com.zy.asrs.domain.replay.ReplayFrameSummary; import com.zy.asrs.domain.replay.ReplaySeekResult; import com.zy.asrs.domain.replay.ReplaySessionContext; import com.zy.asrs.entity.DeviceDataLog; import com.zy.asrs.service.DeviceLogReplayManifestService; import com.zy.asrs.service.DeviceLogReplayNormalizer; import com.zy.asrs.service.DeviceLogReplayService; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.io.BufferedReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.LocalDate; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.LinkedHashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
/**
 * Spring service implementing {@code DeviceLogReplayService}.
 *
 * <p>Replay sessions are kept in an in-memory map keyed by session id; timeline chunks and
 * frames are derived from the manifests supplied by {@code DeviceLogReplayManifestService}
 * and shaped into frames by {@code DeviceLogReplayNormalizer}.
 */
@Slf4j @Service public class DeviceLogReplayServiceImpl implements DeviceLogReplayService {
/** 2 seconds, in milliseconds. Its use is not visible in this part of the file. */
private static final long CURRENT_DAY_MANIFEST_REFRESH_INTERVAL_MS = 2_000L;
/** 15 minutes, in milliseconds. Its use is not visible in this part of the file. */
private static final long DEFAULT_REPLAY_WINDOW_HALF_MS = 15L * 60L * 1000L;
/** One day minus one millisecond, in milliseconds. */
private static final long DAY_END_OFFSET_MS = 24L * 60L * 60L * 1000L - 1L;
// String codes; presumably used as error identifiers by the window/seek validation helpers
// (those helpers are outside this part of the file - TODO confirm).
private static final String REPLAY_WINDOW_MISSING = "REPLAY_WINDOW_MISSING"; private static final String REPLAY_WINDOW_INVALID =
"REPLAY_WINDOW_INVALID"; private static final String REPLAY_WINDOW_CROSS_DAY = "REPLAY_WINDOW_CROSS_DAY"; private static final String REPLAY_WINDOW_TOO_LARGE = "REPLAY_WINDOW_TOO_LARGE"; private static final String REPLAY_SEEK_OUT_OF_WINDOW = "REPLAY_SEEK_OUT_OF_WINDOW";
private final DeviceLogReplayManifestService manifestService; private final DeviceLogReplayNormalizer replayNormalizer;
/** Live sessions: filled by createSession (keyed by session id) and emptied by closeSession. */
private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
/** Single daemon thread named "replay-tail-checkpoint-warmup"; stopped in shutdownReplayWarmupExecutor. */
private final ExecutorService replayWarmupExecutor = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "replay-tail-checkpoint-warmup"); thread.setDaemon(true); return thread; });
// Spring-injected settings. Where the placeholder has the form ${key:default}, the part after
// ':' is the default used when the property is absent; the two deviceLogStorage keys have none.
@Value("${deviceLogStorage.type}") private String storageType; @Value("${deviceLogStorage.loggingPath}") private String loggingPath; @Value("${deviceReplay.chunkSampleSize:400}") private Integer chunkSampleSize; @Value("${deviceReplay.maxChunkFrames:32}") private Integer maxChunkFrames; @Value("${deviceReplay.timelineBucketSeconds:20}") private Integer timelineBucketSeconds; @Value("${deviceReplay.maxWindowMinutes:240}") private Integer maxWindowMinutes; @Value("${deviceReplay.maxConcurrentSessions:4}") private Integer maxConcurrentSessions; @Value("${deviceReplay.maxPrefetchChunks:3}") private Integer maxPrefetchChunks; @Value("${deviceReplay.maxChunkCacheEntries:128}") private Integer maxChunkCacheEntries; @Value("${deviceReplay.sessionTtlSeconds:1800}") private Integer sessionTtlSeconds;
/** Stores the two collaborating services; performs no other work. */
public DeviceLogReplayServiceImpl(DeviceLogReplayManifestService manifestService, DeviceLogReplayNormalizer replayNormalizer) { this.manifestService = manifestService; this.replayNormalizer = replayNormalizer; }
/**
 * Creates a replay session for one day and registers it in {@code sessions}.
 *
 * <p>Visible steps: {@code ensureFileMode()}, resolve the replay window, purge expired sessions,
 * enforce the concurrent-session cap, optionally warm up the manifest of the entry device
 * (only when {@code type} is non-blank and {@code deviceNo} is non-null), load the day's
 * manifests, build the timeline chunk snapshot, store the session, and - for the current
 * replay day only - schedule a manifest refresh and a tail-checkpoint warm-up.
 *
 * @param day replay day, passed through to the window/manifest helpers (format not visible here)
 * @param type entry device type; echoed back as {@code entryType}
 * @param deviceNo entry device number; echoed back as {@code entryDeviceNo}
 * @param stationId entry station id; echoed back as {@code entryStationId}
 * @param targetTimestamp passed to {@code resolveReplayWindow}
 * @param windowStartTimeMs passed to {@code resolveReplayWindow}
 * @param windowEndTimeMs passed to {@code resolveReplayWindow}
 * @return insertion-ordered map with sessionId, day, start/end and window bounds, chunkCount,
 *         streamCount, the entry* values and maxPrefetchChunks
 * @throws IllegalStateException when the number of live sessions has reached the cap
 * @throws IllegalArgumentException when no manifests are available for the day
 */
@Override public Map createSession(String day, String type, Integer deviceNo, Integer stationId, Long targetTimestamp, Long windowStartTimeMs, Long windowEndTimeMs) { ensureFileMode(); ReplayWindow replayWindow = resolveReplayWindow(day, targetTimestamp, windowStartTimeMs, windowEndTimeMs);
cleanupExpiredSessions();
// Session cap (falls back to 4 via safePositive). NOTE(review): this size check and the
// sessions.put further down are separate steps, so concurrent callers can both pass the check.
if (sessions.size() >= safePositive(maxConcurrentSessions, 4)) { throw new IllegalStateException("历史回放会话已满,请先关闭未使用的回放页面"); } boolean currentReplayDay = isCurrentReplayDay(day);
// Warm-up of the entry device's manifest is attempted only when a type and device number are given.
if (type != null && !type.isBlank() && deviceNo != null) { warmupEntryManifest(day, type, deviceNo, stationId); } List manifests = loadSessionManifests(day, currentReplayDay, true); if (manifests.isEmpty()) { throw new IllegalArgumentException("当天没有可回放清单,请确认该日期日志已生成 replay manifest"); } TimelineChunkSnapshotData timelineSnapshot = buildTimelineChunkSnapshot(manifests, replayWindow.startTimeMs, replayWindow.endTimeMs);
// Session id is a random UUID with the dashes removed. Start/end and window start/end are
// both initialised from the same resolved replay window.
ReplaySessionContext session = new ReplaySessionContext(); session.setSessionId(UUID.randomUUID().toString().replace("-", "")); session.setDay(day); session.setCreatedAtMs(System.currentTimeMillis()); session.setLastAccessAtMs(session.getCreatedAtMs()); session.setWindowStartTimeMs(replayWindow.startTimeMs); session.setWindowEndTimeMs(replayWindow.endTimeMs); session.setStartTimeMs(replayWindow.startTimeMs); session.setEndTimeMs(replayWindow.endTimeMs); session.setTimelineChunks(timelineSnapshot.timelineChunks); session.setChunkStreamIdsByChunk(timelineSnapshot.chunkStreamIdsByChunk); session.setLastManifestRefreshAtMs(System.currentTimeMillis()); session.setManifestSnapshotKey(buildManifestSnapshotKey(manifests)); session.setManifestByStream(buildManifestSnapshot(manifests)); sessions.put(session.getSessionId(), session);
// Only sessions on the current replay day trigger the background refresh and warm-up.
if (currentReplayDay) { manifestService.scheduleDayManifestRefresh(day); scheduleTailCheckpointWarmup(session.getSessionId()); } Map data = new LinkedHashMap<>(); data.put("sessionId", session.getSessionId()); data.put("day", day); data.put("startTimeMs", session.getStartTimeMs()); data.put("endTimeMs", session.getEndTimeMs()); data.put("windowStartTimeMs", session.getWindowStartTimeMs()); data.put("windowEndTimeMs", session.getWindowEndTimeMs()); data.put("chunkCount", session.getTimelineChunks().size()); data.put("streamCount",
session.getManifestByStream().size()); data.put("entryType", type); data.put("entryDeviceNo", deviceNo); data.put("entryStationId", stationId); data.put("maxPrefetchChunks", safePositive(maxPrefetchChunks, 3)); return data; }
/**
 * Returns the timeline description of an existing session.
 *
 * @param sessionId id resolved through {@code requireSession}
 * @return insertion-ordered map with sessionId, day, start/end and window bounds, the
 *         session's timeline chunk list under "chunks", and maxPrefetchChunks
 */
@Override public Map loadTimeline(String sessionId) { ReplaySessionContext session = requireSession(sessionId); Map data = new LinkedHashMap<>(); data.put("sessionId", session.getSessionId()); data.put("day", session.getDay()); data.put("startTimeMs", session.getStartTimeMs()); data.put("endTimeMs", session.getEndTimeMs()); data.put("windowStartTimeMs", session.getWindowStartTimeMs()); data.put("windowEndTimeMs", session.getWindowEndTimeMs()); data.put("chunks", session.getTimelineChunks()); data.put("maxPrefetchChunks", safePositive(maxPrefetchChunks, 3)); return data; }
/**
 * Returns the bucketed timeline summary of a session, cached per effective bucket count.
 *
 * <p>The cache is read while holding its monitor, the summary is built without holding it,
 * and the cache is checked again before storing; if another summary for the same bucket count
 * was stored in between, that stored instance is returned instead of the new one.
 *
 * @param sessionId id resolved through {@code requireSession}
 * @param bucketCount requested bucket count; normalised by {@code resolveSummaryBucketCount}
 * @return the cached or newly built summary map (the cached instance itself, not a copy)
 */
@Override public Map loadTimelineSummary(String sessionId, Integer bucketCount) { ReplaySessionContext session = requireSession(sessionId); int effectiveBucketCount = resolveSummaryBucketCount(session, bucketCount); synchronized (session.getTimelineSummaryCache()) { Map cachedSummary = session.getTimelineSummaryCache().get(effectiveBucketCount); if (cachedSummary != null) { return cachedSummary; } } TimelineSummarySnapshot summarySnapshot = buildTimelineSummarySnapshot(session); Map summary = buildTimelineSummary(summarySnapshot, effectiveBucketCount); synchronized (session.getTimelineSummaryCache()) { Map cachedSummary = session.getTimelineSummaryCache().get(effectiveBucketCount); if (cachedSummary != null) { return cachedSummary; } session.getTimelineSummaryCache().put(effectiveBucketCount, summary); return summary; } }
/**
 * Copies the session fields needed for summary building into a {@code TimelineSummarySnapshot}
 * while holding the session's monitor; the chunk list and manifest values are copied into new
 * lists, and the time fields go through {@code safeLong}.
 */
private TimelineSummarySnapshot buildTimelineSummarySnapshot(ReplaySessionContext session) { TimelineSummarySnapshot snapshot = new TimelineSummarySnapshot(); synchronized (session) { snapshot.sessionId = session.getSessionId(); snapshot.day = session.getDay(); snapshot.startTimeMs = safeLong(session.getStartTimeMs()); snapshot.endTimeMs =
safeLong(session.getEndTimeMs()); snapshot.windowStartTimeMs = safeLong(session.getWindowStartTimeMs()); snapshot.windowEndTimeMs = safeLong(session.getWindowEndTimeMs()); snapshot.timelineChunks = new ArrayList<>(session.getTimelineChunks()); snapshot.manifests = new ArrayList<>(session.getManifestByStream().values()); } return snapshot; }
/**
 * Builds the summary map for a snapshot by slicing [startTimeMs, endTimeMs] into
 * {@code bucketCount} buckets of equal duration.
 *
 * <p>Bucket duration is ceil(totalDuration / bucketCount), at least 1 ms. A bucket whose
 * computed start lies past endTimeMs is pinned to endTimeMs. When {@code bucketCount <= 0} or
 * the snapshot has no timeline chunks, the result carries bucketCount 0 and an empty list.
 *
 * @return map with sessionId, day, bucketCount, startTimeMs, endTimeMs and "buckets"
 */
private Map buildTimelineSummary(TimelineSummarySnapshot snapshot, int bucketCount) { Map summary = new LinkedHashMap<>(); List> buckets = new ArrayList<>(); if (bucketCount <= 0 || snapshot.timelineChunks.isEmpty()) { summary.put("sessionId", snapshot.sessionId); summary.put("day", snapshot.day); summary.put("bucketCount", 0); summary.put("startTimeMs", snapshot.startTimeMs); summary.put("endTimeMs", snapshot.endTimeMs); summary.put("buckets", buckets); return summary; } long totalDurationMs = Math.max(1L, snapshot.endTimeMs - snapshot.startTimeMs + 1L); long bucketDurationMs = Math.max(1L, (long) Math.ceil(totalDurationMs / (double) bucketCount)); for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) { long bucketStartTime = snapshot.startTimeMs + bucketIndex * bucketDurationMs; if (bucketStartTime > snapshot.endTimeMs) { bucketStartTime = snapshot.endTimeMs; } long bucketEndTime = Math.min(snapshot.endTimeMs, bucketStartTime + bucketDurationMs - 1L); int startChunkIndex = resolveChunkIndex(snapshot.timelineChunks, bucketStartTime, 0L); int endChunkIndex = resolveChunkIndex(snapshot.timelineChunks, bucketEndTime, 0L);
// Guard against an end index that resolves before the start index.
if (endChunkIndex < startChunkIndex) { endChunkIndex = startChunkIndex; } buckets.add(buildSummaryBucket(snapshot, bucketIndex, bucketStartTime, bucketEndTime, startChunkIndex, endChunkIndex)); } summary.put("sessionId", snapshot.sessionId); summary.put("day", snapshot.day); summary.put("bucketCount", bucketCount); summary.put("startTimeMs", snapshot.startTimeMs); summary.put("endTimeMs", snapshot.endTimeMs); summary.put("buckets", buckets); return summary; }
/**
 * Builds the map describing one summary bucket.
 *
 * <p>Sums the {@code estimatedSamples} of chunks startChunkIndex..endChunkIndex (negative
 * values count as 0) and remembers the first chunk with a positive estimate. The bucket
 * "has replay data" when that sum is positive and such a chunk exists.
 */
private Map
buildSummaryBucket(TimelineSummarySnapshot snapshot, int bucketIndex, long bucketStartTime, long bucketEndTime, int startChunkIndex, int endChunkIndex) { Map bucket = new LinkedHashMap<>(); bucket.put("bucketIndex", bucketIndex); bucket.put("startTimeMs", clampTimestampToWindow(snapshot, bucketStartTime)); bucket.put("endTimeMs", clampTimestampToWindow(snapshot, bucketEndTime)); int firstDataChunkIndex = -1; int anchorChunkIndex = -1; int anchorFrameIndex = -1; int firstAbnormalChunkIndex = -1; int firstAbnormalFrameIndex = -1; String topEventType = ""; int bucketEstimatedSamples = 0; for (int chunkIndex = startChunkIndex; chunkIndex <= endChunkIndex; chunkIndex++) { Map chunkMeta = resolveTimelineChunk(snapshot.timelineChunks, chunkIndex); int estimatedSamples = intValue(chunkMeta.get("estimatedSamples")); bucketEstimatedSamples += Math.max(0, estimatedSamples);
// Keep only the first chunk that actually has samples.
if (firstDataChunkIndex >= 0 || estimatedSamples <= 0) { continue; } firstDataChunkIndex = chunkIndex; } boolean hasReplayData = bucketEstimatedSamples > 0 && firstDataChunkIndex >= 0; int activityScore = hasReplayData ?
100 : 0;
// NOTE(review): abnormalCount, errorCount and blockCount are never modified below, so the
// bucket always reports 0 for them; activityScore is therefore only ever 100 or 0.
int abnormalCount = 0; int errorCount = 0; int blockCount = 0; boolean hasCrnData = false; boolean hasStationData = false; boolean hasRgvData = false; if (hasReplayData) { long visibleBucketStartTime = clampTimestampToWindow(snapshot, bucketStartTime); long visibleBucketEndTime = clampTimestampToWindow(snapshot, bucketEndTime);
// Flag each device family that has a manifest whose first/last sample range overlaps the bucket.
for (DeviceReplayManifest manifest : snapshot.manifests) { if (manifest == null) { continue; } long manifestStartTime = safeLong(manifest.getFirstSampleTimeMs()); long manifestEndTime = safeLong(manifest.getLastSampleTimeMs()); if (manifestEndTime < visibleBucketStartTime || manifestStartTime > visibleBucketEndTime) { continue; } if ("Devp".equalsIgnoreCase(manifest.getType())) { hasStationData = true; } else if ("Rgv".equalsIgnoreCase(manifest.getType())) { hasRgvData = true; } else if ("Crn".equalsIgnoreCase(manifest.getType()) || "DualCrn".equalsIgnoreCase(manifest.getType())) { hasCrnData = true; } } }
// anchorChunkIndex is still -1 here, so with data the anchor is the first data chunk, frame 0.
if (hasReplayData && anchorChunkIndex < 0) { anchorChunkIndex = firstDataChunkIndex; anchorFrameIndex = 0; } bucket.put("activityScore", activityScore); bucket.put("abnormalCount", abnormalCount); bucket.put("errorCount", errorCount); bucket.put("blockCount", blockCount); bucket.put("hasReplayData", hasReplayData); bucket.put("hasCrnData", hasCrnData); bucket.put("hasStationData", hasStationData); bucket.put("hasRgvData", hasRgvData); bucket.put("anchorChunkIndex", anchorChunkIndex); bucket.put("anchorFrameIndex", anchorFrameIndex); bucket.put("firstAbnormalChunkIndex", firstAbnormalChunkIndex); bucket.put("firstAbnormalFrameIndex", firstAbnormalFrameIndex); bucket.put("topEventType", topEventType); return bucket; }
/**
 * Loads the full-frame chunk for the session (through {@code loadFullChunk}) and returns the
 * client projection produced by {@code projectChunkForClient}.
 */
@Override public ReplayChunk loadChunk(String sessionId, Integer chunkIndex) { ReplaySessionContext session = requireSession(sessionId); return projectChunkForClient(session, loadFullChunk(session, chunkIndex)); }
/**
 * Returns the cached full-frame chunk, or builds one with {@code buildChunk(..., true)}.
 *
 * <p>The session's manifest snapshot key is read before building. The built chunk is cached
 * only if {@code isSameManifestSnapshot} still accepts that key afterwards; otherwise the
 * chunk is built a second time and returned without being cached.
 */
private ReplayChunk loadFullChunk(ReplaySessionContext session, Integer chunkIndex) { ReplayChunk
cachedChunk = getCachedChunk(session, chunkIndex, true); if (cachedChunk != null) { return cachedChunk; } String manifestSnapshotKey = session.getManifestSnapshotKey(); ReplayChunk chunk = buildChunk(session, chunkIndex, true);
// Cache only when the manifest snapshot key is still accepted; otherwise rebuild once, uncached.
if (isSameManifestSnapshot(session, manifestSnapshotKey)) { cacheChunk(session, chunkIndex, chunk, true); } else { chunk = buildChunk(session, chunkIndex, true); } return chunk; }
/**
 * Loads a chunk in either full-frame or compact form and returns its client projection.
 *
 * <p>A cached chunk for the same {@code fullFrames} flag is used when present. Otherwise the
 * chunk is built with {@code buildChunk(..., true)} (full) or {@code buildCompactChunk}
 * (compact), cached if {@code isSameManifestSnapshot} still accepts the key read before the
 * build, and rebuilt once without caching if not.
 *
 * @param sessionId id resolved through {@code requireSession}
 * @param chunkIndex index into the session's timeline chunks
 * @param fullFrames {@code true} for the full-frame chunk, {@code false} for the compact one
 */
@Override public ReplayChunk loadChunk(String sessionId, Integer chunkIndex, boolean fullFrames) { ReplaySessionContext session = requireSession(sessionId); ReplayChunk cachedChunk = getCachedChunk(session, chunkIndex, fullFrames); if (cachedChunk != null) { return projectChunkForClient(session, cachedChunk); } String manifestSnapshotKey = session.getManifestSnapshotKey(); ReplayChunk chunk = fullFrames ? buildChunk(session, chunkIndex, true) : buildCompactChunk(session, chunkIndex); if (isSameManifestSnapshot(session, manifestSnapshotKey)) { cacheChunk(session, chunkIndex, chunk, fullFrames); } else { chunk = fullFrames ?
buildChunk(session, chunkIndex, true) : buildCompactChunk(session, chunkIndex); } return projectChunkForClient(session, chunk); }
/**
 * Builds a one-frame chunk from the timeline chunk metadata alone (no manifest scan).
 *
 * <p>The frame's summary has all device and abnormal/error/block counts set to 0 and carries
 * the chunk's {@code estimatedSamples}. The frame timestamp is the chunk end time when that is
 * positive, otherwise the start time; sampleSeq is the chunk's {@code lastSampleSeq} when
 * positive, otherwise 0.
 */
private ReplayChunk buildCompactChunk(ReplaySessionContext session, Integer chunkIndex) { Map chunkMeta = resolveTimelineChunk(session, chunkIndex); long startTime = numberValue(chunkMeta.get("startTimeMs")); long endTime = numberValue(chunkMeta.get("endTimeMs")); long sampleSeq = numberValue(chunkMeta.get("lastSampleSeq")); ReplayChunk chunk = new ReplayChunk(); chunk.setChunkIndex(chunkIndex); chunk.setStartTimeMs(startTime); chunk.setEndTimeMs(endTime); Map summary = new LinkedHashMap<>(); summary.put("stationCount", 0); summary.put("crnCount", 0); summary.put("dualCrnCount", 0); summary.put("rgvCount", 0); summary.put("abnormalCount", 0); summary.put("errorCount", 0); summary.put("blockCount", 0); summary.put("estimatedSamples", intValue(chunkMeta.get("estimatedSamples"))); Map frame = new LinkedHashMap<>(); frame.put("timestamp", endTime > 0 ? endTime : startTime); frame.put("sampleSeq", sampleSeq > 0 ?
sampleSeq : 0L); frame.put("abnormalList", new ArrayList<>()); frame.put("summary", summary); appendFrame(chunk, frame); chunk.setFrameCount(chunk.getFrames().size()); return chunk; }
/**
 * Builds a replay chunk by scanning the manifests that belong to the chunk and replaying
 * their events in order.
 *
 * <p>Visible flow: seed the per-stream state from a checkpoint when {@code resolveCheckpointSeed}
 * returns one; scan every manifest from {@code resolveChunkManifests} for the chunk's time
 * range; sort all events by timestamp, sampleSeq, type, deviceNo and stationId (nulls last);
 * then walk the sorted events, applying each run of events that share timestamp and sampleSeq
 * to the state map and emitting one frame per run. With {@code fullFrames} every run becomes
 * a frame; otherwise only the runs selected by {@code resolveCompactGroupIndexes} do.
 *
 * <p>The callers visible in this part of the file always pass {@code fullFrames == true}.
 *
 * @param session session owning the timeline and manifests
 * @param chunkIndex index into the session's timeline chunks
 * @param fullFrames whether to build full frames (and a base state) or compact frames
 */
private ReplayChunk buildChunk(ReplaySessionContext session, Integer chunkIndex, boolean fullFrames) {
// Key captured up front; compared again before the end-of-chunk checkpoint is cached.
String manifestSnapshotKey = session.getManifestSnapshotKey(); Map chunkMeta = resolveTimelineChunk(session, chunkIndex); long startTime = numberValue(chunkMeta.get("startTimeMs")); long endTime = numberValue(chunkMeta.get("endTimeMs")); Map currentStateByStream = new LinkedHashMap<>(); List eventList = new ArrayList<>(); ChunkCheckpointSeed checkpointSeed = resolveCheckpointSeed(session, chunkIndex); boolean enableBaselineRecovery = fullFrames || checkpointSeed != null; if (checkpointSeed != null && checkpointSeed.stateByStream != null) { currentStateByStream.putAll(checkpointSeed.stateByStream); } long scanStartExclusiveTimeMs = checkpointSeed == null ? 0L : checkpointSeed.scanStartExclusiveTimeMs;
// Baseline lookup is requested from the scan only for full frames without a checkpoint seed.
boolean allowBaselineLookup = fullFrames && checkpointSeed == null; for (DeviceReplayManifest manifest : resolveChunkManifests(session, chunkIndex)) { StreamReplayScanResult scanResult = scanManifestRange( manifest, startTime, endTime, allowBaselineLookup, scanStartExclusiveTimeMs); if (enableBaselineRecovery && scanResult.baselineState != null) { currentStateByStream.put(manifest.getStreamId(), scanResult.baselineState); } eventList.addAll(scanResult.eventList); } eventList.sort(Comparator .comparing(DeviceReplayState::getTimestamp, Comparator.nullsLast(Long::compareTo)) .thenComparing(DeviceReplayState::getSampleSeq, Comparator.nullsLast(Long::compareTo)) .thenComparing(DeviceReplayState::getType, Comparator.nullsLast(String::compareTo)) .thenComparing(DeviceReplayState::getDeviceNo, Comparator.nullsLast(Integer::compareTo)) .thenComparing(DeviceReplayState::getStationId, Comparator.nullsLast(Integer::compareTo))); ReplayChunk chunk = new ReplayChunk(); chunk.setChunkIndex(chunkIndex);
chunk.setStartTimeMs(startTime); chunk.setEndTimeMs(endTime);
// With full frames and a non-empty seed state, the state at chunk start becomes both the
// chunk's base state and its first frame (sampleSeq 0).
if (fullFrames && !currentStateByStream.isEmpty()) { Map baseState = replayNormalizer.buildFrame(startTime, 0L, currentStateByStream); chunk.setBaseState(baseState); appendFrame(chunk, baseState); }
// keptGroupIndexes is null for full frames; it is dereferenced below only when fullFrames is false.
List keptGroupIndexes = fullFrames ? null : resolveCompactGroupIndexes(eventList); int groupIndex = -1; int cursor = 0; while (cursor < eventList.size()) { groupIndex++; DeviceReplayState currentEvent = eventList.get(cursor); Long frameTime = currentEvent.getTimestamp(); Long frameSeq = currentEvent.getSampleSeq();
// Inner loop: consume the run of events sharing this timestamp and sampleSeq, applying each
// payload to the state under its stream id.
while (cursor < eventList.size()) { DeviceReplayState nextEvent = eventList.get(cursor); if (!equalsLong(frameTime, nextEvent.getTimestamp()) || !equalsLong(frameSeq, nextEvent.getSampleSeq())) { break; } String streamId = buildStreamId(session.getDay(), nextEvent.getType(), nextEvent.getDeviceNo(), nextEvent.getStationId()); currentStateByStream.put(streamId, nextEvent.getPayload()); frameSeq = nextEvent.getSampleSeq(); cursor++; } if (fullFrames || keptGroupIndexes.contains(groupIndex)) { Map frame = fullFrames ? replayNormalizer.buildFrame(frameTime, frameSeq, currentStateByStream) : replayNormalizer.buildCompactFrame(frameTime, frameSeq, currentStateByStream); appendFrame(chunk, frame); } }
// A chunk never goes out without frames: fall back to one frame at the chunk start time.
if (chunk.getFrames().isEmpty()) { Map emptyFrame = fullFrames ?
replayNormalizer.buildFrame(startTime, 0L, currentStateByStream) : replayNormalizer.buildCompactFrame(startTime, 0L, currentStateByStream); appendFrame(chunk, emptyFrame); }
// For full frames, keep complete copies of the frame and summary lists before shrinkChunkFrames
// reduces the display lists.
if (fullFrames) { chunk.setFullFrames(new ArrayList<>(chunk.getFrames())); chunk.setFullFrameSummaryList(new ArrayList<>(chunk.getFrameSummaryList())); shrinkChunkFrames(chunk); } chunk.setFrameCount(chunk.getFrames().size());
// The end-of-chunk state is stored as a checkpoint only when the manifest snapshot key read at
// the start of the build is still accepted.
if (fullFrames && !currentStateByStream.isEmpty() && isSameManifestSnapshot(session, manifestSnapshotKey)) { cacheChunkCheckpoint(session, chunkIndex, currentStateByStream); } return chunk; }
/**
 * Creates a new {@code ReplayChunk} for the client from a source chunk.
 *
 * <p>Start and end times go through {@code clampTimestampToWindow}, the base state is deep
 * copied, and only frames whose timestamp lies within the resulting start/end are copied
 * (see {@code copyVisibleFrames}). The source chunk is not modified here.
 */
private ReplayChunk projectChunkForClient(ReplaySessionContext session, ReplayChunk sourceChunk) { ReplayChunk chunk = new ReplayChunk(); chunk.setChunkIndex(sourceChunk.getChunkIndex()); chunk.setStartTimeMs(clampTimestampToWindow(session, safeLong(sourceChunk.getStartTimeMs()))); chunk.setEndTimeMs(clampTimestampToWindow(session, safeLong(sourceChunk.getEndTimeMs()))); chunk.setBaseState(copyObjectForClient(sourceChunk.getBaseState())); copyVisibleFrames(sourceChunk, chunk); chunk.setFrameCount(chunk.getFrames().size()); return chunk; }
/** Deep-copies a map via {@code copyValueForClient}; returns {@code null} for {@code null}. */
@SuppressWarnings("unchecked") private Map copyObjectForClient(Map sourceMap) { if (sourceMap == null) { return null; } return (Map) copyValueForClient(sourceMap); }
/**
 * Recursively copies nested maps and lists.
 *
 * <p>Maps become new {@code LinkedHashMap}s whose keys are converted with
 * {@code String.valueOf}; lists become new {@code ArrayList}s. Any other value (including
 * {@code null}) is returned as the same reference, not copied.
 */
@SuppressWarnings("unchecked") private Object copyValueForClient(Object sourceValue) { if (sourceValue instanceof Map sourceMap) { Map copiedMap = new LinkedHashMap<>(); for (Map.Entry entry : sourceMap.entrySet()) { copiedMap.put(String.valueOf(entry.getKey()), copyValueForClient(entry.getValue())); } return copiedMap; } if (sourceValue instanceof List sourceList) { List copiedList = new ArrayList<>(sourceList.size()); for (Object item : sourceList) { copiedList.add(copyValueForClient(item)); } return copiedList; } return sourceValue; }
/**
 * Copies into {@code targetChunk} every source frame whose "timestamp" lies within the
 * target's [startTimeMs, endTimeMs], together with a re-indexed copy of its summary.
 * The full frame/summary lists of the source are preferred when they are non-empty.
 */
private void copyVisibleFrames(ReplayChunk sourceChunk, ReplayChunk targetChunk) { List> sourceFrames = resolveFullFrameList(sourceChunk); List
sourceSummaries = resolveFullFrameSummaryList(sourceChunk); long startTime = safeLong(targetChunk.getStartTimeMs()); long endTime = safeLong(targetChunk.getEndTimeMs()); for (int frameIndex = 0; frameIndex < sourceFrames.size(); frameIndex++) { Map frame = sourceFrames.get(frameIndex); long frameTime = numberValue(frame.get("timestamp"));
// Frames outside the target chunk's time range are not sent to the client.
if (frameTime < startTime || frameTime > endTime) { continue; } targetChunk.getFrames().add(copyObjectForClient(frame));
// A missing summary (list shorter than the frame list) is replaced by a zero-count summary;
// the copy's frame index is its position in the target list, not in the source list.
ReplayFrameSummary summary = frameIndex < sourceSummaries.size() ? sourceSummaries.get(frameIndex) : null; targetChunk.getFrameSummaryList().add(copyFrameSummary(summary, targetChunk.getFrameSummaryList().size(), frameTime)); } }
/**
 * Returns the chunk's full frame-summary list when it is non-null and non-empty, otherwise
 * its regular frame-summary list; a {@code null} chunk yields a new empty list.
 */
private List resolveFullFrameSummaryList(ReplayChunk replayChunk) { if (replayChunk == null) { return new ArrayList<>(); } if (replayChunk.getFullFrameSummaryList() != null && !replayChunk.getFullFrameSummaryList().isEmpty()) { return replayChunk.getFullFrameSummaryList(); } return replayChunk.getFrameSummaryList(); }
/**
 * Creates a new summary with the given frame index and timestamp, copying the abnormal,
 * error, block and manual counts from {@code sourceSummary}, or using 0 for each when the
 * source is {@code null}.
 */
private ReplayFrameSummary copyFrameSummary(ReplayFrameSummary sourceSummary, int frameIndex, long frameTime) { ReplayFrameSummary summary = new ReplayFrameSummary(); summary.setFrameIndex(frameIndex); summary.setTimestamp(frameTime); summary.setAbnormalCount(sourceSummary == null ? 0 : sourceSummary.getAbnormalCount()); summary.setErrorCount(sourceSummary == null ? 0 : sourceSummary.getErrorCount()); summary.setBlockCount(sourceSummary == null ? 0 : sourceSummary.getBlockCount()); summary.setManualCount(sourceSummary == null ?
0 : sourceSummary.getManualCount()); return summary; }
/**
 * Returns the chunk's full frame list when it is non-null and non-empty, otherwise its
 * regular frame list; a {@code null} chunk yields a new empty list.
 */
private List> resolveFullFrameList(ReplayChunk replayChunk) { if (replayChunk == null) { return new ArrayList<>(); } if (replayChunk.getFullFrames() != null && !replayChunk.getFullFrames().isEmpty()) { return replayChunk.getFullFrames(); } return replayChunk.getFrames(); }
/**
 * Reduces the chunk's display frames (and their summaries) in place when there are more than
 * the frame limit, which is max(16, safePositive(maxChunkFrames, 32)).
 *
 * <p>Always kept: the first frame, the last frame, and every frame whose summary reports
 * activity ({@code safeSummaryActivity > 0}). Remaining slots up to the limit are filled with
 * evenly spaced indexes. Kept frames are re-emitted in ascending index order with summaries
 * renumbered from 0. Because all active frames are kept, the result can still exceed the limit.
 */
private void shrinkChunkFrames(ReplayChunk chunk) { if (chunk == null || chunk.getFrames() == null || chunk.getFrames().isEmpty()) { return; } int frameLimit = Math.max(16, safePositive(maxChunkFrames, 32)); if (chunk.getFrames().size() <= frameLimit) { return; } LinkedHashSet keptIndexes = new LinkedHashSet<>(); keptIndexes.add(0); keptIndexes.add(chunk.getFrames().size() - 1);
// NOTE(review): indexes come from the summary list but are later used on the frame list; this
// assumes both lists have the same length - confirm against appendFrame.
for (int index = 0; index < chunk.getFrameSummaryList().size(); index++) { ReplayFrameSummary frameSummary = chunk.getFrameSummaryList().get(index); if (frameSummary != null && safeSummaryActivity(frameSummary) > 0) { keptIndexes.add(index); } } int remainingSlots = Math.max(0, frameLimit - keptIndexes.size());
// Evenly spaced sampling over [0, size-1]; the set ignores indexes that are already kept.
if (remainingSlots > 0) { double step = (chunk.getFrames().size() - 1D) / Math.max(1D, remainingSlots); for (int slot = 1; slot <= remainingSlots; slot++) { int sampledIndex = (int) Math.round(slot * step); sampledIndex = Math.max(0, Math.min(chunk.getFrames().size() - 1, sampledIndex)); keptIndexes.add(sampledIndex); if (keptIndexes.size() >= frameLimit) { break; } } } List orderedIndexes = new ArrayList<>(keptIndexes); orderedIndexes.sort(Integer::compareTo); List> compactFrames = new ArrayList<>(orderedIndexes.size()); List compactSummaryList = new ArrayList<>(orderedIndexes.size()); for (int compactIndex = 0; compactIndex < orderedIndexes.size(); compactIndex++) { int originalIndex = orderedIndexes.get(compactIndex); compactFrames.add(chunk.getFrames().get(originalIndex)); ReplayFrameSummary originalSummary = originalIndex < chunk.getFrameSummaryList().size() ?
chunk.getFrameSummaryList().get(originalIndex) : null;
// A missing summary is replaced by a freshly constructed one so the copy below has a source.
if (originalSummary == null) { originalSummary = new ReplayFrameSummary(); } ReplayFrameSummary compactSummary = new ReplayFrameSummary(); compactSummary.setFrameIndex(compactIndex); compactSummary.setTimestamp(originalSummary.getTimestamp()); compactSummary.setAbnormalCount(originalSummary.getAbnormalCount()); compactSummary.setErrorCount(originalSummary.getErrorCount()); compactSummary.setBlockCount(originalSummary.getBlockCount()); compactSummary.setManualCount(originalSummary.getManualCount()); compactSummaryList.add(compactSummary); } chunk.setFrames(compactFrames); chunk.setFrameSummaryList(compactSummaryList); }
/**
 * Chooses which event groups become frames in a compact chunk.
 *
 * <p>A group is a run of consecutive events with equal timestamp and sampleSeq (compared
 * with {@code equalsLong}). If the number of groups is within the limit
 * max(8, safePositive(maxChunkFrames, 32)), all group indexes are returned. Otherwise the
 * first and last groups are kept and the remaining slots are filled with evenly spaced
 * indexes; duplicates are skipped, so fewer than the limit may be returned.
 *
 * @param eventList events in the order they will be replayed; {@code null} or empty yields an empty list
 * @return ascending list of group indexes to keep
 */
private List resolveCompactGroupIndexes(List eventList) { List keptIndexes = new ArrayList<>(); if (eventList == null || eventList.isEmpty()) { return keptIndexes; } int groupCount = 0; int cursor = 0;
// First pass only counts the groups.
while (cursor < eventList.size()) { DeviceReplayState currentEvent = eventList.get(cursor); Long frameTime = currentEvent.getTimestamp(); Long frameSeq = currentEvent.getSampleSeq(); while (cursor < eventList.size()) { DeviceReplayState nextEvent = eventList.get(cursor); if (!equalsLong(frameTime, nextEvent.getTimestamp()) || !equalsLong(frameSeq, nextEvent.getSampleSeq())) { break; } cursor++; } groupCount++; } int compactFrameLimit = Math.max(8, safePositive(maxChunkFrames, 32)); if (groupCount <= compactFrameLimit) { for (int index = 0; index < groupCount; index++) { keptIndexes.add(index); } return keptIndexes; } keptIndexes.add(0); keptIndexes.add(groupCount - 1); int remainingSlots = Math.max(0, compactFrameLimit - keptIndexes.size()); double step = (groupCount - 1D) / Math.max(1D, remainingSlots); for (int slot = 1; slot <= remainingSlots; slot++) { int sampledIndex = (int) Math.round(slot * step); sampledIndex = Math.max(0, Math.min(groupCount - 1, sampledIndex)); if (!keptIndexes.contains(sampledIndex)) { keptIndexes.add(sampledIndex); } } keptIndexes.sort(Integer::compareTo); return
keptIndexes; }
/**
 * Returns the sum of the summary's abnormal, error and block counts (each read through
 * {@code intValue}); a {@code null} summary counts as 0. The manual count is not included.
 */
private int safeSummaryActivity(ReplayFrameSummary frameSummary) { if (frameSummary == null) { return 0; } return intValue(frameSummary.getAbnormalCount()) + intValue(frameSummary.getErrorCount()) + intValue(frameSummary.getBlockCount()); }
/**
 * Resolves a seek request to a chunk index and display frame index.
 *
 * <p>A {@code null} or non-positive {@code timestamp} falls back to the session start time;
 * a {@code null} or non-positive {@code sampleSeq} is treated as "no sequence". Both values
 * are validated by {@code rejectTimestampOutsideWindow} / {@code rejectSampleSeqOutsideWindow}.
 * Without a sequence the result points at frame 0 of the chunk resolved for the timestamp and
 * no chunk is loaded. With a sequence, the nearest frame is searched via
 * {@code findNearestSeekFrame} and the display frame closest in time to it is reported.
 *
 * @param sessionId id resolved through {@code requireSession}
 * @param timestamp requested time in ms, may be {@code null}
 * @param sampleSeq requested sample sequence, may be {@code null}
 * @return the populated {@code ReplaySeekResult}
 */
@Override public ReplaySeekResult seek(String sessionId, Long timestamp, Long sampleSeq) { ReplaySessionContext session = requireSession(sessionId); long requestTime = timestamp == null || timestamp <= 0 ? session.getStartTimeMs() : timestamp; rejectTimestampOutsideWindow(session, requestTime, "seek 目标时间不在当前回放窗口内"); long requestSeq = sampleSeq == null || sampleSeq <= 0 ? 0L : sampleSeq; rejectSampleSeqOutsideWindow(session, requestSeq); int initialChunkIndex = resolveChunkIndex(session, requestTime, requestSeq);
// Time-only seek: answer from the timeline alone, with zero drift and no resolved frame.
if (requestSeq <= 0) { ReplaySeekResult result = new ReplaySeekResult(); result.setChunkIndex(initialChunkIndex); result.setFrameIndex(0); result.setRequestedTimestamp(requestTime); result.setResolvedTimestamp(requestTime); result.setRequestedSampleSeq(null); result.setResolvedSampleSeq(null); result.setDriftMs(0L); result.setResolvedFrame(null); return result; }
// Sequence seek: if no frame could be resolved, fall back to the initial chunk and the request values.
ResolvedSeekFrame resolvedSeekFrame = findNearestSeekFrame(session, sessionId, initialChunkIndex, requestTime, requestSeq); int chunkIndex = resolvedSeekFrame == null ? initialChunkIndex : resolvedSeekFrame.chunkIndex; ReplayChunk chunk = resolvedSeekFrame == null ? loadFullChunk(session, chunkIndex) : resolvedSeekFrame.chunk; long resolvedTime = resolvedSeekFrame == null ? requestTime : numberValue(resolvedSeekFrame.frame.get("timestamp")); long resolvedSeq = resolvedSeekFrame == null ?
requestSeq : numberValue(resolvedSeekFrame.frame.get("sampleSeq")); int displayFrameIndex = findNearestDisplayFrameIndex(chunk, resolvedTime); ReplaySeekResult result = new ReplaySeekResult(); result.setChunkIndex(chunkIndex); result.setFrameIndex(displayFrameIndex); result.setRequestedTimestamp(requestTime); result.setResolvedTimestamp(resolvedTime); result.setRequestedSampleSeq(requestSeq > 0 ? requestSeq : null); result.setResolvedSampleSeq(resolvedSeq > 0 ? resolvedSeq : null);
// NOTE(review): this point is only reached with requestSeq > 0 (the time-only case returned
// earlier), so the value passed to setDriftMs is a sample-sequence distance, not milliseconds.
result.setDriftMs(requestSeq > 0 ? Math.abs(resolvedSeq - requestSeq) : Math.abs(resolvedTime - requestTime)); result.setResolvedFrame(resolvedSeekFrame == null ? null : resolvedSeekFrame.frame); return result; }
/**
 * Searches outward from {@code initialChunkIndex} for the frame with the smallest drift.
 *
 * <p>The initial chunk is evaluated first. Then, for distance 1, 2, ... (below
 * min(number of timeline chunks, 512)), the chunk on each side is evaluated only if its
 * boundary distance from {@code requestTime} does not exceed the best drift so far. The
 * search stops once a match exists and neither side was evaluated at some distance.
 *
 * <p>NOTE(review): the {@code sessionId} parameter is not used in this method.
 * NOTE(review): when {@code requestSeq > 0} the best drift comes from {@code evaluateSeekChunk}
 * as a sample-sequence distance, while {@code resolveChunkBoundaryDrift} returns a time
 * distance in ms; the pruning compares the two directly - confirm this is intended.
 *
 * @return the best frame found, or {@code null} if no evaluated chunk produced one
 */
private ResolvedSeekFrame findNearestSeekFrame(ReplaySessionContext session, String sessionId, int initialChunkIndex, long requestTime, long requestSeq) { ResolvedSeekFrame bestMatch = evaluateSeekChunk(session, initialChunkIndex, requestTime, requestSeq); long bestDrift = bestMatch == null ? Long.MAX_VALUE : bestMatch.drift; int maxSearchDistance = Math.min(session.getTimelineChunks().size(), 512); for (int distance = 1; distance < maxSearchDistance; distance++) { boolean checkedAny = false; int leftChunkIndex = initialChunkIndex - distance; if (leftChunkIndex >= 0) { long lowerBoundDrift = resolveChunkBoundaryDrift(session, leftChunkIndex, requestTime); if (lowerBoundDrift <= bestDrift) { ResolvedSeekFrame leftMatch = evaluateSeekChunk(session, leftChunkIndex, requestTime, requestSeq); bestMatch = chooseBetterSeekFrame(bestMatch, leftMatch); bestDrift = bestMatch == null ?
Long.MAX_VALUE : bestMatch.drift; checkedAny = true; } } int rightChunkIndex = initialChunkIndex + distance; if (rightChunkIndex < session.getTimelineChunks().size()) { long lowerBoundDrift = resolveChunkBoundaryDrift(session, rightChunkIndex, requestTime); if (lowerBoundDrift <= bestDrift) { ResolvedSeekFrame rightMatch = evaluateSeekChunk(session, rightChunkIndex, requestTime, requestSeq); bestMatch = chooseBetterSeekFrame(bestMatch, rightMatch); bestDrift = bestMatch == null ? Long.MAX_VALUE : bestMatch.drift; checkedAny = true; } }
// Stop once a match exists and neither neighbour at this distance was worth evaluating.
if (!checkedAny && bestMatch != null) { break; } } return bestMatch; }
/**
 * Loads one full chunk and picks its frame with the smallest drift from the request.
 *
 * <p>Drift is |frame sampleSeq - requestSeq| when {@code requestSeq > 0}, otherwise
 * |frame timestamp - requestTime|. Frames are taken from the chunk's full frame list when it
 * is non-empty, else from its display frames. On equal drift the later frame wins.
 *
 * @return the best frame with its chunk and drift, or {@code null} when {@code chunkIndex} is
 *         negative or the chunk has no frames
 */
private ResolvedSeekFrame evaluateSeekChunk(ReplaySessionContext session, int chunkIndex, long requestTime, long requestSeq) { if (chunkIndex < 0) { return null; } ReplayChunk chunk = loadFullChunk(session, chunkIndex); List> seekFrames = chunk.getFullFrames() == null || chunk.getFullFrames().isEmpty() ? chunk.getFrames() : chunk.getFullFrames(); if (seekFrames == null || seekFrames.isEmpty()) { return null; } Map bestFrame = null; long bestDrift = Long.MAX_VALUE; for (Map frame : seekFrames) { long drift = requestSeq > 0 ?
Math.abs(numberValue(frame.get("sampleSeq")) - requestSeq) : Math.abs(numberValue(frame.get("timestamp")) - requestTime);
// "<=" means a later frame replaces an earlier one with the same drift.
if (drift <= bestDrift) { bestDrift = drift; bestFrame = frame; } } if (bestFrame == null) { return null; } ResolvedSeekFrame resolvedSeekFrame = new ResolvedSeekFrame(); resolvedSeekFrame.chunkIndex = chunkIndex; resolvedSeekFrame.chunk = chunk; resolvedSeekFrame.frame = bestFrame; resolvedSeekFrame.drift = bestDrift; return resolvedSeekFrame; }
/**
 * Returns whichever candidate has the smaller drift. A {@code null} challenger never wins;
 * a non-null challenger wins against a {@code null} current best and on equal drift.
 */
private ResolvedSeekFrame chooseBetterSeekFrame(ResolvedSeekFrame currentBest, ResolvedSeekFrame challenger) { if (challenger == null) { return currentBest; } if (currentBest == null || challenger.drift <= currentBest.drift) { return challenger; } return currentBest; }
/**
 * Returns the distance in ms from {@code requestTime} to the chunk's
 * [startTimeMs, endTimeMs] range, or 0 when the time lies inside the range.
 */
private long resolveChunkBoundaryDrift(ReplaySessionContext session, int chunkIndex, long requestTime) { Map chunkMeta = resolveTimelineChunk(session, chunkIndex); long chunkStartTime = numberValue(chunkMeta.get("startTimeMs")); long chunkEndTime = numberValue(chunkMeta.get("endTimeMs")); if (requestTime < chunkStartTime) { return chunkStartTime - requestTime; } if (requestTime > chunkEndTime) { return requestTime - chunkEndTime; } return 0L; }
/**
 * Returns the index of the display frame ({@code chunk.getFrames()}) whose timestamp is
 * closest to {@code resolvedTime}; on equal distance the later frame wins. Returns 0 when the
 * chunk has no display frames.
 */
private int findNearestDisplayFrameIndex(ReplayChunk chunk, long resolvedTime) { List> displayFrames = chunk.getFrames(); if (displayFrames == null || displayFrames.isEmpty()) { return 0; } int matchedIndex = 0; long bestDrift = Long.MAX_VALUE; for (int index = 0; index < displayFrames.size(); index++) { Map frame = displayFrames.get(index); long drift = Math.abs(numberValue(frame.get("timestamp")) - resolvedTime); if (drift <= bestDrift) { bestDrift = drift; matchedIndex = index; } } return matchedIndex; }
/**
 * Returns the recorded state of one device at a point in time within the session window.
 *
 * <p>A {@code null} or non-positive {@code timestamp} falls back to the session start time.
 * The state is resolved through {@code resolveDeviceStateAt} and returned under "detail"
 * after a JSON serialise/parse round trip.
 *
 * @return map with day, type, deviceNo, stationId, timestamp and detail
 * @throws IllegalArgumentException when no manifest matches the device or no state is found
 *         for the timestamp
 */
@Override public Map resolveDevice(String sessionId, String type, Integer deviceNo, Integer stationId, Long timestamp) { ReplaySessionContext session = requireSession(sessionId); DeviceReplayManifest manifest = resolveDeviceManifest(session, type, deviceNo, stationId); if (manifest == null) {
throw new IllegalArgumentException("未找到对应设备的历史日志");
        }
        // A null or non-positive timestamp falls back to the session start time.
        long targetTimestamp = timestamp == null || timestamp <= 0 ? session.getStartTimeMs() : timestamp;
        rejectTimestampOutsideWindow(session, targetTimestamp, "设备详情时间不在当前回放窗口内");
        Object resolvedState = resolveDeviceStateAt(manifest, targetTimestamp);
        if (resolvedState == null) {
            throw new IllegalArgumentException("当前时间没有该设备的历史详情");
        }
        Map data = new LinkedHashMap<>();
        data.put("day", session.getDay());
        data.put("type", type);
        data.put("deviceNo", deviceNo);
        data.put("stationId", stationId);
        data.put("timestamp", targetTimestamp);
        // The state is serialised to JSON text and parsed back before being returned as "detail".
        data.put("detail", JSON.parseObject(JSON.toJSONString(resolvedState)));
        return data;
    }

    /**
     * Looks up the manifest of one device stream in the session.
     *
     * <p>An exact stream-id match is tried first. Only for type {@code "Devp"}
     * (case-insensitive) with a non-null {@code stationId} is there a fallback: the first
     * {@code Devp} manifest whose station id equals {@code stationId}.
     *
     * @return the manifest, or {@code null} when neither lookup succeeds
     */
    private DeviceReplayManifest resolveDeviceManifest(ReplaySessionContext session, String type, Integer deviceNo, Integer stationId) {
        String manifestKey = buildStreamId(session.getDay(), type, deviceNo, stationId);
        DeviceReplayManifest manifest = session.getManifestByStream().get(manifestKey);
        if (manifest != null) {
            return manifest;
        }
        if (!"Devp".equalsIgnoreCase(type) || stationId == null) {
            return null;
        }
        for (DeviceReplayManifest currentManifest : session.getManifestByStream().values()) {
            if (currentManifest == null) {
                continue;
            }
            if (!"Devp".equalsIgnoreCase(currentManifest.getType())) {
                continue;
            }
            if (stationId.equals(currentManifest.getStationId())) {
                return currentManifest;
            }
        }
        return null;
    }

    /** Removes the session from the session map; a null id is ignored. */
    @Override
    public void closeSession(String sessionId) {
        if (sessionId != null) {
            sessions.remove(sessionId);
        }
    }

    /** Shuts the warm-up executor down immediately when the bean is destroyed. */
    @PreDestroy
    public void shutdownReplayWarmupExecutor() {
        replayWarmupExecutor.shutdownNow();
    }

    /**
     * Cuts the replay window into timeline chunks and records which streams are active in each.
     *
     * <p>The window is walked in buckets of {@code timelineBucketSeconds} (at least one second,
     * at most the configured maximum window). Each bucket is divided into up to 128 equal
     * splits so that a split holds roughly {@code chunkSampleSize} estimated samples; every
     * split becomes one chunk.
     *
     * @return an empty snapshot when no manifest chunk overlaps the window or the window is
     *         inverted, otherwise unmodifiable copies of the chunk list and the
     *         chunk-to-stream-id map
     */
    private TimelineChunkSnapshotData buildTimelineChunkSnapshot(List manifests, long windowStartTimeMs, long windowEndTimeMs) {
        List allChunkMetas = collectTimelineChunkMetas(manifests, windowStartTimeMs, windowEndTimeMs);
        if (allChunkMetas.isEmpty() || windowEndTimeMs < windowStartTimeMs) {
            return TimelineChunkSnapshotData.empty();
        }
        List> chunks = new ArrayList<>();
        Map> chunkStreamIdsByChunk = new LinkedHashMap<>();
        long maxWindowMs = safePositive(maxWindowMinutes, 240) * 60L * 1000L;
        long bucketWindowMs = safePositive(timelineBucketSeconds, 20) * 1000L;
        if (maxWindowMs > 0) {
            bucketWindowMs = Math.min(bucketWindowMs, maxWindowMs);
        }
        bucketWindowMs = Math.max(1_000L, bucketWindowMs);
        int chunkIndex = 0;
        for (long windowStart = windowStartTimeMs; windowStart <= windowEndTimeMs; windowStart += bucketWindowMs) {
            // Bucket bounds are inclusive on both ends.
            long windowEnd = Math.min(windowEndTimeMs, windowStart + bucketWindowMs - 1);
            int estimatedSamples = estimateWindowSamples(allChunkMetas, windowStart, windowEnd);
            int targetChunkSampleSize = Math.max(1, safePositive(chunkSampleSize, 400));
            int desiredSplitCount = Math.max(1, (int) Math.ceil(estimatedSamples / (double) targetChunkSampleSize));
            int splitCount = Math.min(128, desiredSplitCount);
            long windowDurationMs = Math.max(1L, windowEnd - windowStart + 1L);
            long splitDurationMs = Math.max(1L, (long) Math.ceil(windowDurationMs / (double) splitCount));
            for (int splitIndex = 0; splitIndex < splitCount; splitIndex++) {
                long splitStartTime = windowStart + splitIndex * splitDurationMs;
                // Rounding the split length up can leave trailing splits past the bucket end.
                if (splitStartTime > windowEnd) {
                    break;
                }
                long splitEndTime = Math.min(windowEnd, splitStartTime + splitDurationMs - 1L);
                int splitEstimatedSamples = estimateWindowSamples(allChunkMetas, splitStartTime, splitEndTime);
                long firstSampleSeq = resolveWindowFirstSampleSeq(allChunkMetas, splitStartTime, splitEndTime);
                long lastSampleSeq = resolveWindowLastSampleSeq(allChunkMetas, splitStartTime, splitEndTime);
                List activeStreamIds = resolveWindowStreamIds(allChunkMetas, splitStartTime, splitEndTime);
                chunks.add(buildChunkMeta(
                        chunkIndex++,
                        splitStartTime,
                        splitEndTime,
                        splitEstimatedSamples,
                        firstSampleSeq,
                        lastSampleSeq));
                // chunkIndex was already advanced above, so the chunk just added is chunkIndex - 1.
                chunkStreamIdsByChunk.put(chunkIndex - 1, activeStreamIds);
            }
        }
        TimelineChunkSnapshotData snapshotData = new TimelineChunkSnapshotData();
        snapshotData.timelineChunks = Collections.unmodifiableList(new ArrayList<>(chunks));
        snapshotData.chunkStreamIdsByChunk =
Collections.unmodifiableMap(new LinkedHashMap<>(chunkStreamIdsByChunk)); return snapshotData; } private Map buildManifestSnapshot(List manifests) { Map manifestByStream = new LinkedHashMap<>(); for (DeviceReplayManifest manifest : manifests) { manifestByStream.put(manifest.getStreamId(), manifest); } return Collections.unmodifiableMap(manifestByStream); } private int estimateWindowSamples(List chunkMetas, long windowStart, long windowEnd) { int estimatedSamples = 0; for (ChunkWindowMeta chunkWindowMeta : chunkMetas) { DeviceReplayChunkMeta chunkMeta = chunkWindowMeta.chunkMeta; long chunkStartTime = safeLong(chunkMeta.getFirstSampleTimeMs()); long chunkEndTime = safeLong(chunkMeta.getLastSampleTimeMs()); if (chunkEndTime < windowStart || chunkStartTime > windowEnd) { continue; } estimatedSamples += Math.max(1, chunkMeta.getSampleCount() == null ? 1 : chunkMeta.getSampleCount()); } return estimatedSamples; } private StreamReplayScanResult scanManifestRange(DeviceReplayManifest manifest, long startTime, long endTime, boolean allowBaselineLookup, long scanStartExclusiveTimeMs) { StreamReplayScanResult result = new StreamReplayScanResult(); List candidates = new ArrayList<>(); DeviceReplayChunkMeta baselineChunk = null; for (DeviceReplayChunkMeta chunkMeta : manifest.getChunks()) { long chunkStart = safeLong(chunkMeta.getFirstSampleTimeMs()); long chunkEnd = safeLong(chunkMeta.getLastSampleTimeMs()); if (allowBaselineLookup && chunkEnd < startTime) { baselineChunk = chunkMeta; continue; } if (chunkEnd <= scanStartExclusiveTimeMs) { continue; } if (chunkStart > endTime) { break; } candidates.add(chunkMeta); } if (allowBaselineLookup && baselineChunk != null) { candidates.add(0, baselineChunk); } for (DeviceReplayChunkMeta chunkMeta : candidates) { Path logPath = resolveLogPath(manifest, chunkMeta); if (!Files.exists(logPath)) { continue; } try (BufferedReader reader = Files.newBufferedReader(logPath, StandardCharsets.UTF_8)) { String line; while ((line = 
reader.readLine()) != null) { if (line == null || line.isBlank()) { continue; } DeviceDataLog logItem; try { logItem = JSON.parseObject(line, DeviceDataLog.class); } catch (Exception parseError) { log.debug("跳过损坏的 replay 日志行, path={}", logPath, parseError); continue; } if (logItem == null) { continue; } long sampleTime = resolveSampleTime(logItem); Object normalizedState = replayNormalizer.normalizeState(logItem); if (normalizedState == null) { continue; } if (allowBaselineLookup && sampleTime < startTime) { result.baselineState = normalizedState; continue; } if (sampleTime <= scanStartExclusiveTimeMs) { continue; } if (sampleTime > endTime) { break; } DeviceReplayState replayState = new DeviceReplayState(); replayState.setType(manifest.getType()); replayState.setDeviceNo(manifest.getDeviceNo()); replayState.setStationId(manifest.getStationId()); replayState.setTimestamp(sampleTime); replayState.setSampleSeq(resolveSampleSeq(logItem)); replayState.setPayload(normalizedState); result.eventList.add(replayState); } } catch (Exception e) { log.warn("读取 replay chunk 失败, path={}", logPath, e); } } return result; } private Object resolveDeviceStateAt(DeviceReplayManifest manifest, long targetTimestamp) { Object latestState = null; for (DeviceReplayChunkMeta chunkMeta : manifest.getChunks()) { long chunkStart = safeLong(chunkMeta.getFirstSampleTimeMs()); if (chunkStart > targetTimestamp) { break; } Path logPath = resolveLogPath(manifest, chunkMeta); if (!Files.exists(logPath)) { continue; } try (BufferedReader reader = Files.newBufferedReader(logPath, StandardCharsets.UTF_8)) { String line; while ((line = reader.readLine()) != null) { if (line == null || line.isBlank()) { continue; } DeviceDataLog logItem; try { logItem = JSON.parseObject(line, DeviceDataLog.class); } catch (Exception parseError) { log.debug("跳过损坏的回放设备详情日志行, path={}", logPath, parseError); continue; } if (logItem == null) { continue; } long sampleTime = resolveSampleTime(logItem); if (sampleTime > 
targetTimestamp) { return latestState; } Object normalizedState = replayNormalizer.normalizeState(logItem); if (normalizedState != null) { latestState = normalizedState; } } } catch (Exception e) { log.warn("读取回放设备详情失败, path={}", logPath, e); } } return latestState; } private ChunkCheckpointSeed resolveCheckpointSeed(ReplaySessionContext session, Integer chunkIndex) { if (session == null || chunkIndex == null || chunkIndex <= 0) { return null; } ChunkCheckpointSeed hotCheckpoint = resolveCheckpointSeed(session.getChunkStateCheckpointCache(), session, chunkIndex); if (hotCheckpoint != null) { return hotCheckpoint; } return resolveCheckpointSeed(session.getSparseChunkStateCheckpointCache(), session, chunkIndex); } private ChunkCheckpointSeed resolveCheckpointSeed(Map> checkpointCache, ReplaySessionContext session, Integer chunkIndex) { synchronized (checkpointCache) { for (int checkpointChunkIndex = chunkIndex - 1; checkpointChunkIndex >= 0; checkpointChunkIndex--) { Map checkpointState = checkpointCache.get(checkpointChunkIndex); if (checkpointState == null || checkpointState.isEmpty()) { continue; } Map checkpointChunkMeta = resolveTimelineChunk(session, checkpointChunkIndex); ChunkCheckpointSeed checkpointSeed = new ChunkCheckpointSeed(); checkpointSeed.chunkIndex = checkpointChunkIndex; checkpointSeed.scanStartExclusiveTimeMs = numberValue(checkpointChunkMeta.get("endTimeMs")); checkpointSeed.stateByStream = new LinkedHashMap<>(checkpointState); return checkpointSeed; } } return null; } private void appendFrame(ReplayChunk chunk, Map frame) { int frameIndex = chunk.getFrames().size(); chunk.getFrames().add(frame); ReplayFrameSummary summary = new ReplayFrameSummary(); summary.setFrameIndex(frameIndex); summary.setTimestamp(numberValue(frame.get("timestamp"))); Map frameSummary = frame.get("summary") instanceof Map ? 
(Map) frame.get("summary") : new LinkedHashMap<>(); summary.setAbnormalCount(intValue(frameSummary.get("abnormalCount"))); summary.setErrorCount(intValue(frameSummary.get("errorCount"))); summary.setBlockCount(intValue(frameSummary.get("blockCount"))); summary.setManualCount(0); chunk.getFrameSummaryList().add(summary); } private Map resolveTimelineChunk(ReplaySessionContext session, Integer chunkIndex) { return resolveTimelineChunk(session.getTimelineChunks(), chunkIndex); } private Map resolveTimelineChunk(List> timelineChunks, Integer chunkIndex) { if (chunkIndex == null || chunkIndex < 0 || chunkIndex >= timelineChunks.size()) { throw new IllegalArgumentException("回放分片不存在"); } return timelineChunks.get(chunkIndex); } private int resolveChunkIndex(ReplaySessionContext session, long timestamp, long sampleSeq) { return resolveChunkIndex(session.getTimelineChunks(), timestamp, sampleSeq); } private int resolveChunkIndex(List> timelineChunks, long timestamp, long sampleSeq) { if (sampleSeq > 0) { for (int i = 0; i < timelineChunks.size(); i++) { Map chunkMeta = timelineChunks.get(i); long firstSampleSeq = numberValue(chunkMeta.get("firstSampleSeq")); long lastSampleSeq = numberValue(chunkMeta.get("lastSampleSeq")); if (sampleSeq >= firstSampleSeq && sampleSeq <= lastSampleSeq) { return i; } } } for (int i = 0; i < timelineChunks.size(); i++) { Map chunkMeta = timelineChunks.get(i); long startTime = numberValue(chunkMeta.get("startTimeMs")); long endTime = numberValue(chunkMeta.get("endTimeMs")); if (timestamp >= startTime && timestamp <= endTime) { return i; } } return Math.max(0, timelineChunks.size() - 1); } private int resolveSummaryBucketCount(ReplaySessionContext session, Integer bucketCount) { int totalChunkCount = session.getTimelineChunks().size(); if (totalChunkCount <= 0) { return 0; } int defaultBucketCount = Math.min(totalChunkCount, 96); int safeBucketCount = bucketCount == null || bucketCount <= 0 ? 
defaultBucketCount : bucketCount; return Math.max(1, Math.min(totalChunkCount, safeBucketCount)); } private Map buildChunkMeta(int chunkIndex, long startTimeMs, long endTimeMs, int estimatedSamples, long firstSampleSeq, long lastSampleSeq) { Map chunkMeta = new LinkedHashMap<>(); chunkMeta.put("chunkIndex", chunkIndex); chunkMeta.put("startTimeMs", startTimeMs); chunkMeta.put("endTimeMs", endTimeMs); chunkMeta.put("estimatedSamples", estimatedSamples); chunkMeta.put("firstSampleSeq", firstSampleSeq); chunkMeta.put("lastSampleSeq", lastSampleSeq); return chunkMeta; } private long resolveWindowFirstSampleSeq(List chunkMetas, long windowStart, long windowEnd) { long firstSampleSeq = Long.MAX_VALUE; for (ChunkWindowMeta chunkWindowMeta : chunkMetas) { DeviceReplayChunkMeta chunkMeta = chunkWindowMeta.chunkMeta; long chunkStartTime = safeLong(chunkMeta.getFirstSampleTimeMs()); long chunkEndTime = safeLong(chunkMeta.getLastSampleTimeMs()); if (chunkEndTime < windowStart || chunkStartTime > windowEnd) { continue; } long candidate = safeLong(chunkMeta.getFirstSampleSeq()); if (candidate > 0) { firstSampleSeq = Math.min(firstSampleSeq, candidate); } } return firstSampleSeq == Long.MAX_VALUE ? 
0L : firstSampleSeq;
    }

    /**
     * Returns the largest last-sample sequence among the manifest chunks that overlap
     * {@code [windowStart, windowEnd]}, or 0 when none overlaps.
     */
    private long resolveWindowLastSampleSeq(List chunkMetas, long windowStart, long windowEnd) {
        long highestSeq = 0L;
        for (ChunkWindowMeta entry : chunkMetas) {
            DeviceReplayChunkMeta meta = entry.chunkMeta;
            long firstTime = safeLong(meta.getFirstSampleTimeMs());
            long lastTime = safeLong(meta.getLastSampleTimeMs());
            boolean overlapsWindow = lastTime >= windowStart && firstTime <= windowEnd;
            if (overlapsWindow) {
                highestSeq = Math.max(highestSeq, safeLong(meta.getLastSampleSeq()));
            }
        }
        return highestSeq;
    }

    /**
     * Returns the manifests that take part in one timeline chunk.
     *
     * <p>When the session has no stream-id list for the chunk, every manifest of the session
     * is returned; stream ids without a manifest are dropped. Null arguments give an empty list.
     */
    private List resolveChunkManifests(ReplaySessionContext session, Integer chunkIndex) {
        if (session == null || chunkIndex == null) {
            return new ArrayList<>();
        }
        List streamIds = session.getChunkStreamIdsByChunk().get(chunkIndex);
        if (streamIds == null) {
            return new ArrayList<>(session.getManifestByStream().values());
        }
        List selected = new ArrayList<>(streamIds.size());
        for (String streamId : streamIds) {
            DeviceReplayManifest found = session.getManifestByStream().get(streamId);
            if (found == null) {
                continue;
            }
            selected.add(found);
        }
        return selected;
    }

    /**
     * Flattens the manifests into (stream id, chunk) pairs, keeping only chunks that have a
     * positive start time, do not end before they start, and overlap the replay window.
     * Null manifests and manifests without chunks are ignored.
     */
    private List collectTimelineChunkMetas(List manifests, long windowStartTimeMs, long windowEndTimeMs) {
        List collected = new ArrayList<>();
        for (DeviceReplayManifest manifest : manifests) {
            boolean hasChunks = manifest != null
                    && manifest.getChunks() != null
                    && !manifest.getChunks().isEmpty();
            if (!hasChunks) {
                continue;
            }
            for (DeviceReplayChunkMeta meta : manifest.getChunks()) {
                long firstTime = safeLong(meta.getFirstSampleTimeMs());
                long lastTime = safeLong(meta.getLastSampleTimeMs());
                boolean wellFormed = firstTime > 0 && lastTime >= firstTime;
                boolean overlapsWindow = lastTime >= windowStartTimeMs && firstTime <= windowEndTimeMs;
                if (wellFormed && overlapsWindow) {
                    ChunkWindowMeta pair = new ChunkWindowMeta();
                    pair.streamId = manifest.getStreamId();
                    pair.chunkMeta = meta;
                    collected.add(pair);
                }
            }
        }
        return collected;
    }

    /**
     * Lists, without duplicates and in first-seen order, the stream ids that own at least one
     * chunk overlapping the given window.
     */
    private List resolveWindowStreamIds(List chunkMetas, long windowStartTimeMs, long windowEndTimeMs) {
        LinkedHashSet seenStreamIds = new LinkedHashSet<>();
        for (ChunkWindowMeta entry : chunkMetas) {
            DeviceReplayChunkMeta meta = entry.chunkMeta;
            long firstTime = safeLong(meta.getFirstSampleTimeMs());
            long lastTime = safeLong(meta.getLastSampleTimeMs());
            boolean overlapsWindow = lastTime >= windowStartTimeMs && firstTime <= windowEndTimeMs;
            if (overlapsWindow) {
                seenStreamIds.add(entry.streamId);
            }
        }
        return new ArrayList<>(seenStreamIds);
    }

    /** Pulls a timestamp back into the session's replay window. */
    private long clampTimestampToWindow(ReplaySessionContext session, long timestamp) {
        long lowerBound = safeLong(session.getWindowStartTimeMs());
        long upperBound = safeLong(session.getWindowEndTimeMs());
        // The lower bound is checked first, exactly as a pair of sequential guards would.
        return timestamp < lowerBound
                ? lowerBound
                : (timestamp > upperBound ? upperBound : timestamp);
    }

    /** Pulls a timestamp back into the window recorded on a timeline summary snapshot. */
    private long clampTimestampToWindow(TimelineSummarySnapshot snapshot, long timestamp) {
        return timestamp < snapshot.windowStartTimeMs
                ? snapshot.windowStartTimeMs
                : (timestamp > snapshot.windowEndTimeMs ? snapshot.windowEndTimeMs : timestamp);
    }

    /**
     * Raises a {@code REPLAY_SEEK_OUT_OF_WINDOW} replay-window error with the given message
     * when the timestamp lies outside the session's replay window.
     */
    private void rejectTimestampOutsideWindow(ReplaySessionContext session, long timestamp, String message) {
        long lowerBound = safeLong(session.getWindowStartTimeMs());
        long upperBound = safeLong(session.getWindowEndTimeMs());
        boolean insideWindow = lowerBound <= timestamp && timestamp <= upperBound;
        if (!insideWindow) {
            throwReplayWindowException(REPLAY_SEEK_OUT_OF_WINDOW, message);
        }
    }

    /**
     * Rejects a positive sample sequence that lies outside the sequence range covered by the
     * session's timeline chunks; chunks without positive sequence bounds are ignored.
     */
    private void rejectSampleSeqOutsideWindow(ReplaySessionContext session, long sampleSeq) {
        if (sampleSeq <= 0) {
            return;
        }
        long firstWindowSampleSeq = Long.MAX_VALUE;
        long lastWindowSampleSeq = 0L;
        for (Map chunkMeta : session.getTimelineChunks()) {
            long firstSampleSeq = numberValue(chunkMeta.get("firstSampleSeq"));
            long lastSampleSeq = numberValue(chunkMeta.get("lastSampleSeq"));
            if (firstSampleSeq <= 0 || lastSampleSeq <= 0) {
                continue;
            }
firstWindowSampleSeq = Math.min(firstWindowSampleSeq, firstSampleSeq); lastWindowSampleSeq = Math.max(lastWindowSampleSeq, lastSampleSeq); } if (firstWindowSampleSeq == Long.MAX_VALUE || lastWindowSampleSeq <= 0) { return; } if (sampleSeq < firstWindowSampleSeq || sampleSeq > lastWindowSampleSeq) { throwReplayWindowException(REPLAY_SEEK_OUT_OF_WINDOW, "seek 采样序号不在当前回放窗口内"); } } private boolean isSameManifestSnapshot(ReplaySessionContext session, String manifestSnapshotKey) { if (session == null) { return false; } String currentManifestSnapshotKey = session.getManifestSnapshotKey(); return manifestSnapshotKey == null ? currentManifestSnapshotKey == null : manifestSnapshotKey.equals(currentManifestSnapshotKey); } private ReplaySessionContext requireSession(String sessionId) { cleanupExpiredSessions(); ReplaySessionContext session = sessions.get(sessionId); if (session == null) { throw new IllegalArgumentException("回放会话不存在或已过期"); } session.setLastAccessAtMs(System.currentTimeMillis()); refreshCurrentDaySessionIfNeeded(session); return session; } private void refreshCurrentDaySessionIfNeeded(ReplaySessionContext session) { if (session == null || !isCurrentReplayDay(session.getDay())) { return; } long now = System.currentTimeMillis(); Long lastRefreshAtMs = session.getLastManifestRefreshAtMs(); if (lastRefreshAtMs != null && (now - lastRefreshAtMs) < CURRENT_DAY_MANIFEST_REFRESH_INTERVAL_MS) { return; } String sessionDay = session.getDay(); long windowStartTimeMs = safeLong(session.getWindowStartTimeMs()); long windowEndTimeMs = safeLong(session.getWindowEndTimeMs()); List manifests = loadSessionManifests(sessionDay, true, false); if (manifests.isEmpty()) { manifestService.scheduleDayManifestRefresh(sessionDay); synchronized (session) { long currentTime = System.currentTimeMillis(); Long currentLastRefreshAtMs = session.getLastManifestRefreshAtMs(); if (currentLastRefreshAtMs != null && (currentTime - currentLastRefreshAtMs) < CURRENT_DAY_MANIFEST_REFRESH_INTERVAL_MS) { 
return; } session.setLastManifestRefreshAtMs(currentTime); } return; } boolean manifestChanged; String manifestSnapshotKey = manifests.isEmpty() ? "" : buildManifestSnapshotKey(manifests); Map manifestByStream = manifests.isEmpty() ? Collections.emptyMap() : buildManifestSnapshot(manifests); TimelineChunkSnapshotData timelineSnapshot = manifests.isEmpty() ? TimelineChunkSnapshotData.empty() : buildTimelineChunkSnapshot(manifests, windowStartTimeMs, windowEndTimeMs); synchronized (session) { long currentTime = System.currentTimeMillis(); Long currentLastRefreshAtMs = session.getLastManifestRefreshAtMs(); if (currentLastRefreshAtMs != null && (currentTime - currentLastRefreshAtMs) < CURRENT_DAY_MANIFEST_REFRESH_INTERVAL_MS) { return; } manifestChanged = !manifests.isEmpty() && !manifestSnapshotKey.equals(session.getManifestSnapshotKey()); if (manifestChanged) { session.setManifestByStream(manifestByStream); session.setTimelineChunks(timelineSnapshot.timelineChunks); session.setChunkStreamIdsByChunk(timelineSnapshot.chunkStreamIdsByChunk); session.setStartTimeMs(windowStartTimeMs); session.setEndTimeMs(windowEndTimeMs); session.setManifestSnapshotKey(manifestSnapshotKey); session.setTailCheckpointWarmupScheduled(false); clearSessionCaches(session); } session.setLastManifestRefreshAtMs(currentTime); } manifestService.scheduleDayManifestRefresh(sessionDay); if (manifestChanged) { scheduleTailCheckpointWarmup(session.getSessionId()); } } private List loadSessionManifests(String day, boolean currentReplayDay, boolean allowBuildIfMissing) { List manifests = manifestService.loadDayManifests(day, false); if (!manifests.isEmpty()) { return manifests; } if (allowBuildIfMissing) { return manifestService.loadDayManifests(day, true); } if (!currentReplayDay) { return manifests; } return manifests; } private void applyManifestSnapshot(ReplaySessionContext session, List manifests, String manifestSnapshotKey) { if (session == null || manifests == null || manifests.isEmpty()) { 
return;
        }
        long windowStartTimeMs = safeLong(session.getWindowStartTimeMs());
        long windowEndTimeMs = safeLong(session.getWindowEndTimeMs());
        // Build both snapshots before taking the session lock.
        Map manifestByStream = buildManifestSnapshot(manifests);
        TimelineChunkSnapshotData timelineSnapshot = buildTimelineChunkSnapshot(manifests, windowStartTimeMs, windowEndTimeMs);
        synchronized (session) {
            session.setManifestByStream(manifestByStream);
            session.setTimelineChunks(timelineSnapshot.timelineChunks);
            session.setChunkStreamIdsByChunk(timelineSnapshot.chunkStreamIdsByChunk);
            session.setStartTimeMs(windowStartTimeMs);
            session.setEndTimeMs(windowEndTimeMs);
            session.setManifestSnapshotKey(manifestSnapshotKey);
            // Allow a new tail warm-up to be scheduled for the replaced timeline.
            session.setTailCheckpointWarmupScheduled(false);
            clearSessionCaches(session);
        }
    }

    /**
     * Empties the session's chunk, compact-chunk, checkpoint, sparse-checkpoint and
     * timeline-summary caches, locking each cache object while it is cleared.
     */
    private void clearSessionCaches(ReplaySessionContext session) {
        synchronized (session.getChunkCache()) {
            session.getChunkCache().clear();
        }
        synchronized (session.getCompactChunkCache()) {
            session.getCompactChunkCache().clear();
        }
        synchronized (session.getChunkStateCheckpointCache()) {
            session.getChunkStateCheckpointCache().clear();
        }
        synchronized (session.getSparseChunkStateCheckpointCache()) {
            session.getSparseChunkStateCheckpointCache().clear();
        }
        synchronized (session.getTimelineSummaryCache()) {
            session.getTimelineSummaryCache().clear();
        }
    }

    /**
     * Reads a chunk from the full-frame cache ({@code fullFrames == true}) or the compact
     * cache, under that cache's lock.
     *
     * @return the cached chunk, or {@code null} when absent or when an argument is null
     */
    private ReplayChunk getCachedChunk(ReplaySessionContext session, Integer chunkIndex, boolean fullFrames) {
        if (session == null || chunkIndex == null) {
            return null;
        }
        Map cache = fullFrames ? session.getChunkCache() : session.getCompactChunkCache();
        synchronized (cache) {
            return cache.get(chunkIndex);
        }
    }

    /**
     * Stores a chunk in the full-frame or compact cache and trims the cache to its size limit.
     *
     * <p>The limit is the larger of {@code maxChunkCacheEntries} and a floor of
     * max(24 for full frames / 48 for compact, {@code maxPrefetchChunks} + 8).
     */
    private void cacheChunk(ReplaySessionContext session, Integer chunkIndex, ReplayChunk chunk, boolean fullFrames) {
        if (session == null || chunkIndex == null || chunk == null) {
            return;
        }
        Map cache = fullFrames ? session.getChunkCache() : session.getCompactChunkCache();
        int configuredMaxCacheSize = safePositive(maxChunkCacheEntries, 128);
        int minimumWarmCacheSize = Math.max(fullFrames ? 24 : 48, safePositive(maxPrefetchChunks, 3) + 8);
        int maxCacheSize = Math.max(minimumWarmCacheSize, configuredMaxCacheSize);
        synchronized (cache) {
            // Remove-then-put re-inserts the key; eviction below drops the first key in
            // iteration order. Presumably the cache is an insertion-ordered map, which would
            // make this oldest-first eviction - TODO confirm against ReplaySessionContext.
            cache.remove(chunkIndex);
            cache.put(chunkIndex, chunk);
            while (cache.size() > maxCacheSize) {
                Integer eldestKey = cache.keySet().iterator().next();
                cache.remove(eldestKey);
            }
        }
    }

    /**
     * Stores a copy of the per-stream state reached at {@code chunkIndex} in both the regular
     * and the sparse checkpoint cache, trimming each to its own limit
     * (max(32, {@code maxChunkCacheEntries}) and max(512, {@code maxChunkCacheEntries} * 8)).
     * Null arguments and empty states are ignored.
     */
    private void cacheChunkCheckpoint(ReplaySessionContext session, Integer chunkIndex, Map currentStateByStream) {
        if (session == null || chunkIndex == null || currentStateByStream == null || currentStateByStream.isEmpty()) {
            return;
        }
        Map> cache = session.getChunkStateCheckpointCache();
        int configuredMaxCacheSize = safePositive(maxChunkCacheEntries, 128);
        int maxCacheSize = Math.max(32, configuredMaxCacheSize);
        synchronized (cache) {
            cache.remove(chunkIndex);
            // Each cache receives its own copy of the state map.
            cache.put(chunkIndex, new LinkedHashMap<>(currentStateByStream));
            while (cache.size() > maxCacheSize) {
                Integer eldestKey = cache.keySet().iterator().next();
                cache.remove(eldestKey);
            }
        }
        Map> sparseCache = session.getSparseChunkStateCheckpointCache();
        int sparseMaxCacheSize = Math.max(512, safePositive(maxChunkCacheEntries, 128) * 8);
        synchronized (sparseCache) {
            sparseCache.remove(chunkIndex);
            sparseCache.put(chunkIndex, new LinkedHashMap<>(currentStateByStream));
            while (sparseCache.size() > sparseMaxCacheSize) {
                Integer eldestKey = sparseCache.keySet().iterator().next();
                sparseCache.remove(eldestKey);
            }
        }
    }

    /**
     * Tells whether {@code day} (formatted as {@code BASIC_ISO_DATE}, i.e. yyyyMMdd) is today
     * according to {@code LocalDate.now()}; null or blank input is never the current day.
     */
    private boolean isCurrentReplayDay(String day) {
        if (day == null || day.isBlank()) {
            return false;
        }
        return day.equals(LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE));
    }

    /**
     * Determines the replay window for a new session from either an explicit start/end pair
     * or a single target timestamp. The day is resolved first, so an invalid day is reported
     * before any window problem.
     */
    private ReplayWindow resolveReplayWindow(String day, Long targetTimestamp, Long windowStartTimeMs, Long windowEndTimeMs) {
        DayBoundary dayBoundary = resolveDayBoundary(day);
        boolean hasTargetTimestamp = targetTimestamp != null;
        boolean hasWindowStart = windowStartTimeMs != null;
        boolean hasWindowEnd = windowEndTimeMs != null;
        if (!hasTargetTimestamp && !hasWindowStart && !hasWindowEnd) {
throwReplayWindowException(REPLAY_WINDOW_MISSING, "请选择历史回放目标时间或时间窗口"); } if (hasWindowStart != hasWindowEnd) { throwReplayWindowException(REPLAY_WINDOW_INVALID, "历史回放时间窗口开始和结束时间必须同时传入"); } if (hasWindowStart) { return validateExplicitReplayWindow(dayBoundary, windowStartTimeMs, windowEndTimeMs); } return resolveTargetReplayWindow(dayBoundary, targetTimestamp); } private DayBoundary resolveDayBoundary(String day) { try { LocalDate replayDay = LocalDate.parse(day, DateTimeFormatter.BASIC_ISO_DATE); long dayStartTimeMs = replayDay.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli(); return new DayBoundary(dayStartTimeMs, dayStartTimeMs + DAY_END_OFFSET_MS); } catch (DateTimeParseException e) { throwReplayWindowException(REPLAY_WINDOW_INVALID, "历史回放日期格式无效"); return null; } } private ReplayWindow validateExplicitReplayWindow(DayBoundary dayBoundary, Long windowStartTimeMs, Long windowEndTimeMs) { if (windowStartTimeMs >= windowEndTimeMs) { throwReplayWindowException(REPLAY_WINDOW_INVALID, "历史回放时间窗口开始时间必须早于结束时间"); } if (!isWithinDay(dayBoundary, windowStartTimeMs) || !isWithinDay(dayBoundary, windowEndTimeMs)) { throwReplayWindowException(REPLAY_WINDOW_CROSS_DAY, "历史回放时间窗口不能跨自然日"); } validateReplayWindowSize(windowStartTimeMs, windowEndTimeMs); return new ReplayWindow(windowStartTimeMs, windowEndTimeMs); } private ReplayWindow resolveTargetReplayWindow(DayBoundary dayBoundary, Long targetTimestamp) { if (!isWithinDay(dayBoundary, targetTimestamp)) { throwReplayWindowException(REPLAY_WINDOW_CROSS_DAY, "历史回放目标时间不在所选日期内"); } long windowStartTimeMs = Math.max(dayBoundary.startTimeMs, targetTimestamp - DEFAULT_REPLAY_WINDOW_HALF_MS); long windowEndTimeMs = Math.min(dayBoundary.endTimeMs, targetTimestamp + DEFAULT_REPLAY_WINDOW_HALF_MS); if (windowStartTimeMs >= windowEndTimeMs) { throwReplayWindowException(REPLAY_WINDOW_INVALID, "历史回放时间窗口无效"); } validateReplayWindowSize(windowStartTimeMs, windowEndTimeMs); return new ReplayWindow(windowStartTimeMs, 
windowEndTimeMs); } private boolean isWithinDay(DayBoundary dayBoundary, Long timestamp) { return dayBoundary != null && timestamp != null && timestamp >= dayBoundary.startTimeMs && timestamp <= dayBoundary.endTimeMs; } private void validateReplayWindowSize(long windowStartTimeMs, long windowEndTimeMs) { long maxWindowMs = safePositive(maxWindowMinutes, 240) * 60L * 1000L; if ((windowEndTimeMs - windowStartTimeMs) > maxWindowMs) { throwReplayWindowException(REPLAY_WINDOW_TOO_LARGE, "历史回放时间窗口超过最大允许范围"); } } private void throwReplayWindowException(String errorCode, String message) { throw new DeviceLogReplayService.ReplayWindowException(errorCode, message); } private static class DayBoundary { private final long startTimeMs; private final long endTimeMs; private DayBoundary(long startTimeMs, long endTimeMs) { this.startTimeMs = startTimeMs; this.endTimeMs = endTimeMs; } } private static class ReplayWindow { private final long startTimeMs; private final long endTimeMs; private ReplayWindow(long startTimeMs, long endTimeMs) { this.startTimeMs = startTimeMs; this.endTimeMs = endTimeMs; } } private void scheduleTailCheckpointWarmup(String sessionId) { ReplaySessionContext session = sessions.get(sessionId); if (session == null) { return; } synchronized (session) { if (session.isTailCheckpointWarmupScheduled()) { return; } session.setTailCheckpointWarmupScheduled(true); } replayWarmupExecutor.submit(() -> warmupTailCheckpoints(sessionId)); } private void warmupTailCheckpoints(String sessionId) { ReplaySessionContext session = sessions.get(sessionId); if (session == null || session.getTimelineChunks() == null || session.getTimelineChunks().isEmpty()) { return; } int chunkCount = session.getTimelineChunks().size(); int warmupSpan = Math.min(chunkCount, 2048); int warmupStep = 32; int startChunkIndex = Math.max(0, chunkCount - warmupSpan); for (int chunkIndex = startChunkIndex; chunkIndex < chunkCount; chunkIndex += warmupStep) { if (!sessions.containsKey(sessionId)) { 
return;
            }
            try {
                loadFullChunk(session, chunkIndex);
            } catch (Exception e) {
                log.debug("预热 replay 尾部 checkpoint 失败, sessionId={}, chunkIndex={}", sessionId, chunkIndex, e);
            }
        }
    }

    /**
     * Drops every session that has no last-access time or has been idle longer than
     * {@code sessionTtlSeconds} (default 1800).
     */
    private void cleanupExpiredSessions() {
        long maxIdleMs = safePositive(sessionTtlSeconds, 1800) * 1000L;
        long currentTimeMs = System.currentTimeMillis();
        for (Map.Entry entry : sessions.entrySet()) {
            ReplaySessionContext candidate = entry.getValue();
            boolean expired = candidate == null
                    || candidate.getLastAccessAtMs() == null
                    || (currentTimeMs - candidate.getLastAccessAtMs()) > maxIdleMs;
            if (expired) {
                sessions.remove(entry.getKey());
            }
        }
    }

    /**
     * Guards replay entry points: only the {@code file} storage type (case-insensitive) is
     * supported.
     *
     * @throws IllegalStateException for any other storage type
     */
    private void ensureFileMode() {
        if ("file".equalsIgnoreCase(storageType)) {
            return;
        }
        throw new IllegalStateException("当前仅支持 file 模式历史回放");
    }

    /** Builds the log file path as {@code loggingPath / manifest day / chunk relative path}. */
    private Path resolveLogPath(DeviceReplayManifest manifest, DeviceReplayChunkMeta chunkMeta) {
        String dayDirectory = manifest.getDay();
        String relativeFile = chunkMeta.getRelativePath();
        return Paths.get(loggingPath, dayDirectory, relativeFile);
    }

    /**
     * Returns the sample time of a log item: its positive {@code sampleTimeMs} if present,
     * otherwise its create time, otherwise 0.
     */
    private long resolveSampleTime(DeviceDataLog logItem) {
        boolean hasSampleTime = logItem.getSampleTimeMs() != null && logItem.getSampleTimeMs() > 0;
        if (hasSampleTime) {
            return logItem.getSampleTimeMs();
        }
        return logItem.getCreateTime() == null ? 0L : logItem.getCreateTime().getTime();
    }

    /**
     * Returns the positive {@code sampleSeq} of a log item, falling back to its sample time
     * when no positive sequence is recorded.
     */
    private long resolveSampleSeq(DeviceDataLog logItem) {
        boolean hasSequence = logItem.getSampleSeq() != null && logItem.getSampleSeq() > 0;
        if (!hasSequence) {
            return resolveSampleTime(logItem);
        }
        return logItem.getSampleSeq();
    }

    /** Unboxes a {@code Long}, mapping null to 0. */
    private long safeLong(Long value) {
        if (value == null) {
            return 0L;
        }
        return value;
    }

    /** Returns {@code value} when it is a positive number, otherwise {@code fallback}. */
    private int safePositive(Integer value, int fallback) {
        return value == null || value <= 0 ?
fallback : value; } private long numberValue(Object value) { if (value instanceof Number) { return ((Number) value).longValue(); } if (value == null) { return 0L; } try { return Long.parseLong(String.valueOf(value)); } catch (Exception e) { return 0L; } } private int intValue(Object value) { if (value instanceof Number) { return ((Number) value).intValue(); } if (value == null) { return 0; } try { return Integer.parseInt(String.valueOf(value)); } catch (Exception e) { return 0; } } private String stringValue(Object value) { return value == null ? "" : String.valueOf(value); } private int resolveAbnormalPriority(String level) { if ("ERROR".equalsIgnoreCase(level)) { return 0; } if ("BLOCK".equalsIgnoreCase(level)) { return 1; } return 2; } private void warmupEntryManifest(String day, String type, Integer deviceNo, Integer stationId) { if (day == null || type == null || type.isBlank() || deviceNo == null) { return; } manifestService.loadManifest(DeviceReplayStreamKey.of(day, type, deviceNo, stationId), true); } private String buildManifestSnapshotKey(List manifests) { if (manifests == null || manifests.isEmpty()) { return ""; } StringBuilder builder = new StringBuilder(); for (DeviceReplayManifest manifest : manifests) { if (manifest == null) { continue; } builder.append(manifest.getStreamId()) .append('#') .append(manifest.getChunks() == null ? 
0 : manifest.getChunks().size()) .append('#') .append(safeLong(manifest.getFirstSampleSeq())) .append('#') .append(safeLong(manifest.getLastSampleSeq())) .append('#') .append(safeLong(manifest.getFirstSampleTimeMs())) .append('#') .append(safeLong(manifest.getLastSampleTimeMs())) .append(';'); } return builder.toString(); } private boolean equalsLong(Long left, Long right) { if (left == null) { return right == null; } return left.equals(right); } private String buildStreamId(String day, String type, Integer deviceNo, Integer stationId) { StringBuilder builder = new StringBuilder(); if (day != null) { builder.append(day).append('|'); } builder.append(type == null ? "" : type) .append('|') .append(deviceNo == null ? "" : deviceNo); if (stationId != null) { builder.append('|').append(stationId); } return builder.toString(); } private static class StreamReplayScanResult { private Object baselineState; private List eventList = new ArrayList<>(); } private static class TimelineSummarySnapshot { private String sessionId; private String day; private long startTimeMs; private long endTimeMs; private long windowStartTimeMs; private long windowEndTimeMs; private List> timelineChunks = new ArrayList<>(); private List manifests = new ArrayList<>(); } private static class ChunkCheckpointSeed { private Integer chunkIndex; private long scanStartExclusiveTimeMs; private Map stateByStream; } private static class ChunkWindowMeta { private String streamId; private DeviceReplayChunkMeta chunkMeta; } private static class TimelineChunkSnapshotData { private List> timelineChunks = Collections.emptyList(); private Map> chunkStreamIdsByChunk = Collections.emptyMap(); private static TimelineChunkSnapshotData empty() { return new TimelineChunkSnapshotData(); } } private static class ResolvedSeekFrame { private Integer chunkIndex; private ReplayChunk chunk; private Map frame; private long drift; } }