package com.zy.core.task; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.zy.asrs.entity.DeviceDataLog; import com.zy.asrs.service.DeviceLogReplayManifestService; import com.zy.core.enums.SlaveType; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.BufferedWriter; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @Slf4j @Component public class DeviceLogFileWriter { private static final long MAX_FILE_SIZE_BYTES = 1024L * 1024L; private static final long SHUTDOWN_FLUSH_TIMEOUT_MS = 2_000L; private static final String DATE_PATTERN = "yyyyMMdd"; private static final byte[] LINE_SEPARATOR_BYTES = System.lineSeparator().getBytes(StandardCharsets.UTF_8); private final ConcurrentHashMap> pendingByLane = new ConcurrentHashMap<>(); private final ConcurrentHashMap runningByLane = new ConcurrentHashMap<>(); private final ConcurrentHashMap activeContexts = new ConcurrentHashMap<>(); private final AtomicLong submittedCount = new AtomicLong(); private final AtomicLong overwrittenCount = new AtomicLong(); private final AtomicLong writtenCount = new AtomicLong(); private final AtomicLong flushCount = new AtomicLong(); private final AtomicLong writeFailureCount = new AtomicLong(); private final AtomicLong idleCloseCount = new AtomicLong(); private final AtomicLong droppedLaneCount = new AtomicLong(); private final DeviceLogReplayManifestService manifestService; @Value("${deviceLogStorage.loggingPath}") private String loggingPath; @Value("${deviceLogStorage.asyncPublishWorkers:6}") private Integer asyncPublishWorkers; @Value("${deviceLogStorage.directFileQueueCapacity:2048}") private Integer directFileQueueCapacity; @Value("${deviceLogStorage.writerFlushBatchSize:64}") private Integer writerFlushBatchSize; @Value("${deviceLogStorage.writerFlushIntervalMs:200}") private Long writerFlushIntervalMs; @Value("${deviceLogStorage.writerIdleCloseMs:30000}") private Long writerIdleCloseMs; @Value("${deviceLogStorage.manifestRefreshDebounceMs:1000}") private Long manifestRefreshDebounceMs; private volatile boolean shuttingDown; private ExecutorService writerExecutor; private ScheduledExecutorService maintenanceExecutor; public DeviceLogFileWriter(DeviceLogReplayManifestService manifestService) { this.manifestService = manifestService; } @PostConstruct public void initExecutor() { int workerCount = Math.max(1, asyncPublishWorkers == null ? 6 : asyncPublishWorkers); this.writerExecutor = Executors.newFixedThreadPool(workerCount, new DeviceLogFileWriterThreadFactory()); this.maintenanceExecutor = Executors.newSingleThreadScheduledExecutor( new NamedThreadFactory("device-log-file-maintenance")); long maintenanceIntervalMs = resolveMaintenanceIntervalMs(); this.maintenanceExecutor.scheduleWithFixedDelay( this::runMaintenanceSafely, maintenanceIntervalMs, maintenanceIntervalMs, TimeUnit.MILLISECONDS ); } @PreDestroy public void shutdownExecutor() { shuttingDown = true; long deadlineNanos = System.nanoTime() + Duration.ofMillis(SHUTDOWN_FLUSH_TIMEOUT_MS).toNanos(); shutdownMaintenanceExecutor(); drainPendingDirectly(deadlineNanos); shutdownWriterExecutor(deadlineNanos); flushAndCloseActiveContexts(deadlineNanos); log.info("Device log file writer shutdown, submittedCount={}, overwrittenCount={}, writtenCount={}, flushCount={}, " + "writeFailureCount={}, idleCloseCount={}, droppedLaneCount={}, activeLaneCount={}", submittedCount.get(), overwrittenCount.get(), writtenCount.get(), flushCount.get(), writeFailureCount.get(), idleCloseCount.get(), droppedLaneCount.get(), getActiveLaneCount()); } public void publishLatest(String laneKey, DeviceDataLog deviceDataLog) { if (shuttingDown || laneKey == null || deviceDataLog == null) { return; } AtomicReference pending = pendingByLane.get(laneKey); if (pending == null) { int laneCapacity = Math.max(64, directFileQueueCapacity == null ? 2048 : directFileQueueCapacity); if (pendingByLane.size() >= laneCapacity) { droppedLaneCount.incrementAndGet(); log.warn("Device direct-file writer lane capacity exceeded, laneKey={}, pendingLanes={}", laneKey, pendingByLane.size()); return; } AtomicReference created = new AtomicReference<>(); AtomicReference previous = pendingByLane.putIfAbsent(laneKey, created); pending = previous == null ? created : previous; } DeviceLogFileWriteRequest request = new DeviceLogFileWriteRequest(); request.setLaneKey(laneKey); request.setDeviceDataLog(deviceDataLog); DeviceLogFileWriteRequest previousRequest = pending.getAndSet(request); submittedCount.incrementAndGet(); if (previousRequest != null) { overwrittenCount.incrementAndGet(); } scheduleLaneDrain(laneKey, pending); } public long getSubmittedCount() { return submittedCount.get(); } public long getOverwrittenCount() { return overwrittenCount.get(); } public long getWrittenCount() { return writtenCount.get(); } public long getFlushCount() { return flushCount.get(); } public long getWriteFailureCount() { return writeFailureCount.get(); } public long getIdleCloseCount() { return idleCloseCount.get(); } public long getDroppedLaneCount() { return droppedLaneCount.get(); } public int getActiveLaneCount() { return activeContexts.size(); } private void scheduleLaneDrain(String laneKey, AtomicReference pending) { if (shuttingDown || pending == null || writerExecutor == null) { return; } AtomicBoolean running = runningByLane.computeIfAbsent(laneKey, key -> new AtomicBoolean(false)); if (!running.compareAndSet(false, true)) { return; } try { writerExecutor.execute(() -> drain(laneKey, pending, running)); } catch (Exception e) { running.set(false); log.warn("Schedule device log writer lane drain failed, laneKey={}", laneKey, e); } } private void drain(String laneKey, AtomicReference pending, AtomicBoolean running) { try { while (true) { DeviceLogFileWriteRequest request = pending.getAndSet(null); if (request == null) { return; } writeRequest(request); if (shuttingDown) { return; } } } finally { running.set(false); AtomicReference currentPending = pendingByLane.get(laneKey); if (currentPending == null) { runningByLane.remove(laneKey); return; } if (currentPending.get() == null) { pendingByLane.remove(laneKey, currentPending); runningByLane.remove(laneKey, running); return; } scheduleLaneDrain(laneKey, currentPending); } } private void writeRequest(DeviceLogFileWriteRequest request) { DeviceDataLog logItem = request.getDeviceDataLog(); if (logItem == null) { return; } try { String day = resolveDay(logItem); String filePrefix = buildFilePrefix(logItem, day); if (filePrefix == null) { return; } Path deviceDir = resolveDeviceDir(Paths.get(loggingPath).resolve(day), logItem); if (deviceDir == null) { return; } String jsonLine = buildJsonLine(logItem); long lineBytes = jsonLine.getBytes(StandardCharsets.UTF_8).length + LINE_SEPARATOR_BYTES.length; long now = System.currentTimeMillis(); LaneWriterContext context = ensureLaneContext(request.getLaneKey(), logItem, day, deviceDir, filePrefix, now); appendLine(context, logItem, jsonLine, lineBytes, now); } catch (Exception e) { writeFailureCount.incrementAndGet(); log.error("设备日志写文件失败, laneKey={}, type={}, deviceNo={}, stationId={}", request.getLaneKey(), logItem.getType(), logItem.getDeviceNo(), logItem.getStationId(), e); recycleLaneContext(request.getLaneKey(), null); } } private LaneWriterContext ensureLaneContext(String laneKey, DeviceDataLog logItem, String day, Path deviceDir, String filePrefix, long now) throws IOException { LaneWriterContext currentContext = activeContexts.get(laneKey); if (currentContext != null && day.equals(currentContext.day)) { synchronized (currentContext) { if (activeContexts.get(laneKey) == currentContext && day.equals(currentContext.day) && currentContext.bufferedWriter != null) { currentContext.lastWriteTimeMs = now; return currentContext; } } } if (currentContext != null) { closeLaneContext(laneKey, currentContext, closeReasonFlushManifest(), now); } LaneWriterContext newContext = openLaneContext(laneKey, logItem, day, deviceDir, filePrefix, now); LaneWriterContext previousContext = activeContexts.put(laneKey, newContext); if (previousContext != null && previousContext != currentContext && previousContext != newContext) { closeLaneContext(laneKey, previousContext, closeReasonFlushManifest(), now); } return newContext; } private LaneWriterContext openLaneContext(String laneKey, DeviceDataLog logItem, String day, Path deviceDir, String filePrefix, long now) throws IOException { Files.createDirectories(deviceDir); LaneWriterContext context = new LaneWriterContext(); context.laneKey = laneKey; context.day = day; context.type = logItem.getType(); context.deviceNo = logItem.getDeviceNo(); context.stationId = logItem.getStationId(); context.deviceDir = deviceDir; context.filePrefix = filePrefix; context.currentFileIndex = findStartIndex(deviceDir, filePrefix); context.currentFile = deviceDir.resolve(filePrefix + context.currentFileIndex + ".log"); context.bufferedWriter = openBufferedWriter(context.currentFile); context.currentFileBytes = Files.exists(context.currentFile) ? Files.size(context.currentFile) : 0L; context.pendingFlushLines = 0; context.lastWriteTimeMs = now; context.lastFlushTimeMs = now; context.manifestDirty = false; context.lastManifestScheduleTimeMs = 0L; return context; } private void appendLine(LaneWriterContext context, DeviceDataLog logItem, String jsonLine, long lineBytes, long now) throws IOException { synchronized (context) { rotateIfNeeded(context, lineBytes, now); context.bufferedWriter.write(jsonLine); context.bufferedWriter.newLine(); context.currentFileBytes += lineBytes; context.pendingFlushLines++; context.lastWriteTimeMs = now; context.manifestDirty = true; if (shouldFlushAfterWrite(context, now)) { flushContext(context, buildManifestLogItem(context), now, false); } } writtenCount.incrementAndGet(); } private void rotateIfNeeded(LaneWriterContext context, long lineBytes, long now) throws IOException { if (context.currentFileBytes + lineBytes <= MAX_FILE_SIZE_BYTES) { return; } flushContext(context, buildManifestLogItem(context), now, true); closeBufferedWriterQuietly(context); context.currentFileIndex++; context.currentFile = context.deviceDir.resolve(context.filePrefix + context.currentFileIndex + ".log"); context.bufferedWriter = openBufferedWriter(context.currentFile); context.currentFileBytes = Files.exists(context.currentFile) ? Files.size(context.currentFile) : 0L; context.pendingFlushLines = 0; context.lastFlushTimeMs = now; context.lastWriteTimeMs = now; context.manifestDirty = false; } private boolean shouldFlushAfterWrite(LaneWriterContext context, long now) { int flushBatchSize = Math.max(1, writerFlushBatchSize == null ? 64 : writerFlushBatchSize); long flushIntervalMs = Math.max(1L, writerFlushIntervalMs == null ? 200L : writerFlushIntervalMs); return context.pendingFlushLines >= flushBatchSize || now - context.lastFlushTimeMs >= flushIntervalMs; } private void runMaintenanceSafely() { try { long now = System.currentTimeMillis(); for (LaneWriterContext context : activeContexts.values()) { inspectLaneContext(context, now); } } catch (Exception e) { log.warn("Device log writer maintenance failed", e); } } private void inspectLaneContext(LaneWriterContext context, long now) { if (context == null) { return; } boolean closedByIdle = false; synchronized (context) { if (activeContexts.get(context.laneKey) != context) { return; } long flushIntervalMs = Math.max(1L, writerFlushIntervalMs == null ? 200L : writerFlushIntervalMs); long idleCloseMs = Math.max(flushIntervalMs, writerIdleCloseMs == null ? 30_000L : writerIdleCloseMs); if (context.pendingFlushLines > 0 && now - context.lastFlushTimeMs >= flushIntervalMs) { try { flushContext(context, buildManifestLogItem(context), now, false); } catch (IOException e) { closeLaneContextLocked(context.laneKey, context, closeReasonSkipManifest(), now); return; } } if (now - context.lastWriteTimeMs < idleCloseMs) { return; } if (activeContexts.get(context.laneKey) != context || now - context.lastWriteTimeMs < idleCloseMs) { return; } closeLaneContextLocked(context.laneKey, context, closeReasonFlushManifest(), now); closedByIdle = true; } if (closedByIdle) { idleCloseCount.incrementAndGet(); log.debug("Device log writer idle close, laneKey={}, file={}", context.laneKey, context.currentFile); } } private void flushContext(LaneWriterContext context, DeviceDataLog manifestLogItem, long now, boolean forceManifestSchedule) throws IOException { if (context.pendingFlushLines <= 0 && !shouldScheduleManifest(context, now, forceManifestSchedule)) { return; } try { if (context.pendingFlushLines > 0) { context.bufferedWriter.flush(); flushCount.incrementAndGet(); } context.pendingFlushLines = 0; context.lastFlushTimeMs = now; if (shouldScheduleManifest(context, now, forceManifestSchedule) && manifestLogItem != null) { try { manifestService.scheduleManifestRefreshForLog(context.day, manifestLogItem); context.manifestDirty = false; context.lastManifestScheduleTimeMs = now; } catch (Exception e) { log.warn("调度设备日志 manifest 刷新失败, laneKey={}, day={}, type={}, deviceNo={}, stationId={}", context.laneKey, context.day, context.type, context.deviceNo, context.stationId, e); } } } catch (IOException e) { writeFailureCount.incrementAndGet(); log.error("设备日志 flush 失败, laneKey={}, file={}, type={}, deviceNo={}, stationId={}", context.laneKey, context.currentFile, context.type, context.deviceNo, context.stationId, e); throw e; } } private boolean shouldScheduleManifest(LaneWriterContext context, long now, boolean forceManifestSchedule) { if (!context.manifestDirty) { return false; } if (forceManifestSchedule) { return true; } long debounceMs = Math.max(0L, manifestRefreshDebounceMs == null ? 1000L : manifestRefreshDebounceMs); return now - context.lastManifestScheduleTimeMs >= debounceMs; } private void closeLaneContext(String laneKey, LaneWriterContext context, CloseOptions closeOptions, long now) { if (context == null) { return; } synchronized (context) { closeLaneContextLocked(laneKey, context, closeOptions, now); } } private void closeLaneContextLocked(String laneKey, LaneWriterContext context, CloseOptions closeOptions, long now) { if (activeContexts.get(laneKey) != context && context.bufferedWriter == null) { return; } try { if (closeOptions.flushBeforeClose) { flushContext(context, buildManifestLogItem(context), now, closeOptions.forceManifestSchedule); } } catch (Exception e) { log.warn("Close lane context after flush failure, laneKey={}, file={}", laneKey, context.currentFile, e); } finally { closeBufferedWriterQuietly(context); context.bufferedWriter = null; activeContexts.remove(laneKey, context); } } private void recycleLaneContext(String laneKey, LaneWriterContext expectedContext) { if (laneKey == null) { return; } LaneWriterContext context = expectedContext; if (context == null) { context = activeContexts.get(laneKey); } if (context == null) { return; } closeLaneContext(laneKey, context, closeReasonSkipManifest(), System.currentTimeMillis()); } private void drainPendingDirectly(long deadlineNanos) { for (AtomicReference pending : pendingByLane.values()) { if (System.nanoTime() > deadlineNanos) { log.warn("Stop draining pending device logs during shutdown because deadline reached"); return; } try { DeviceLogFileWriteRequest request = pending == null ? null : pending.getAndSet(null); if (request != null) { writeRequest(request); } } catch (Exception e) { log.warn("Drain pending device log during shutdown failed", e); } } } private void flushAndCloseActiveContexts(long deadlineNanos) { boolean warnedDeadlineExceeded = false; for (LaneWriterContext context : activeContexts.values()) { boolean deadlineExceeded = System.nanoTime() > deadlineNanos; if (deadlineExceeded && !warnedDeadlineExceeded) { log.warn("Stop flushing active device log writers during shutdown because deadline reached, remainingActiveLanes={}", activeContexts.size()); warnedDeadlineExceeded = true; } CloseOptions closeOptions = deadlineExceeded ? closeReasonCloseOnly() : closeReasonFlushManifest(); closeLaneContext(context.laneKey, context, closeOptions, System.currentTimeMillis()); } } private void shutdownMaintenanceExecutor() { if (maintenanceExecutor == null) { return; } maintenanceExecutor.shutdownNow(); } private void shutdownWriterExecutor(long deadlineNanos) { if (writerExecutor == null) { return; } writerExecutor.shutdown(); long remainingMs = Math.max(1L, Duration.ofNanos(Math.max(0L, deadlineNanos - System.nanoTime())).toMillis()); try { if (!writerExecutor.awaitTermination(remainingMs, TimeUnit.MILLISECONDS)) { log.warn("Device log writer executor did not stop before shutdown deadline"); writerExecutor.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); writerExecutor.shutdownNow(); } } private long resolveMaintenanceIntervalMs() { long flushIntervalMs = Math.max(1L, writerFlushIntervalMs == null ? 200L : writerFlushIntervalMs); long idleCloseMs = Math.max(flushIntervalMs, writerIdleCloseMs == null ? 30_000L : writerIdleCloseMs); return Math.max(200L, Math.min(flushIntervalMs, Math.min(idleCloseMs, 1000L))); } private String buildJsonLine(DeviceDataLog logItem) { return JSON.toJSONStringWithDateFormat( logItem, "yyyy-MM-dd HH:mm:ss.SSS", SerializerFeature.WriteDateUseDateFormat); } private String resolveDay(DeviceDataLog logItem) { Date createTime = logItem.getCreateTime() == null ? new Date() : logItem.getCreateTime(); return new SimpleDateFormat(DATE_PATTERN).format(createTime); } private Path resolveDeviceDir(Path dayDir, DeviceDataLog logItem) { if (dayDir == null || logItem == null || logItem.getType() == null || logItem.getDeviceNo() == null) { return null; } return dayDir.resolve(logItem.getType()).resolve(String.valueOf(logItem.getDeviceNo())); } private String buildFilePrefix(DeviceDataLog logItem, String day) { if (logItem == null || logItem.getType() == null || logItem.getDeviceNo() == null) { return null; } if (String.valueOf(SlaveType.Devp).equals(logItem.getType())) { if (logItem.getStationId() == null) { log.warn("跳过缺少站点号的输送设备日志, deviceNo={}, createTime={}", logItem.getDeviceNo(), logItem.getCreateTime()); return null; } return logItem.getType() + "_" + logItem.getDeviceNo() + "_station_" + logItem.getStationId() + "_" + day + "_"; } return logItem.getType() + "_" + logItem.getDeviceNo() + "_" + day + "_"; } private int findStartIndex(Path baseDir, String prefix) throws IOException { int maxIndex = 0; try (Stream stream = Files.list(baseDir)) { for (Path path : (Iterable) stream::iterator) { String fileName = path.getFileName().toString(); if (!fileName.startsWith(prefix) || !fileName.endsWith(".log")) { continue; } String suffix = fileName.substring(prefix.length(), fileName.length() - 4); if (suffix.isBlank()) { continue; } try { maxIndex = Math.max(maxIndex, Integer.parseInt(suffix)); } catch (NumberFormatException ignored) { } } } int candidateIndex = maxIndex == 0 ? 1 : maxIndex; Path candidatePath = baseDir.resolve(prefix + candidateIndex + ".log"); if (Files.exists(candidatePath) && Files.size(candidatePath) >= MAX_FILE_SIZE_BYTES) { return candidateIndex + 1; } return candidateIndex; } private BufferedWriter openBufferedWriter(Path filePath) throws IOException { if (filePath.getParent() != null) { Files.createDirectories(filePath.getParent()); } return Files.newBufferedWriter( filePath, StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND ); } private DeviceDataLog buildManifestLogItem(LaneWriterContext context) { if (context == null) { return null; } DeviceDataLog logItem = new DeviceDataLog(); logItem.setType(context.type); logItem.setDeviceNo(context.deviceNo); logItem.setStationId(context.stationId); return logItem; } private void closeBufferedWriterQuietly(LaneWriterContext context) { if (context == null || context.bufferedWriter == null) { return; } try { context.bufferedWriter.close(); } catch (Exception e) { log.warn("Close device log writer failed, laneKey={}, file={}", context.laneKey, context.currentFile, e); } } private CloseOptions closeReasonFlushManifest() { return new CloseOptions(true, true); } private CloseOptions closeReasonSkipManifest() { return new CloseOptions(false, true); } private CloseOptions closeReasonCloseOnly() { return new CloseOptions(false, false); } private static class CloseOptions { private final boolean forceManifestSchedule; private final boolean flushBeforeClose; private CloseOptions(boolean forceManifestSchedule, boolean flushBeforeClose) { this.forceManifestSchedule = forceManifestSchedule; this.flushBeforeClose = flushBeforeClose; } } private static class LaneWriterContext { private String laneKey; private String day; private String type; private Integer deviceNo; private Integer stationId; private Path deviceDir; private String filePrefix; private int currentFileIndex; private Path currentFile; private BufferedWriter bufferedWriter; private long currentFileBytes; private int pendingFlushLines; private long lastWriteTimeMs; private long lastFlushTimeMs; private boolean manifestDirty; private long lastManifestScheduleTimeMs; } private static class DeviceLogFileWriterThreadFactory implements ThreadFactory { private final AtomicInteger index = new AtomicInteger(1); @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "device-log-file-writer-" + index.getAndIncrement()); thread.setDaemon(true); return thread; } } private static class NamedThreadFactory implements ThreadFactory { private final String prefix; private final AtomicInteger index = new AtomicInteger(1); private NamedThreadFactory(String prefix) { this.prefix = prefix; } @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, prefix + "-" + index.getAndIncrement()); thread.setDaemon(true); return thread; } } }