package com.zy.asrs.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.zy.asrs.domain.replay.DeviceReplayChunkMeta; import com.zy.asrs.domain.replay.DeviceReplayManifest; import com.zy.asrs.domain.replay.DeviceReplayStreamKey; import com.zy.asrs.entity.DeviceDataLog; import com.zy.asrs.service.DeviceLogReplayManifestService; import lombok.extern.slf4j.Slf4j; import jakarta.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.io.BufferedReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.nio.file.Path; import java.nio.file.Paths; import java.time.ZoneId; import java.util.ArrayList; import java.util.Comparator; import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j @Service public class DeviceLogReplayManifestServiceImpl implements DeviceLogReplayManifestService { private static final String MANIFEST_DIR_NAME = ".replay-manifest"; private static final int MANIFEST_SCHEMA_VERSION = 1; private static final Pattern DEFAULT_FILE_PATTERN = Pattern.compile("^([^_]+)_(\\d+)_(\\d{8})_(\\d+)\\.log$"); private static final Pattern DEVP_FILE_PATTERN = Pattern.compile("^([^_]+)_(\\d+)_station_(\\d+)_(\\d{8})_(\\d+)\\.log$"); private static final long MANIFEST_STALE_THRESHOLD_MS = 5_000L; private static final long EXECUTOR_SHUTDOWN_WAIT_MS = 5_000L; private static final ZoneId REPLAY_ZONE = ZoneId.systemDefault(); @Value("${deviceLogStorage.loggingPath}") private String loggingPath; @Value("${deviceLogStorage.manifestRefreshDebounceMs:1000}") private long manifestRefreshDebounceMs; private final ExecutorService dayRefreshExecutor = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "replay-manifest-refresh"); thread.setDaemon(true); return thread; }); private final ScheduledExecutorService streamRefreshExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = new Thread(runnable, "replay-manifest-stream-refresh"); thread.setDaemon(true); return thread; }); private final ConcurrentHashMap refreshingDays = new ConcurrentHashMap<>(); private final ConcurrentHashMap refreshingStreams = new ConcurrentHashMap<>(); private final ConcurrentHashMap streamRefreshStateByStream = new ConcurrentHashMap<>(); private final AtomicBoolean shuttingDown = new AtomicBoolean(false); @Override public DeviceReplayManifest loadManifest(DeviceReplayStreamKey streamKey, boolean autoBuild) { if (streamKey == null || streamKey.getDay() == null) { return null; } Path manifestPath = resolveManifestPath(streamKey); if (Files.exists(manifestPath)) { try { String json = Files.readString(manifestPath, StandardCharsets.UTF_8); return JSON.parseObject(json, DeviceReplayManifest.class); } catch (Exception e) { log.warn("读取 replay manifest 失败,准备重建, path={}", manifestPath, e); } } if (!autoBuild) { return null; } return rebuildManifest(streamKey); } @Override public List loadDayManifests(String day, boolean autoBuild) { List manifests = new ArrayList<>(); for (DeviceReplayStreamKey streamKey : discoverStreamKeys(day, autoBuild)) { DeviceReplayManifest manifest = loadManifest(streamKey, autoBuild); if (manifest != null) { manifests.add(manifest); } } manifests.sort(Comparator .comparing(DeviceReplayManifest::getFirstSampleTimeMs, Comparator.nullsLast(Long::compareTo)) .thenComparing(DeviceReplayManifest::getType, Comparator.nullsLast(String::compareTo)) .thenComparing(DeviceReplayManifest::getDeviceNo, Comparator.nullsLast(Integer::compareTo)) .thenComparing(DeviceReplayManifest::getStationId, Comparator.nullsLast(Integer::compareTo))); return manifests; } @Override public List refreshDayManifestsIfStale(String day) { if (day == null || day.isBlank()) { return new ArrayList<>(); } List staleStreamKeys = resolveStaleStreamKeys(day); if (staleStreamKeys.isEmpty()) { return loadDayManifests(day, false); } for (DeviceReplayStreamKey streamKey : staleStreamKeys) { refreshStreamManifestIfStale(streamKey); } return loadDayManifests(day, false); } @Override public void scheduleDayManifestRefresh(String day) { if (day == null || day.isBlank()) { return; } if (shuttingDown.get()) { return; } if (refreshingDays.putIfAbsent(day, Boolean.TRUE) != null) { return; } try { dayRefreshExecutor.submit(() -> { try { refreshDayManifestsIfStale(day); } catch (Exception e) { log.warn("异步刷新当天 replay manifest 失败, day={}", day, e); } finally { refreshingDays.remove(day); } }); } catch (RejectedExecutionException rejectedExecutionException) { refreshingDays.remove(day); if (!shuttingDown.get()) { log.warn("提交当天 replay manifest 刷新任务失败, day={}", day, rejectedExecutionException); } } } @Override public DeviceReplayManifest rebuildManifest(DeviceReplayStreamKey streamKey) { if (streamKey == null || streamKey.getDay() == null) { return null; } List files = resolveStreamFiles(streamKey); if (files.isEmpty()) { return null; } DeviceReplayManifest manifest = new DeviceReplayManifest(); manifest.setSchemaVersion(MANIFEST_SCHEMA_VERSION); manifest.setDay(streamKey.getDay()); manifest.setType(streamKey.getType()); manifest.setDeviceNo(streamKey.getDeviceNo()); manifest.setStationId(streamKey.getStationId()); manifest.setSampleMode("SNAPSHOT"); manifest.setTotalSamples(0); long generatedSeqCursor = 0L; int chunkIndex = 0; for (ParsedReplayFile parsedReplayFile : files) { DeviceReplayChunkMeta chunkMeta = new DeviceReplayChunkMeta(); chunkMeta.setChunkIndex(chunkIndex++); chunkMeta.setFileName(parsedReplayFile.file.getFileName().toString()); chunkMeta.setRelativePath(buildRelativeLogPath(streamKey, parsedReplayFile.file.getFileName().toString())); chunkMeta.setSealed(Boolean.TRUE); chunkMeta.setSampleCount(0); try (BufferedReader reader = Files.newBufferedReader(parsedReplayFile.file, StandardCharsets.UTF_8)) { String line; while ((line = reader.readLine()) != null) { if (line == null || line.isBlank()) { continue; } DeviceDataLog logItem; try { logItem = JSON.parseObject(line, DeviceDataLog.class); } catch (Exception parseError) { log.debug("跳过损坏的 replay manifest 日志行, file={}", parsedReplayFile.file, parseError); continue; } if (!matchesStream(streamKey, logItem)) { continue; } generatedSeqCursor++; long sampleSeq = resolveSampleSeq(logItem, generatedSeqCursor); long sampleTimeMs = resolveSampleTimeMs(logItem); if (chunkMeta.getFirstSampleSeq() == null) { chunkMeta.setFirstSampleSeq(sampleSeq); chunkMeta.setFirstSampleTimeMs(sampleTimeMs); } chunkMeta.setLastSampleSeq(sampleSeq); chunkMeta.setLastSampleTimeMs(sampleTimeMs); chunkMeta.setSampleCount(chunkMeta.getSampleCount() + 1); } } catch (Exception e) { log.warn("构建 replay manifest chunk 失败, file={}", parsedReplayFile.file, e); continue; } if (chunkMeta.getSampleCount() == null || chunkMeta.getSampleCount() <= 0) { continue; } manifest.getChunks().add(chunkMeta); manifest.setTotalSamples(manifest.getTotalSamples() + chunkMeta.getSampleCount()); if (manifest.getFirstSampleSeq() == null) { manifest.setFirstSampleSeq(chunkMeta.getFirstSampleSeq()); manifest.setFirstSampleTimeMs(chunkMeta.getFirstSampleTimeMs()); } manifest.setLastSampleSeq(chunkMeta.getLastSampleSeq()); manifest.setLastSampleTimeMs(chunkMeta.getLastSampleTimeMs()); } if (manifest.getChunks().isEmpty()) { return null; } writeManifest(streamKey, manifest); return manifest; } @Override public void scheduleManifestRefreshForLog(String day, DeviceDataLog logItem) { DeviceReplayStreamKey streamKey = DeviceReplayStreamKey.fromLog(day, logItem); if (streamKey == null) { return; } StreamRefreshState refreshState = getOrCreateStreamRefreshState(streamKey.streamId()); synchronized (refreshState.monitor) { refreshState.latestStreamKey = streamKey; } if (shuttingDown.get()) { refreshStreamManifest(streamKey); return; } scheduleStreamManifestRefresh(streamKey); } @Override public void refreshManifestForLog(String day, DeviceDataLog logItem) { scheduleManifestRefreshForLog(day, logItem); } @PreDestroy public void shutdownRefreshExecutor() { if (!shuttingDown.compareAndSet(false, true)) { return; } shutdownExecutor(dayRefreshExecutor, "replay-manifest-refresh"); shutdownExecutor(streamRefreshExecutor, "replay-manifest-stream-refresh"); List pendingStreamKeys = streamRefreshStateByStream.values().stream() .map(refreshState -> refreshState.latestStreamKey) .filter(Objects::nonNull) .collect(Collectors.toList()); for (DeviceReplayStreamKey streamKey : pendingStreamKeys) { try { refreshStreamManifest(streamKey); } catch (Exception e) { log.warn("关闭前补刷 replay manifest 失败, streamId={}", streamKey.streamId(), e); } } streamRefreshStateByStream.clear(); } private void writeManifest(DeviceReplayStreamKey streamKey, DeviceReplayManifest manifest) { Path manifestPath = resolveManifestPath(streamKey); Path tempPath = manifestPath.resolveSibling(manifestPath.getFileName().toString() + ".tmp"); try { Files.createDirectories(manifestPath.getParent()); String json = JSON.toJSONString(manifest, SerializerFeature.PrettyFormat); Files.writeString(tempPath, json, StandardCharsets.UTF_8); Files.move(tempPath, manifestPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); } catch (Exception e) { log.warn("写入 replay manifest 失败, path={}", manifestPath, e); try { Files.deleteIfExists(tempPath); } catch (Exception ignored) { } } } private Set discoverStreamKeys(String day, boolean allowLogScan) { LinkedHashSet streamKeys = new LinkedHashSet<>(); Path manifestDayDir = Paths.get(loggingPath, day, MANIFEST_DIR_NAME); if (Files.exists(manifestDayDir)) { try (Stream stream = Files.walk(manifestDayDir, 3)) { stream.filter(Files::isRegularFile) .filter(path -> path.getFileName().toString().endsWith(".json")) .forEach(path -> { DeviceReplayStreamKey streamKey = parseManifestPath(day, path); if (streamKey != null) { streamKeys.add(streamKey); } }); } catch (Exception e) { log.warn("扫描 replay manifest 失败, day={}", day, e); } } if (!allowLogScan) { return streamKeys; } Path dayDir = Paths.get(loggingPath, day); if (!Files.exists(dayDir)) { return streamKeys; } try (Stream stream = Files.walk(dayDir, 3)) { stream.filter(Files::isRegularFile) .filter(path -> path.getFileName().toString().endsWith(".log")) .forEach(path -> { ParsedReplayFile parsed = parseReplayFile(path.getFileName().toString()); if (parsed == null || !day.equals(parsed.day)) { return; } streamKeys.add(DeviceReplayStreamKey.of(parsed.day, parsed.type, parsed.deviceNo, parsed.stationId)); }); } catch (Exception e) { log.warn("扫描 replay stream 失败, day={}", day, e); } return streamKeys; } private void scheduleStreamManifestRefresh(DeviceReplayStreamKey streamKey) { if (streamKey == null || streamKey.getDay() == null) { return; } long requestTime = System.currentTimeMillis(); StreamRefreshState refreshState = getOrCreateStreamRefreshState(streamKey.streamId()); refreshState.activeUsers.incrementAndGet(); try { synchronized (refreshState.monitor) { refreshState.lastScheduleTimeMs = requestTime; refreshState.latestStreamKey = streamKey; scheduleDebouncedStreamRefreshLocked(refreshState, streamKey, requestTime, normalizeDebounceDelay(manifestRefreshDebounceMs)); } } finally { if (refreshState.activeUsers.decrementAndGet() == 0) { tryCleanupStreamRefreshState(streamKey.streamId(), refreshState); } } } private void scheduleDebouncedStreamRefreshLocked(StreamRefreshState refreshState, DeviceReplayStreamKey streamKey, long requestTime, long delayMs) { if (streamKey == null || streamKey.getDay() == null) { return; } ScheduledFuture existingFuture = refreshState.scheduledFuture; if (existingFuture != null && !existingFuture.isDone()) { return; } try { refreshState.scheduledFuture = streamRefreshExecutor.schedule( () -> executeScheduledStreamRefresh(streamKey, requestTime), normalizeDebounceDelay(delayMs), TimeUnit.MILLISECONDS ); } catch (RejectedExecutionException rejectedExecutionException) { refreshState.scheduledFuture = null; if (shuttingDown.get()) { refreshStreamManifest(streamKey); return; } throw rejectedExecutionException; } } private void executeScheduledStreamRefresh(DeviceReplayStreamKey streamKey, long scheduledRequestTime) { if (streamKey == null || streamKey.getDay() == null) { return; } String streamId = streamKey.streamId(); StreamRefreshState refreshState = streamRefreshStateByStream.get(streamId); if (refreshState == null) { return; } refreshState.activeUsers.incrementAndGet(); try { DeviceReplayStreamKey latestStreamKey; long latestRequestTime; synchronized (refreshState.monitor) { refreshState.scheduledFuture = null; latestRequestTime = refreshState.lastScheduleTimeMs > 0 ? refreshState.lastScheduleTimeMs : scheduledRequestTime; latestStreamKey = resolveLatestStreamKey(refreshState, streamKey); long remainingDelayMs = calculateRemainingDebounceDelay(latestRequestTime); if (remainingDelayMs > 0) { scheduleDebouncedStreamRefreshLocked(refreshState, latestStreamKey, latestRequestTime, remainingDelayMs); return; } } if (refreshingStreams.putIfAbsent(streamId, Boolean.TRUE) != null) { return; } long refreshStartTime = System.currentTimeMillis(); try { refreshStreamManifest(latestStreamKey); } catch (Exception e) { log.warn("异步刷新 replay manifest stream 失败, streamId={}", streamId, e); } finally { refreshingStreams.remove(streamId); scheduleTrailingStreamRefreshIfNeeded(refreshState, latestStreamKey, refreshStartTime); } } finally { if (refreshState.activeUsers.decrementAndGet() == 0) { tryCleanupStreamRefreshState(streamId, refreshState); } } } private void scheduleTrailingStreamRefreshIfNeeded(StreamRefreshState refreshState, DeviceReplayStreamKey fallbackStreamKey, long refreshStartTime) { synchronized (refreshState.monitor) { long latestRequestTime = refreshState.lastScheduleTimeMs; if (latestRequestTime <= 0L) { return; } if (latestRequestTime <= refreshStartTime) { if (refreshState.scheduledFuture == null || refreshState.scheduledFuture.isDone()) { refreshState.lastScheduleTimeMs = 0L; refreshState.latestStreamKey = null; } return; } DeviceReplayStreamKey latestStreamKey = resolveLatestStreamKey(refreshState, fallbackStreamKey); scheduleDebouncedStreamRefreshLocked( refreshState, latestStreamKey, latestRequestTime, calculateRemainingDebounceDelay(latestRequestTime) ); } } private long calculateRemainingDebounceDelay(long requestTime) { long debounceDelayMs = normalizeDebounceDelay(manifestRefreshDebounceMs); if (debounceDelayMs <= 0) { return 0L; } long elapsedMs = System.currentTimeMillis() - requestTime; if (elapsedMs >= debounceDelayMs) { return 0L; } return debounceDelayMs - elapsedMs; } private long normalizeDebounceDelay(long delayMs) { return Math.max(delayMs, 0L); } private DeviceReplayManifest refreshStreamManifestIfStale(DeviceReplayStreamKey streamKey) { return refreshStreamManifest(streamKey, true); } private DeviceReplayManifest refreshStreamManifest(DeviceReplayStreamKey streamKey) { return refreshStreamManifest(streamKey, false); } private DeviceReplayManifest refreshStreamManifest(DeviceReplayStreamKey streamKey, boolean onlyIfStale) { if (streamKey == null) { return null; } String streamId = streamKey.streamId(); StreamRefreshState refreshState = getOrCreateStreamRefreshState(streamId); refreshState.activeUsers.incrementAndGet(); try { synchronized (refreshState.monitor) { if (onlyIfStale && !isManifestStale(streamKey)) { return loadManifest(streamKey, false); } return rebuildManifest(streamKey); } } finally { if (refreshState.activeUsers.decrementAndGet() == 0) { tryCleanupStreamRefreshState(streamId, refreshState); } } } private StreamRefreshState getOrCreateStreamRefreshState(String streamId) { return streamRefreshStateByStream.computeIfAbsent(streamId, key -> new StreamRefreshState()); } private DeviceReplayStreamKey resolveLatestStreamKey(StreamRefreshState refreshState, DeviceReplayStreamKey fallbackStreamKey) { DeviceReplayStreamKey latestStreamKey = refreshState.latestStreamKey; return latestStreamKey == null ? fallbackStreamKey : latestStreamKey; } private void tryCleanupStreamRefreshState(String streamId, StreamRefreshState refreshState) { if (shuttingDown.get() || refreshState.activeUsers.get() > 0) { return; } synchronized (refreshState.monitor) { if (shuttingDown.get() || refreshState.activeUsers.get() > 0) { return; } ScheduledFuture scheduledFuture = refreshState.scheduledFuture; if (scheduledFuture != null && !scheduledFuture.isDone()) { return; } if (refreshState.lastScheduleTimeMs > 0L || refreshState.latestStreamKey != null) { return; } if (refreshingStreams.containsKey(streamId)) { return; } streamRefreshStateByStream.remove(streamId, refreshState); } } private void shutdownExecutor(ExecutorService executorService, String executorName) { executorService.shutdown(); try { if (executorService.awaitTermination(EXECUTOR_SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) { return; } List droppedTasks = executorService.shutdownNow(); log.warn("等待 {} 关闭超时,强制停止剩余任务, droppedTaskCount={}", executorName, droppedTasks.size()); if (!executorService.awaitTermination(EXECUTOR_SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) { log.warn("{} 强制停止后仍未完全退出", executorName); } } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); List droppedTasks = executorService.shutdownNow(); log.warn("等待 {} 关闭被中断,已强制停止剩余任务, droppedTaskCount={}", executorName, droppedTasks.size(), interruptedException); } } private List resolveStaleStreamKeys(String day) { Set streamKeys = discoverStreamKeys(day, true); List staleStreamKeys = new ArrayList<>(); for (DeviceReplayStreamKey streamKey : streamKeys) { if (isManifestStale(streamKey)) { staleStreamKeys.add(streamKey); } } return staleStreamKeys; } private boolean isManifestStale(DeviceReplayStreamKey streamKey) { if (streamKey == null) { return false; } long latestLogModifiedTime = resolveLatestLogModifiedTime(streamKey); if (latestLogModifiedTime <= 0) { return false; } Path manifestPath = resolveManifestPath(streamKey); if (!Files.exists(manifestPath)) { return true; } try { long manifestModifiedTime = Files.getLastModifiedTime(manifestPath).toMillis(); return latestLogModifiedTime - manifestModifiedTime > MANIFEST_STALE_THRESHOLD_MS; } catch (Exception e) { log.warn("判断 replay manifest 是否过期失败, path={}", manifestPath, e); return true; } } private long resolveLatestLogModifiedTime(DeviceReplayStreamKey streamKey) { long latestModifiedTime = 0L; for (ParsedReplayFile replayFile : resolveStreamFiles(streamKey)) { try { latestModifiedTime = Math.max(latestModifiedTime, Files.getLastModifiedTime(replayFile.file).toMillis()); } catch (Exception e) { log.debug("读取 replay 日志文件修改时间失败, file={}", replayFile.file, e); } } return latestModifiedTime; } private DeviceReplayStreamKey parseManifestPath(String day, Path manifestPath) { if (manifestPath == null || manifestPath.getParent() == null) { return null; } Path typePath = manifestPath.getParent(); Path filePath = manifestPath.getFileName(); if (typePath == null || filePath == null) { return null; } String type = typePath.getFileName() == null ? null : typePath.getFileName().toString(); String fileName = filePath.toString(); Matcher stationMatcher = Pattern.compile("^(\\d+)_station_(\\d+)\\.json$").matcher(fileName); if (stationMatcher.matches()) { return DeviceReplayStreamKey.of(day, type, Integer.parseInt(stationMatcher.group(1)), Integer.parseInt(stationMatcher.group(2))); } Matcher defaultMatcher = Pattern.compile("^(\\d+)\\.json$").matcher(fileName); if (defaultMatcher.matches()) { return DeviceReplayStreamKey.of(day, type, Integer.parseInt(defaultMatcher.group(1)), null); } return null; } private List resolveStreamFiles(DeviceReplayStreamKey streamKey) { Path deviceDir = Paths.get(loggingPath, streamKey.getDay(), streamKey.getType(), String.valueOf(streamKey.getDeviceNo())); if (!Files.exists(deviceDir)) { return new ArrayList<>(); } try (Stream stream = Files.list(deviceDir)) { return stream.filter(Files::isRegularFile) .map(path -> parseReplayFile(path.getFileName().toString(), path)) .filter(parsed -> parsed != null && streamKey.getDay().equals(parsed.day) && streamKey.getType().equals(parsed.type) && streamKey.getDeviceNo().equals(parsed.deviceNo) && equalsNullable(streamKey.getStationId(), parsed.stationId)) .sorted(Comparator.comparing(parsed -> parsed.index)) .collect(Collectors.toList()); } catch (Exception e) { log.warn("解析 replay stream 文件失败, streamKey={}", streamKey.streamId(), e); return new ArrayList<>(); } } private Path resolveManifestPath(DeviceReplayStreamKey streamKey) { String fileName = streamKey.getStationId() == null ? streamKey.getDeviceNo() + ".json" : streamKey.getDeviceNo() + "_station_" + streamKey.getStationId() + ".json"; return Paths.get(loggingPath, streamKey.getDay(), MANIFEST_DIR_NAME, streamKey.getType(), fileName); } private String buildRelativeLogPath(DeviceReplayStreamKey streamKey, String fileName) { return streamKey.getType() + "/" + streamKey.getDeviceNo() + "/" + fileName; } private long resolveSampleSeq(DeviceDataLog logItem, long fallback) { if (logItem != null && logItem.getSampleSeq() != null && logItem.getSampleSeq() > 0) { return logItem.getSampleSeq(); } return fallback; } private long resolveSampleTimeMs(DeviceDataLog logItem) { if (logItem != null && logItem.getSampleTimeMs() != null && logItem.getSampleTimeMs() > 0) { return logItem.getSampleTimeMs(); } if (logItem != null && logItem.getCreateTime() != null) { return logItem.getCreateTime().getTime(); } return 0L; } private boolean matchesStream(DeviceReplayStreamKey streamKey, DeviceDataLog logItem) { if (streamKey == null || logItem == null) { return false; } if (!streamKey.getType().equals(logItem.getType())) { return false; } if (!streamKey.getDeviceNo().equals(logItem.getDeviceNo())) { return false; } return equalsNullable(streamKey.getStationId(), logItem.getStationId()); } private boolean equalsNullable(Integer left, Integer right) { if (left == null) { return right == null; } return left.equals(right); } private ParsedReplayFile parseReplayFile(String fileName) { return parseReplayFile(fileName, null); } private ParsedReplayFile parseReplayFile(String fileName, Path filePath) { Matcher devpMatcher = DEVP_FILE_PATTERN.matcher(fileName); if (devpMatcher.matches()) { ParsedReplayFile parsed = new ParsedReplayFile(); parsed.type = devpMatcher.group(1); parsed.deviceNo = Integer.parseInt(devpMatcher.group(2)); parsed.stationId = Integer.parseInt(devpMatcher.group(3)); parsed.day = devpMatcher.group(4); parsed.index = Integer.parseInt(devpMatcher.group(5)); parsed.file = filePath; return parsed; } Matcher defaultMatcher = DEFAULT_FILE_PATTERN.matcher(fileName); if (defaultMatcher.matches()) { ParsedReplayFile parsed = new ParsedReplayFile(); parsed.type = defaultMatcher.group(1); parsed.deviceNo = Integer.parseInt(defaultMatcher.group(2)); parsed.day = defaultMatcher.group(3); parsed.index = Integer.parseInt(defaultMatcher.group(4)); parsed.file = filePath; return parsed; } return null; } private static class ParsedReplayFile { private String type; private Integer deviceNo; private Integer stationId; private String day; private Integer index; private Path file; } private static class StreamRefreshState { private final Object monitor = new Object(); private final AtomicInteger activeUsers = new AtomicInteger(0); private volatile long lastScheduleTimeMs; private volatile DeviceReplayStreamKey latestStreamKey; private volatile ScheduledFuture scheduledFuture; } }