package com.zy.asrs.service.impl;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.serializer.SerializerFeature;
|
import com.zy.asrs.domain.replay.DeviceReplayChunkMeta;
|
import com.zy.asrs.domain.replay.DeviceReplayManifest;
|
import com.zy.asrs.domain.replay.DeviceReplayStreamKey;
|
import com.zy.asrs.entity.DeviceDataLog;
|
import com.zy.asrs.service.DeviceLogReplayManifestService;
|
import lombok.extern.slf4j.Slf4j;
|
import jakarta.annotation.PreDestroy;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Service;
|
|
import java.io.BufferedReader;
|
import java.nio.charset.StandardCharsets;
|
import java.nio.file.Files;
|
import java.nio.file.StandardCopyOption;
|
import java.nio.file.Path;
|
import java.nio.file.Paths;
|
import java.time.ZoneId;
|
import java.util.ArrayList;
|
import java.util.Comparator;
|
import java.util.LinkedHashSet;
|
import java.util.List;
|
import java.util.Objects;
|
import java.util.Set;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.regex.Matcher;
|
import java.util.regex.Pattern;
|
import java.util.stream.Collectors;
|
import java.util.stream.Stream;
|
|
@Slf4j
|
@Service
|
public class DeviceLogReplayManifestServiceImpl implements DeviceLogReplayManifestService {
|
|
private static final String MANIFEST_DIR_NAME = ".replay-manifest";
|
private static final int MANIFEST_SCHEMA_VERSION = 1;
|
private static final Pattern DEFAULT_FILE_PATTERN = Pattern.compile("^([^_]+)_(\\d+)_(\\d{8})_(\\d+)\\.log$");
|
private static final Pattern DEVP_FILE_PATTERN = Pattern.compile("^([^_]+)_(\\d+)_station_(\\d+)_(\\d{8})_(\\d+)\\.log$");
|
private static final long MANIFEST_STALE_THRESHOLD_MS = 5_000L;
|
private static final long EXECUTOR_SHUTDOWN_WAIT_MS = 5_000L;
|
private static final ZoneId REPLAY_ZONE = ZoneId.systemDefault();
|
|
@Value("${deviceLogStorage.loggingPath}")
|
private String loggingPath;
|
|
@Value("${deviceLogStorage.manifestRefreshDebounceMs:1000}")
|
private long manifestRefreshDebounceMs;
|
|
private final ExecutorService dayRefreshExecutor = Executors.newSingleThreadExecutor(runnable -> {
|
Thread thread = new Thread(runnable, "replay-manifest-refresh");
|
thread.setDaemon(true);
|
return thread;
|
});
|
|
private final ScheduledExecutorService streamRefreshExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
|
Thread thread = new Thread(runnable, "replay-manifest-stream-refresh");
|
thread.setDaemon(true);
|
return thread;
|
});
|
|
private final ConcurrentHashMap<String, Boolean> refreshingDays = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, Boolean> refreshingStreams = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, StreamRefreshState> streamRefreshStateByStream = new ConcurrentHashMap<>();
|
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
|
|
@Override
|
public DeviceReplayManifest loadManifest(DeviceReplayStreamKey streamKey, boolean autoBuild) {
|
if (streamKey == null || streamKey.getDay() == null) {
|
return null;
|
}
|
Path manifestPath = resolveManifestPath(streamKey);
|
if (Files.exists(manifestPath)) {
|
try {
|
String json = Files.readString(manifestPath, StandardCharsets.UTF_8);
|
return JSON.parseObject(json, DeviceReplayManifest.class);
|
} catch (Exception e) {
|
log.warn("读取 replay manifest 失败,准备重建, path={}", manifestPath, e);
|
}
|
}
|
if (!autoBuild) {
|
return null;
|
}
|
return rebuildManifest(streamKey);
|
}
|
|
@Override
|
public List<DeviceReplayManifest> loadDayManifests(String day, boolean autoBuild) {
|
List<DeviceReplayManifest> manifests = new ArrayList<>();
|
for (DeviceReplayStreamKey streamKey : discoverStreamKeys(day, autoBuild)) {
|
DeviceReplayManifest manifest = loadManifest(streamKey, autoBuild);
|
if (manifest != null) {
|
manifests.add(manifest);
|
}
|
}
|
manifests.sort(Comparator
|
.comparing(DeviceReplayManifest::getFirstSampleTimeMs, Comparator.nullsLast(Long::compareTo))
|
.thenComparing(DeviceReplayManifest::getType, Comparator.nullsLast(String::compareTo))
|
.thenComparing(DeviceReplayManifest::getDeviceNo, Comparator.nullsLast(Integer::compareTo))
|
.thenComparing(DeviceReplayManifest::getStationId, Comparator.nullsLast(Integer::compareTo)));
|
return manifests;
|
}
|
|
@Override
|
public List<DeviceReplayManifest> refreshDayManifestsIfStale(String day) {
|
if (day == null || day.isBlank()) {
|
return new ArrayList<>();
|
}
|
List<DeviceReplayStreamKey> staleStreamKeys = resolveStaleStreamKeys(day);
|
if (staleStreamKeys.isEmpty()) {
|
return loadDayManifests(day, false);
|
}
|
for (DeviceReplayStreamKey streamKey : staleStreamKeys) {
|
refreshStreamManifestIfStale(streamKey);
|
}
|
return loadDayManifests(day, false);
|
}
|
|
@Override
|
public void scheduleDayManifestRefresh(String day) {
|
if (day == null || day.isBlank()) {
|
return;
|
}
|
if (shuttingDown.get()) {
|
return;
|
}
|
if (refreshingDays.putIfAbsent(day, Boolean.TRUE) != null) {
|
return;
|
}
|
try {
|
dayRefreshExecutor.submit(() -> {
|
try {
|
refreshDayManifestsIfStale(day);
|
} catch (Exception e) {
|
log.warn("异步刷新当天 replay manifest 失败, day={}", day, e);
|
} finally {
|
refreshingDays.remove(day);
|
}
|
});
|
} catch (RejectedExecutionException rejectedExecutionException) {
|
refreshingDays.remove(day);
|
if (!shuttingDown.get()) {
|
log.warn("提交当天 replay manifest 刷新任务失败, day={}", day, rejectedExecutionException);
|
}
|
}
|
}
|
|
@Override
|
public DeviceReplayManifest rebuildManifest(DeviceReplayStreamKey streamKey) {
|
if (streamKey == null || streamKey.getDay() == null) {
|
return null;
|
}
|
List<ParsedReplayFile> files = resolveStreamFiles(streamKey);
|
if (files.isEmpty()) {
|
return null;
|
}
|
DeviceReplayManifest manifest = new DeviceReplayManifest();
|
manifest.setSchemaVersion(MANIFEST_SCHEMA_VERSION);
|
manifest.setDay(streamKey.getDay());
|
manifest.setType(streamKey.getType());
|
manifest.setDeviceNo(streamKey.getDeviceNo());
|
manifest.setStationId(streamKey.getStationId());
|
manifest.setSampleMode("SNAPSHOT");
|
manifest.setTotalSamples(0);
|
|
long generatedSeqCursor = 0L;
|
int chunkIndex = 0;
|
for (ParsedReplayFile parsedReplayFile : files) {
|
DeviceReplayChunkMeta chunkMeta = new DeviceReplayChunkMeta();
|
chunkMeta.setChunkIndex(chunkIndex++);
|
chunkMeta.setFileName(parsedReplayFile.file.getFileName().toString());
|
chunkMeta.setRelativePath(buildRelativeLogPath(streamKey, parsedReplayFile.file.getFileName().toString()));
|
chunkMeta.setSealed(Boolean.TRUE);
|
chunkMeta.setSampleCount(0);
|
try (BufferedReader reader = Files.newBufferedReader(parsedReplayFile.file, StandardCharsets.UTF_8)) {
|
String line;
|
while ((line = reader.readLine()) != null) {
|
if (line == null || line.isBlank()) {
|
continue;
|
}
|
DeviceDataLog logItem;
|
try {
|
logItem = JSON.parseObject(line, DeviceDataLog.class);
|
} catch (Exception parseError) {
|
log.debug("跳过损坏的 replay manifest 日志行, file={}", parsedReplayFile.file, parseError);
|
continue;
|
}
|
if (!matchesStream(streamKey, logItem)) {
|
continue;
|
}
|
generatedSeqCursor++;
|
long sampleSeq = resolveSampleSeq(logItem, generatedSeqCursor);
|
long sampleTimeMs = resolveSampleTimeMs(logItem);
|
if (chunkMeta.getFirstSampleSeq() == null) {
|
chunkMeta.setFirstSampleSeq(sampleSeq);
|
chunkMeta.setFirstSampleTimeMs(sampleTimeMs);
|
}
|
chunkMeta.setLastSampleSeq(sampleSeq);
|
chunkMeta.setLastSampleTimeMs(sampleTimeMs);
|
chunkMeta.setSampleCount(chunkMeta.getSampleCount() + 1);
|
}
|
} catch (Exception e) {
|
log.warn("构建 replay manifest chunk 失败, file={}", parsedReplayFile.file, e);
|
continue;
|
}
|
if (chunkMeta.getSampleCount() == null || chunkMeta.getSampleCount() <= 0) {
|
continue;
|
}
|
manifest.getChunks().add(chunkMeta);
|
manifest.setTotalSamples(manifest.getTotalSamples() + chunkMeta.getSampleCount());
|
if (manifest.getFirstSampleSeq() == null) {
|
manifest.setFirstSampleSeq(chunkMeta.getFirstSampleSeq());
|
manifest.setFirstSampleTimeMs(chunkMeta.getFirstSampleTimeMs());
|
}
|
manifest.setLastSampleSeq(chunkMeta.getLastSampleSeq());
|
manifest.setLastSampleTimeMs(chunkMeta.getLastSampleTimeMs());
|
}
|
if (manifest.getChunks().isEmpty()) {
|
return null;
|
}
|
writeManifest(streamKey, manifest);
|
return manifest;
|
}
|
|
@Override
|
public void scheduleManifestRefreshForLog(String day, DeviceDataLog logItem) {
|
DeviceReplayStreamKey streamKey = DeviceReplayStreamKey.fromLog(day, logItem);
|
if (streamKey == null) {
|
return;
|
}
|
StreamRefreshState refreshState = getOrCreateStreamRefreshState(streamKey.streamId());
|
synchronized (refreshState.monitor) {
|
refreshState.latestStreamKey = streamKey;
|
}
|
if (shuttingDown.get()) {
|
refreshStreamManifest(streamKey);
|
return;
|
}
|
scheduleStreamManifestRefresh(streamKey);
|
}
|
|
@Override
|
public void refreshManifestForLog(String day, DeviceDataLog logItem) {
|
scheduleManifestRefreshForLog(day, logItem);
|
}
|
|
@PreDestroy
|
public void shutdownRefreshExecutor() {
|
if (!shuttingDown.compareAndSet(false, true)) {
|
return;
|
}
|
shutdownExecutor(dayRefreshExecutor, "replay-manifest-refresh");
|
shutdownExecutor(streamRefreshExecutor, "replay-manifest-stream-refresh");
|
List<DeviceReplayStreamKey> pendingStreamKeys = streamRefreshStateByStream.values().stream()
|
.map(refreshState -> refreshState.latestStreamKey)
|
.filter(Objects::nonNull)
|
.collect(Collectors.toList());
|
for (DeviceReplayStreamKey streamKey : pendingStreamKeys) {
|
try {
|
refreshStreamManifest(streamKey);
|
} catch (Exception e) {
|
log.warn("关闭前补刷 replay manifest 失败, streamId={}", streamKey.streamId(), e);
|
}
|
}
|
streamRefreshStateByStream.clear();
|
}
|
|
private void writeManifest(DeviceReplayStreamKey streamKey, DeviceReplayManifest manifest) {
|
Path manifestPath = resolveManifestPath(streamKey);
|
Path tempPath = manifestPath.resolveSibling(manifestPath.getFileName().toString() + ".tmp");
|
try {
|
Files.createDirectories(manifestPath.getParent());
|
String json = JSON.toJSONString(manifest, SerializerFeature.PrettyFormat);
|
Files.writeString(tempPath, json, StandardCharsets.UTF_8);
|
Files.move(tempPath, manifestPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
|
} catch (Exception e) {
|
log.warn("写入 replay manifest 失败, path={}", manifestPath, e);
|
try {
|
Files.deleteIfExists(tempPath);
|
} catch (Exception ignored) {
|
}
|
}
|
}
|
|
private Set<DeviceReplayStreamKey> discoverStreamKeys(String day, boolean allowLogScan) {
|
LinkedHashSet<DeviceReplayStreamKey> streamKeys = new LinkedHashSet<>();
|
Path manifestDayDir = Paths.get(loggingPath, day, MANIFEST_DIR_NAME);
|
if (Files.exists(manifestDayDir)) {
|
try (Stream<Path> stream = Files.walk(manifestDayDir, 3)) {
|
stream.filter(Files::isRegularFile)
|
.filter(path -> path.getFileName().toString().endsWith(".json"))
|
.forEach(path -> {
|
DeviceReplayStreamKey streamKey = parseManifestPath(day, path);
|
if (streamKey != null) {
|
streamKeys.add(streamKey);
|
}
|
});
|
} catch (Exception e) {
|
log.warn("扫描 replay manifest 失败, day={}", day, e);
|
}
|
}
|
if (!allowLogScan) {
|
return streamKeys;
|
}
|
Path dayDir = Paths.get(loggingPath, day);
|
if (!Files.exists(dayDir)) {
|
return streamKeys;
|
}
|
try (Stream<Path> stream = Files.walk(dayDir, 3)) {
|
stream.filter(Files::isRegularFile)
|
.filter(path -> path.getFileName().toString().endsWith(".log"))
|
.forEach(path -> {
|
ParsedReplayFile parsed = parseReplayFile(path.getFileName().toString());
|
if (parsed == null || !day.equals(parsed.day)) {
|
return;
|
}
|
streamKeys.add(DeviceReplayStreamKey.of(parsed.day, parsed.type, parsed.deviceNo, parsed.stationId));
|
});
|
} catch (Exception e) {
|
log.warn("扫描 replay stream 失败, day={}", day, e);
|
}
|
return streamKeys;
|
}
|
|
private void scheduleStreamManifestRefresh(DeviceReplayStreamKey streamKey) {
|
if (streamKey == null || streamKey.getDay() == null) {
|
return;
|
}
|
long requestTime = System.currentTimeMillis();
|
StreamRefreshState refreshState = getOrCreateStreamRefreshState(streamKey.streamId());
|
refreshState.activeUsers.incrementAndGet();
|
try {
|
synchronized (refreshState.monitor) {
|
refreshState.lastScheduleTimeMs = requestTime;
|
refreshState.latestStreamKey = streamKey;
|
scheduleDebouncedStreamRefreshLocked(refreshState, streamKey, requestTime, normalizeDebounceDelay(manifestRefreshDebounceMs));
|
}
|
} finally {
|
if (refreshState.activeUsers.decrementAndGet() == 0) {
|
tryCleanupStreamRefreshState(streamKey.streamId(), refreshState);
|
}
|
}
|
}
|
|
private void scheduleDebouncedStreamRefreshLocked(StreamRefreshState refreshState,
|
DeviceReplayStreamKey streamKey,
|
long requestTime,
|
long delayMs) {
|
if (streamKey == null || streamKey.getDay() == null) {
|
return;
|
}
|
ScheduledFuture<?> existingFuture = refreshState.scheduledFuture;
|
if (existingFuture != null && !existingFuture.isDone()) {
|
return;
|
}
|
try {
|
refreshState.scheduledFuture = streamRefreshExecutor.schedule(
|
() -> executeScheduledStreamRefresh(streamKey, requestTime),
|
normalizeDebounceDelay(delayMs),
|
TimeUnit.MILLISECONDS
|
);
|
} catch (RejectedExecutionException rejectedExecutionException) {
|
refreshState.scheduledFuture = null;
|
if (shuttingDown.get()) {
|
refreshStreamManifest(streamKey);
|
return;
|
}
|
throw rejectedExecutionException;
|
}
|
}
|
|
private void executeScheduledStreamRefresh(DeviceReplayStreamKey streamKey, long scheduledRequestTime) {
|
if (streamKey == null || streamKey.getDay() == null) {
|
return;
|
}
|
String streamId = streamKey.streamId();
|
StreamRefreshState refreshState = streamRefreshStateByStream.get(streamId);
|
if (refreshState == null) {
|
return;
|
}
|
refreshState.activeUsers.incrementAndGet();
|
try {
|
DeviceReplayStreamKey latestStreamKey;
|
long latestRequestTime;
|
synchronized (refreshState.monitor) {
|
refreshState.scheduledFuture = null;
|
latestRequestTime = refreshState.lastScheduleTimeMs > 0 ? refreshState.lastScheduleTimeMs : scheduledRequestTime;
|
latestStreamKey = resolveLatestStreamKey(refreshState, streamKey);
|
long remainingDelayMs = calculateRemainingDebounceDelay(latestRequestTime);
|
if (remainingDelayMs > 0) {
|
scheduleDebouncedStreamRefreshLocked(refreshState, latestStreamKey, latestRequestTime, remainingDelayMs);
|
return;
|
}
|
}
|
if (refreshingStreams.putIfAbsent(streamId, Boolean.TRUE) != null) {
|
return;
|
}
|
|
long refreshStartTime = System.currentTimeMillis();
|
try {
|
refreshStreamManifest(latestStreamKey);
|
} catch (Exception e) {
|
log.warn("异步刷新 replay manifest stream 失败, streamId={}", streamId, e);
|
} finally {
|
refreshingStreams.remove(streamId);
|
scheduleTrailingStreamRefreshIfNeeded(refreshState, latestStreamKey, refreshStartTime);
|
}
|
} finally {
|
if (refreshState.activeUsers.decrementAndGet() == 0) {
|
tryCleanupStreamRefreshState(streamId, refreshState);
|
}
|
}
|
}
|
|
private void scheduleTrailingStreamRefreshIfNeeded(StreamRefreshState refreshState,
|
DeviceReplayStreamKey fallbackStreamKey,
|
long refreshStartTime) {
|
synchronized (refreshState.monitor) {
|
long latestRequestTime = refreshState.lastScheduleTimeMs;
|
if (latestRequestTime <= 0L) {
|
return;
|
}
|
if (latestRequestTime <= refreshStartTime) {
|
if (refreshState.scheduledFuture == null || refreshState.scheduledFuture.isDone()) {
|
refreshState.lastScheduleTimeMs = 0L;
|
refreshState.latestStreamKey = null;
|
}
|
return;
|
}
|
DeviceReplayStreamKey latestStreamKey = resolveLatestStreamKey(refreshState, fallbackStreamKey);
|
scheduleDebouncedStreamRefreshLocked(
|
refreshState,
|
latestStreamKey,
|
latestRequestTime,
|
calculateRemainingDebounceDelay(latestRequestTime)
|
);
|
}
|
}
|
|
private long calculateRemainingDebounceDelay(long requestTime) {
|
long debounceDelayMs = normalizeDebounceDelay(manifestRefreshDebounceMs);
|
if (debounceDelayMs <= 0) {
|
return 0L;
|
}
|
long elapsedMs = System.currentTimeMillis() - requestTime;
|
if (elapsedMs >= debounceDelayMs) {
|
return 0L;
|
}
|
return debounceDelayMs - elapsedMs;
|
}
|
|
private long normalizeDebounceDelay(long delayMs) {
|
return Math.max(delayMs, 0L);
|
}
|
|
private DeviceReplayManifest refreshStreamManifestIfStale(DeviceReplayStreamKey streamKey) {
|
return refreshStreamManifest(streamKey, true);
|
}
|
|
private DeviceReplayManifest refreshStreamManifest(DeviceReplayStreamKey streamKey) {
|
return refreshStreamManifest(streamKey, false);
|
}
|
|
private DeviceReplayManifest refreshStreamManifest(DeviceReplayStreamKey streamKey, boolean onlyIfStale) {
|
if (streamKey == null) {
|
return null;
|
}
|
String streamId = streamKey.streamId();
|
StreamRefreshState refreshState = getOrCreateStreamRefreshState(streamId);
|
refreshState.activeUsers.incrementAndGet();
|
try {
|
synchronized (refreshState.monitor) {
|
if (onlyIfStale && !isManifestStale(streamKey)) {
|
return loadManifest(streamKey, false);
|
}
|
return rebuildManifest(streamKey);
|
}
|
} finally {
|
if (refreshState.activeUsers.decrementAndGet() == 0) {
|
tryCleanupStreamRefreshState(streamId, refreshState);
|
}
|
}
|
}
|
|
private StreamRefreshState getOrCreateStreamRefreshState(String streamId) {
|
return streamRefreshStateByStream.computeIfAbsent(streamId, key -> new StreamRefreshState());
|
}
|
|
private DeviceReplayStreamKey resolveLatestStreamKey(StreamRefreshState refreshState,
|
DeviceReplayStreamKey fallbackStreamKey) {
|
DeviceReplayStreamKey latestStreamKey = refreshState.latestStreamKey;
|
return latestStreamKey == null ? fallbackStreamKey : latestStreamKey;
|
}
|
|
private void tryCleanupStreamRefreshState(String streamId, StreamRefreshState refreshState) {
|
if (shuttingDown.get() || refreshState.activeUsers.get() > 0) {
|
return;
|
}
|
synchronized (refreshState.monitor) {
|
if (shuttingDown.get() || refreshState.activeUsers.get() > 0) {
|
return;
|
}
|
ScheduledFuture<?> scheduledFuture = refreshState.scheduledFuture;
|
if (scheduledFuture != null && !scheduledFuture.isDone()) {
|
return;
|
}
|
if (refreshState.lastScheduleTimeMs > 0L || refreshState.latestStreamKey != null) {
|
return;
|
}
|
if (refreshingStreams.containsKey(streamId)) {
|
return;
|
}
|
streamRefreshStateByStream.remove(streamId, refreshState);
|
}
|
}
|
|
private void shutdownExecutor(ExecutorService executorService, String executorName) {
|
executorService.shutdown();
|
try {
|
if (executorService.awaitTermination(EXECUTOR_SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) {
|
return;
|
}
|
List<Runnable> droppedTasks = executorService.shutdownNow();
|
log.warn("等待 {} 关闭超时,强制停止剩余任务, droppedTaskCount={}", executorName, droppedTasks.size());
|
if (!executorService.awaitTermination(EXECUTOR_SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) {
|
log.warn("{} 强制停止后仍未完全退出", executorName);
|
}
|
} catch (InterruptedException interruptedException) {
|
Thread.currentThread().interrupt();
|
List<Runnable> droppedTasks = executorService.shutdownNow();
|
log.warn("等待 {} 关闭被中断,已强制停止剩余任务, droppedTaskCount={}", executorName, droppedTasks.size(), interruptedException);
|
}
|
}
|
|
private List<DeviceReplayStreamKey> resolveStaleStreamKeys(String day) {
|
Set<DeviceReplayStreamKey> streamKeys = discoverStreamKeys(day, true);
|
List<DeviceReplayStreamKey> staleStreamKeys = new ArrayList<>();
|
for (DeviceReplayStreamKey streamKey : streamKeys) {
|
if (isManifestStale(streamKey)) {
|
staleStreamKeys.add(streamKey);
|
}
|
}
|
return staleStreamKeys;
|
}
|
|
private boolean isManifestStale(DeviceReplayStreamKey streamKey) {
|
if (streamKey == null) {
|
return false;
|
}
|
long latestLogModifiedTime = resolveLatestLogModifiedTime(streamKey);
|
if (latestLogModifiedTime <= 0) {
|
return false;
|
}
|
Path manifestPath = resolveManifestPath(streamKey);
|
if (!Files.exists(manifestPath)) {
|
return true;
|
}
|
try {
|
long manifestModifiedTime = Files.getLastModifiedTime(manifestPath).toMillis();
|
return latestLogModifiedTime - manifestModifiedTime > MANIFEST_STALE_THRESHOLD_MS;
|
} catch (Exception e) {
|
log.warn("判断 replay manifest 是否过期失败, path={}", manifestPath, e);
|
return true;
|
}
|
}
|
|
private long resolveLatestLogModifiedTime(DeviceReplayStreamKey streamKey) {
|
long latestModifiedTime = 0L;
|
for (ParsedReplayFile replayFile : resolveStreamFiles(streamKey)) {
|
try {
|
latestModifiedTime = Math.max(latestModifiedTime, Files.getLastModifiedTime(replayFile.file).toMillis());
|
} catch (Exception e) {
|
log.debug("读取 replay 日志文件修改时间失败, file={}", replayFile.file, e);
|
}
|
}
|
return latestModifiedTime;
|
}
|
|
private DeviceReplayStreamKey parseManifestPath(String day, Path manifestPath) {
|
if (manifestPath == null || manifestPath.getParent() == null) {
|
return null;
|
}
|
Path typePath = manifestPath.getParent();
|
Path filePath = manifestPath.getFileName();
|
if (typePath == null || filePath == null) {
|
return null;
|
}
|
String type = typePath.getFileName() == null ? null : typePath.getFileName().toString();
|
String fileName = filePath.toString();
|
Matcher stationMatcher = Pattern.compile("^(\\d+)_station_(\\d+)\\.json$").matcher(fileName);
|
if (stationMatcher.matches()) {
|
return DeviceReplayStreamKey.of(day, type, Integer.parseInt(stationMatcher.group(1)), Integer.parseInt(stationMatcher.group(2)));
|
}
|
Matcher defaultMatcher = Pattern.compile("^(\\d+)\\.json$").matcher(fileName);
|
if (defaultMatcher.matches()) {
|
return DeviceReplayStreamKey.of(day, type, Integer.parseInt(defaultMatcher.group(1)), null);
|
}
|
return null;
|
}
|
|
private List<ParsedReplayFile> resolveStreamFiles(DeviceReplayStreamKey streamKey) {
|
Path deviceDir = Paths.get(loggingPath, streamKey.getDay(), streamKey.getType(), String.valueOf(streamKey.getDeviceNo()));
|
if (!Files.exists(deviceDir)) {
|
return new ArrayList<>();
|
}
|
try (Stream<Path> stream = Files.list(deviceDir)) {
|
return stream.filter(Files::isRegularFile)
|
.map(path -> parseReplayFile(path.getFileName().toString(), path))
|
.filter(parsed -> parsed != null
|
&& streamKey.getDay().equals(parsed.day)
|
&& streamKey.getType().equals(parsed.type)
|
&& streamKey.getDeviceNo().equals(parsed.deviceNo)
|
&& equalsNullable(streamKey.getStationId(), parsed.stationId))
|
.sorted(Comparator.comparing(parsed -> parsed.index))
|
.collect(Collectors.toList());
|
} catch (Exception e) {
|
log.warn("解析 replay stream 文件失败, streamKey={}", streamKey.streamId(), e);
|
return new ArrayList<>();
|
}
|
}
|
|
private Path resolveManifestPath(DeviceReplayStreamKey streamKey) {
|
String fileName = streamKey.getStationId() == null
|
? streamKey.getDeviceNo() + ".json"
|
: streamKey.getDeviceNo() + "_station_" + streamKey.getStationId() + ".json";
|
return Paths.get(loggingPath, streamKey.getDay(), MANIFEST_DIR_NAME, streamKey.getType(), fileName);
|
}
|
|
private String buildRelativeLogPath(DeviceReplayStreamKey streamKey, String fileName) {
|
return streamKey.getType() + "/" + streamKey.getDeviceNo() + "/" + fileName;
|
}
|
|
private long resolveSampleSeq(DeviceDataLog logItem, long fallback) {
|
if (logItem != null && logItem.getSampleSeq() != null && logItem.getSampleSeq() > 0) {
|
return logItem.getSampleSeq();
|
}
|
return fallback;
|
}
|
|
private long resolveSampleTimeMs(DeviceDataLog logItem) {
|
if (logItem != null && logItem.getSampleTimeMs() != null && logItem.getSampleTimeMs() > 0) {
|
return logItem.getSampleTimeMs();
|
}
|
if (logItem != null && logItem.getCreateTime() != null) {
|
return logItem.getCreateTime().getTime();
|
}
|
return 0L;
|
}
|
|
private boolean matchesStream(DeviceReplayStreamKey streamKey, DeviceDataLog logItem) {
|
if (streamKey == null || logItem == null) {
|
return false;
|
}
|
if (!streamKey.getType().equals(logItem.getType())) {
|
return false;
|
}
|
if (!streamKey.getDeviceNo().equals(logItem.getDeviceNo())) {
|
return false;
|
}
|
return equalsNullable(streamKey.getStationId(), logItem.getStationId());
|
}
|
|
private boolean equalsNullable(Integer left, Integer right) {
|
if (left == null) {
|
return right == null;
|
}
|
return left.equals(right);
|
}
|
|
private ParsedReplayFile parseReplayFile(String fileName) {
|
return parseReplayFile(fileName, null);
|
}
|
|
private ParsedReplayFile parseReplayFile(String fileName, Path filePath) {
|
Matcher devpMatcher = DEVP_FILE_PATTERN.matcher(fileName);
|
if (devpMatcher.matches()) {
|
ParsedReplayFile parsed = new ParsedReplayFile();
|
parsed.type = devpMatcher.group(1);
|
parsed.deviceNo = Integer.parseInt(devpMatcher.group(2));
|
parsed.stationId = Integer.parseInt(devpMatcher.group(3));
|
parsed.day = devpMatcher.group(4);
|
parsed.index = Integer.parseInt(devpMatcher.group(5));
|
parsed.file = filePath;
|
return parsed;
|
}
|
Matcher defaultMatcher = DEFAULT_FILE_PATTERN.matcher(fileName);
|
if (defaultMatcher.matches()) {
|
ParsedReplayFile parsed = new ParsedReplayFile();
|
parsed.type = defaultMatcher.group(1);
|
parsed.deviceNo = Integer.parseInt(defaultMatcher.group(2));
|
parsed.day = defaultMatcher.group(3);
|
parsed.index = Integer.parseInt(defaultMatcher.group(4));
|
parsed.file = filePath;
|
return parsed;
|
}
|
return null;
|
}
|
|
private static class ParsedReplayFile {
|
private String type;
|
private Integer deviceNo;
|
private Integer stationId;
|
private String day;
|
private Integer index;
|
private Path file;
|
}
|
|
private static class StreamRefreshState {
|
private final Object monitor = new Object();
|
private final AtomicInteger activeUsers = new AtomicInteger(0);
|
private volatile long lastScheduleTimeMs;
|
private volatile DeviceReplayStreamKey latestStreamKey;
|
private volatile ScheduledFuture<?> scheduledFuture;
|
}
|
}
|