package com.zy.core.task;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.serializer.SerializerFeature;
|
import com.zy.asrs.entity.DeviceDataLog;
|
import com.zy.asrs.service.DeviceLogReplayManifestService;
|
import com.zy.core.enums.SlaveType;
|
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PreDestroy;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Component;
|
|
import java.io.BufferedWriter;
|
import java.io.IOException;
|
import java.nio.charset.StandardCharsets;
|
import java.nio.file.Files;
|
import java.nio.file.Path;
|
import java.nio.file.Paths;
|
import java.nio.file.StandardOpenOption;
|
import java.text.SimpleDateFormat;
|
import java.time.Duration;
|
import java.util.Date;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.stream.Stream;
|
|
@Slf4j
|
@Component
|
public class DeviceLogFileWriter {
|
|
private static final long MAX_FILE_SIZE_BYTES = 1024L * 1024L;
|
private static final long SHUTDOWN_FLUSH_TIMEOUT_MS = 2_000L;
|
private static final String DATE_PATTERN = "yyyyMMdd";
|
private static final byte[] LINE_SEPARATOR_BYTES = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
|
|
private final ConcurrentHashMap<String, AtomicReference<DeviceLogFileWriteRequest>> pendingByLane = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, AtomicBoolean> runningByLane = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, LaneWriterContext> activeContexts = new ConcurrentHashMap<>();
|
private final AtomicLong submittedCount = new AtomicLong();
|
private final AtomicLong overwrittenCount = new AtomicLong();
|
private final AtomicLong writtenCount = new AtomicLong();
|
private final AtomicLong flushCount = new AtomicLong();
|
private final AtomicLong writeFailureCount = new AtomicLong();
|
private final AtomicLong idleCloseCount = new AtomicLong();
|
private final AtomicLong droppedLaneCount = new AtomicLong();
|
|
private final DeviceLogReplayManifestService manifestService;
|
|
@Value("${deviceLogStorage.loggingPath}")
|
private String loggingPath;
|
@Value("${deviceLogStorage.asyncPublishWorkers:6}")
|
private Integer asyncPublishWorkers;
|
@Value("${deviceLogStorage.directFileQueueCapacity:2048}")
|
private Integer directFileQueueCapacity;
|
@Value("${deviceLogStorage.writerFlushBatchSize:64}")
|
private Integer writerFlushBatchSize;
|
@Value("${deviceLogStorage.writerFlushIntervalMs:200}")
|
private Long writerFlushIntervalMs;
|
@Value("${deviceLogStorage.writerIdleCloseMs:30000}")
|
private Long writerIdleCloseMs;
|
@Value("${deviceLogStorage.manifestRefreshDebounceMs:1000}")
|
private Long manifestRefreshDebounceMs;
|
|
private volatile boolean shuttingDown;
|
private ExecutorService writerExecutor;
|
private ScheduledExecutorService maintenanceExecutor;
|
|
public DeviceLogFileWriter(DeviceLogReplayManifestService manifestService) {
|
this.manifestService = manifestService;
|
}
|
|
@PostConstruct
|
public void initExecutor() {
|
int workerCount = Math.max(1, asyncPublishWorkers == null ? 6 : asyncPublishWorkers);
|
this.writerExecutor = Executors.newFixedThreadPool(workerCount, new DeviceLogFileWriterThreadFactory());
|
this.maintenanceExecutor = Executors.newSingleThreadScheduledExecutor(
|
new NamedThreadFactory("device-log-file-maintenance"));
|
long maintenanceIntervalMs = resolveMaintenanceIntervalMs();
|
this.maintenanceExecutor.scheduleWithFixedDelay(
|
this::runMaintenanceSafely,
|
maintenanceIntervalMs,
|
maintenanceIntervalMs,
|
TimeUnit.MILLISECONDS
|
);
|
}
|
|
@PreDestroy
|
public void shutdownExecutor() {
|
shuttingDown = true;
|
long deadlineNanos = System.nanoTime() + Duration.ofMillis(SHUTDOWN_FLUSH_TIMEOUT_MS).toNanos();
|
|
shutdownMaintenanceExecutor();
|
drainPendingDirectly(deadlineNanos);
|
shutdownWriterExecutor(deadlineNanos);
|
flushAndCloseActiveContexts(deadlineNanos);
|
|
log.info("Device log file writer shutdown, submittedCount={}, overwrittenCount={}, writtenCount={}, flushCount={}, " +
|
"writeFailureCount={}, idleCloseCount={}, droppedLaneCount={}, activeLaneCount={}",
|
submittedCount.get(),
|
overwrittenCount.get(),
|
writtenCount.get(),
|
flushCount.get(),
|
writeFailureCount.get(),
|
idleCloseCount.get(),
|
droppedLaneCount.get(),
|
getActiveLaneCount());
|
}
|
|
public void publishLatest(String laneKey, DeviceDataLog deviceDataLog) {
|
if (shuttingDown || laneKey == null || deviceDataLog == null) {
|
return;
|
}
|
AtomicReference<DeviceLogFileWriteRequest> pending = pendingByLane.get(laneKey);
|
if (pending == null) {
|
int laneCapacity = Math.max(64, directFileQueueCapacity == null ? 2048 : directFileQueueCapacity);
|
if (pendingByLane.size() >= laneCapacity) {
|
droppedLaneCount.incrementAndGet();
|
log.warn("Device direct-file writer lane capacity exceeded, laneKey={}, pendingLanes={}", laneKey, pendingByLane.size());
|
return;
|
}
|
AtomicReference<DeviceLogFileWriteRequest> created = new AtomicReference<>();
|
AtomicReference<DeviceLogFileWriteRequest> previous = pendingByLane.putIfAbsent(laneKey, created);
|
pending = previous == null ? created : previous;
|
}
|
|
DeviceLogFileWriteRequest request = new DeviceLogFileWriteRequest();
|
request.setLaneKey(laneKey);
|
request.setDeviceDataLog(deviceDataLog);
|
|
DeviceLogFileWriteRequest previousRequest = pending.getAndSet(request);
|
submittedCount.incrementAndGet();
|
if (previousRequest != null) {
|
overwrittenCount.incrementAndGet();
|
}
|
scheduleLaneDrain(laneKey, pending);
|
}
|
|
public long getSubmittedCount() {
|
return submittedCount.get();
|
}
|
|
public long getOverwrittenCount() {
|
return overwrittenCount.get();
|
}
|
|
public long getWrittenCount() {
|
return writtenCount.get();
|
}
|
|
public long getFlushCount() {
|
return flushCount.get();
|
}
|
|
public long getWriteFailureCount() {
|
return writeFailureCount.get();
|
}
|
|
public long getIdleCloseCount() {
|
return idleCloseCount.get();
|
}
|
|
public long getDroppedLaneCount() {
|
return droppedLaneCount.get();
|
}
|
|
public int getActiveLaneCount() {
|
return activeContexts.size();
|
}
|
|
private void scheduleLaneDrain(String laneKey, AtomicReference<DeviceLogFileWriteRequest> pending) {
|
if (shuttingDown || pending == null || writerExecutor == null) {
|
return;
|
}
|
AtomicBoolean running = runningByLane.computeIfAbsent(laneKey, key -> new AtomicBoolean(false));
|
if (!running.compareAndSet(false, true)) {
|
return;
|
}
|
try {
|
writerExecutor.execute(() -> drain(laneKey, pending, running));
|
} catch (Exception e) {
|
running.set(false);
|
log.warn("Schedule device log writer lane drain failed, laneKey={}", laneKey, e);
|
}
|
}
|
|
private void drain(String laneKey, AtomicReference<DeviceLogFileWriteRequest> pending, AtomicBoolean running) {
|
try {
|
while (true) {
|
DeviceLogFileWriteRequest request = pending.getAndSet(null);
|
if (request == null) {
|
return;
|
}
|
writeRequest(request);
|
if (shuttingDown) {
|
return;
|
}
|
}
|
} finally {
|
running.set(false);
|
AtomicReference<DeviceLogFileWriteRequest> currentPending = pendingByLane.get(laneKey);
|
if (currentPending == null) {
|
runningByLane.remove(laneKey);
|
return;
|
}
|
if (currentPending.get() == null) {
|
pendingByLane.remove(laneKey, currentPending);
|
runningByLane.remove(laneKey, running);
|
return;
|
}
|
scheduleLaneDrain(laneKey, currentPending);
|
}
|
}
|
|
private void writeRequest(DeviceLogFileWriteRequest request) {
|
DeviceDataLog logItem = request.getDeviceDataLog();
|
if (logItem == null) {
|
return;
|
}
|
try {
|
String day = resolveDay(logItem);
|
String filePrefix = buildFilePrefix(logItem, day);
|
if (filePrefix == null) {
|
return;
|
}
|
Path deviceDir = resolveDeviceDir(Paths.get(loggingPath).resolve(day), logItem);
|
if (deviceDir == null) {
|
return;
|
}
|
|
String jsonLine = buildJsonLine(logItem);
|
long lineBytes = jsonLine.getBytes(StandardCharsets.UTF_8).length + LINE_SEPARATOR_BYTES.length;
|
long now = System.currentTimeMillis();
|
|
LaneWriterContext context = ensureLaneContext(request.getLaneKey(), logItem, day, deviceDir, filePrefix, now);
|
appendLine(context, logItem, jsonLine, lineBytes, now);
|
} catch (Exception e) {
|
writeFailureCount.incrementAndGet();
|
log.error("设备日志写文件失败, laneKey={}, type={}, deviceNo={}, stationId={}",
|
request.getLaneKey(),
|
logItem.getType(),
|
logItem.getDeviceNo(),
|
logItem.getStationId(),
|
e);
|
recycleLaneContext(request.getLaneKey(), null);
|
}
|
}
|
|
private LaneWriterContext ensureLaneContext(String laneKey,
|
DeviceDataLog logItem,
|
String day,
|
Path deviceDir,
|
String filePrefix,
|
long now) throws IOException {
|
LaneWriterContext currentContext = activeContexts.get(laneKey);
|
if (currentContext != null && day.equals(currentContext.day)) {
|
synchronized (currentContext) {
|
if (activeContexts.get(laneKey) == currentContext
|
&& day.equals(currentContext.day)
|
&& currentContext.bufferedWriter != null) {
|
currentContext.lastWriteTimeMs = now;
|
return currentContext;
|
}
|
}
|
}
|
|
if (currentContext != null) {
|
closeLaneContext(laneKey, currentContext, closeReasonFlushManifest(), now);
|
}
|
|
LaneWriterContext newContext = openLaneContext(laneKey, logItem, day, deviceDir, filePrefix, now);
|
LaneWriterContext previousContext = activeContexts.put(laneKey, newContext);
|
if (previousContext != null && previousContext != currentContext && previousContext != newContext) {
|
closeLaneContext(laneKey, previousContext, closeReasonFlushManifest(), now);
|
}
|
return newContext;
|
}
|
|
private LaneWriterContext openLaneContext(String laneKey,
|
DeviceDataLog logItem,
|
String day,
|
Path deviceDir,
|
String filePrefix,
|
long now) throws IOException {
|
Files.createDirectories(deviceDir);
|
|
LaneWriterContext context = new LaneWriterContext();
|
context.laneKey = laneKey;
|
context.day = day;
|
context.type = logItem.getType();
|
context.deviceNo = logItem.getDeviceNo();
|
context.stationId = logItem.getStationId();
|
context.deviceDir = deviceDir;
|
context.filePrefix = filePrefix;
|
context.currentFileIndex = findStartIndex(deviceDir, filePrefix);
|
context.currentFile = deviceDir.resolve(filePrefix + context.currentFileIndex + ".log");
|
context.bufferedWriter = openBufferedWriter(context.currentFile);
|
context.currentFileBytes = Files.exists(context.currentFile) ? Files.size(context.currentFile) : 0L;
|
context.pendingFlushLines = 0;
|
context.lastWriteTimeMs = now;
|
context.lastFlushTimeMs = now;
|
context.manifestDirty = false;
|
context.lastManifestScheduleTimeMs = 0L;
|
return context;
|
}
|
|
private void appendLine(LaneWriterContext context,
|
DeviceDataLog logItem,
|
String jsonLine,
|
long lineBytes,
|
long now) throws IOException {
|
synchronized (context) {
|
rotateIfNeeded(context, lineBytes, now);
|
|
context.bufferedWriter.write(jsonLine);
|
context.bufferedWriter.newLine();
|
context.currentFileBytes += lineBytes;
|
context.pendingFlushLines++;
|
context.lastWriteTimeMs = now;
|
context.manifestDirty = true;
|
|
if (shouldFlushAfterWrite(context, now)) {
|
flushContext(context, buildManifestLogItem(context), now, false);
|
}
|
}
|
writtenCount.incrementAndGet();
|
}
|
|
private void rotateIfNeeded(LaneWriterContext context, long lineBytes, long now) throws IOException {
|
if (context.currentFileBytes + lineBytes <= MAX_FILE_SIZE_BYTES) {
|
return;
|
}
|
|
flushContext(context, buildManifestLogItem(context), now, true);
|
closeBufferedWriterQuietly(context);
|
|
context.currentFileIndex++;
|
context.currentFile = context.deviceDir.resolve(context.filePrefix + context.currentFileIndex + ".log");
|
context.bufferedWriter = openBufferedWriter(context.currentFile);
|
context.currentFileBytes = Files.exists(context.currentFile) ? Files.size(context.currentFile) : 0L;
|
context.pendingFlushLines = 0;
|
context.lastFlushTimeMs = now;
|
context.lastWriteTimeMs = now;
|
context.manifestDirty = false;
|
}
|
|
private boolean shouldFlushAfterWrite(LaneWriterContext context, long now) {
|
int flushBatchSize = Math.max(1, writerFlushBatchSize == null ? 64 : writerFlushBatchSize);
|
long flushIntervalMs = Math.max(1L, writerFlushIntervalMs == null ? 200L : writerFlushIntervalMs);
|
return context.pendingFlushLines >= flushBatchSize || now - context.lastFlushTimeMs >= flushIntervalMs;
|
}
|
|
private void runMaintenanceSafely() {
|
try {
|
long now = System.currentTimeMillis();
|
for (LaneWriterContext context : activeContexts.values()) {
|
inspectLaneContext(context, now);
|
}
|
} catch (Exception e) {
|
log.warn("Device log writer maintenance failed", e);
|
}
|
}
|
|
private void inspectLaneContext(LaneWriterContext context, long now) {
|
if (context == null) {
|
return;
|
}
|
boolean closedByIdle = false;
|
synchronized (context) {
|
if (activeContexts.get(context.laneKey) != context) {
|
return;
|
}
|
long flushIntervalMs = Math.max(1L, writerFlushIntervalMs == null ? 200L : writerFlushIntervalMs);
|
long idleCloseMs = Math.max(flushIntervalMs, writerIdleCloseMs == null ? 30_000L : writerIdleCloseMs);
|
|
if (context.pendingFlushLines > 0 && now - context.lastFlushTimeMs >= flushIntervalMs) {
|
try {
|
flushContext(context, buildManifestLogItem(context), now, false);
|
} catch (IOException e) {
|
closeLaneContextLocked(context.laneKey, context, closeReasonSkipManifest(), now);
|
return;
|
}
|
}
|
if (now - context.lastWriteTimeMs < idleCloseMs) {
|
return;
|
}
|
|
if (activeContexts.get(context.laneKey) != context || now - context.lastWriteTimeMs < idleCloseMs) {
|
return;
|
}
|
|
closeLaneContextLocked(context.laneKey, context, closeReasonFlushManifest(), now);
|
closedByIdle = true;
|
}
|
|
if (closedByIdle) {
|
idleCloseCount.incrementAndGet();
|
log.debug("Device log writer idle close, laneKey={}, file={}", context.laneKey, context.currentFile);
|
}
|
}
|
|
private void flushContext(LaneWriterContext context,
|
DeviceDataLog manifestLogItem,
|
long now,
|
boolean forceManifestSchedule) throws IOException {
|
if (context.pendingFlushLines <= 0 && !shouldScheduleManifest(context, now, forceManifestSchedule)) {
|
return;
|
}
|
|
try {
|
if (context.pendingFlushLines > 0) {
|
context.bufferedWriter.flush();
|
flushCount.incrementAndGet();
|
}
|
context.pendingFlushLines = 0;
|
context.lastFlushTimeMs = now;
|
|
if (shouldScheduleManifest(context, now, forceManifestSchedule) && manifestLogItem != null) {
|
try {
|
manifestService.scheduleManifestRefreshForLog(context.day, manifestLogItem);
|
context.manifestDirty = false;
|
context.lastManifestScheduleTimeMs = now;
|
} catch (Exception e) {
|
log.warn("调度设备日志 manifest 刷新失败, laneKey={}, day={}, type={}, deviceNo={}, stationId={}",
|
context.laneKey,
|
context.day,
|
context.type,
|
context.deviceNo,
|
context.stationId,
|
e);
|
}
|
}
|
} catch (IOException e) {
|
writeFailureCount.incrementAndGet();
|
log.error("设备日志 flush 失败, laneKey={}, file={}, type={}, deviceNo={}, stationId={}",
|
context.laneKey,
|
context.currentFile,
|
context.type,
|
context.deviceNo,
|
context.stationId,
|
e);
|
throw e;
|
}
|
}
|
|
private boolean shouldScheduleManifest(LaneWriterContext context, long now, boolean forceManifestSchedule) {
|
if (!context.manifestDirty) {
|
return false;
|
}
|
if (forceManifestSchedule) {
|
return true;
|
}
|
long debounceMs = Math.max(0L, manifestRefreshDebounceMs == null ? 1000L : manifestRefreshDebounceMs);
|
return now - context.lastManifestScheduleTimeMs >= debounceMs;
|
}
|
|
private void closeLaneContext(String laneKey, LaneWriterContext context, CloseOptions closeOptions, long now) {
|
if (context == null) {
|
return;
|
}
|
synchronized (context) {
|
closeLaneContextLocked(laneKey, context, closeOptions, now);
|
}
|
}
|
|
private void closeLaneContextLocked(String laneKey, LaneWriterContext context, CloseOptions closeOptions, long now) {
|
if (activeContexts.get(laneKey) != context && context.bufferedWriter == null) {
|
return;
|
}
|
try {
|
if (closeOptions.flushBeforeClose) {
|
flushContext(context, buildManifestLogItem(context), now, closeOptions.forceManifestSchedule);
|
}
|
} catch (Exception e) {
|
log.warn("Close lane context after flush failure, laneKey={}, file={}", laneKey, context.currentFile, e);
|
} finally {
|
closeBufferedWriterQuietly(context);
|
context.bufferedWriter = null;
|
activeContexts.remove(laneKey, context);
|
}
|
}
|
|
private void recycleLaneContext(String laneKey, LaneWriterContext expectedContext) {
|
if (laneKey == null) {
|
return;
|
}
|
LaneWriterContext context = expectedContext;
|
if (context == null) {
|
context = activeContexts.get(laneKey);
|
}
|
if (context == null) {
|
return;
|
}
|
closeLaneContext(laneKey, context, closeReasonSkipManifest(), System.currentTimeMillis());
|
}
|
|
private void drainPendingDirectly(long deadlineNanos) {
|
for (AtomicReference<DeviceLogFileWriteRequest> pending : pendingByLane.values()) {
|
if (System.nanoTime() > deadlineNanos) {
|
log.warn("Stop draining pending device logs during shutdown because deadline reached");
|
return;
|
}
|
try {
|
DeviceLogFileWriteRequest request = pending == null ? null : pending.getAndSet(null);
|
if (request != null) {
|
writeRequest(request);
|
}
|
} catch (Exception e) {
|
log.warn("Drain pending device log during shutdown failed", e);
|
}
|
}
|
}
|
|
private void flushAndCloseActiveContexts(long deadlineNanos) {
|
boolean warnedDeadlineExceeded = false;
|
for (LaneWriterContext context : activeContexts.values()) {
|
boolean deadlineExceeded = System.nanoTime() > deadlineNanos;
|
if (deadlineExceeded && !warnedDeadlineExceeded) {
|
log.warn("Stop flushing active device log writers during shutdown because deadline reached, remainingActiveLanes={}",
|
activeContexts.size());
|
warnedDeadlineExceeded = true;
|
}
|
CloseOptions closeOptions = deadlineExceeded ? closeReasonCloseOnly() : closeReasonFlushManifest();
|
closeLaneContext(context.laneKey, context, closeOptions, System.currentTimeMillis());
|
}
|
}
|
|
private void shutdownMaintenanceExecutor() {
|
if (maintenanceExecutor == null) {
|
return;
|
}
|
maintenanceExecutor.shutdownNow();
|
}
|
|
private void shutdownWriterExecutor(long deadlineNanos) {
|
if (writerExecutor == null) {
|
return;
|
}
|
writerExecutor.shutdown();
|
long remainingMs = Math.max(1L, Duration.ofNanos(Math.max(0L, deadlineNanos - System.nanoTime())).toMillis());
|
try {
|
if (!writerExecutor.awaitTermination(remainingMs, TimeUnit.MILLISECONDS)) {
|
log.warn("Device log writer executor did not stop before shutdown deadline");
|
writerExecutor.shutdownNow();
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
writerExecutor.shutdownNow();
|
}
|
}
|
|
private long resolveMaintenanceIntervalMs() {
|
long flushIntervalMs = Math.max(1L, writerFlushIntervalMs == null ? 200L : writerFlushIntervalMs);
|
long idleCloseMs = Math.max(flushIntervalMs, writerIdleCloseMs == null ? 30_000L : writerIdleCloseMs);
|
return Math.max(200L, Math.min(flushIntervalMs, Math.min(idleCloseMs, 1000L)));
|
}
|
|
private String buildJsonLine(DeviceDataLog logItem) {
|
return JSON.toJSONStringWithDateFormat(
|
logItem,
|
"yyyy-MM-dd HH:mm:ss.SSS",
|
SerializerFeature.WriteDateUseDateFormat);
|
}
|
|
private String resolveDay(DeviceDataLog logItem) {
|
Date createTime = logItem.getCreateTime() == null ? new Date() : logItem.getCreateTime();
|
return new SimpleDateFormat(DATE_PATTERN).format(createTime);
|
}
|
|
private Path resolveDeviceDir(Path dayDir, DeviceDataLog logItem) {
|
if (dayDir == null || logItem == null || logItem.getType() == null || logItem.getDeviceNo() == null) {
|
return null;
|
}
|
return dayDir.resolve(logItem.getType()).resolve(String.valueOf(logItem.getDeviceNo()));
|
}
|
|
private String buildFilePrefix(DeviceDataLog logItem, String day) {
|
if (logItem == null || logItem.getType() == null || logItem.getDeviceNo() == null) {
|
return null;
|
}
|
if (String.valueOf(SlaveType.Devp).equals(logItem.getType())) {
|
if (logItem.getStationId() == null) {
|
log.warn("跳过缺少站点号的输送设备日志, deviceNo={}, createTime={}", logItem.getDeviceNo(), logItem.getCreateTime());
|
return null;
|
}
|
return logItem.getType() + "_" + logItem.getDeviceNo() + "_station_" + logItem.getStationId() + "_" + day + "_";
|
}
|
return logItem.getType() + "_" + logItem.getDeviceNo() + "_" + day + "_";
|
}
|
|
private int findStartIndex(Path baseDir, String prefix) throws IOException {
|
int maxIndex = 0;
|
try (Stream<Path> stream = Files.list(baseDir)) {
|
for (Path path : (Iterable<Path>) stream::iterator) {
|
String fileName = path.getFileName().toString();
|
if (!fileName.startsWith(prefix) || !fileName.endsWith(".log")) {
|
continue;
|
}
|
String suffix = fileName.substring(prefix.length(), fileName.length() - 4);
|
if (suffix.isBlank()) {
|
continue;
|
}
|
try {
|
maxIndex = Math.max(maxIndex, Integer.parseInt(suffix));
|
} catch (NumberFormatException ignored) {
|
}
|
}
|
}
|
|
int candidateIndex = maxIndex == 0 ? 1 : maxIndex;
|
Path candidatePath = baseDir.resolve(prefix + candidateIndex + ".log");
|
if (Files.exists(candidatePath) && Files.size(candidatePath) >= MAX_FILE_SIZE_BYTES) {
|
return candidateIndex + 1;
|
}
|
return candidateIndex;
|
}
|
|
private BufferedWriter openBufferedWriter(Path filePath) throws IOException {
|
if (filePath.getParent() != null) {
|
Files.createDirectories(filePath.getParent());
|
}
|
return Files.newBufferedWriter(
|
filePath,
|
StandardCharsets.UTF_8,
|
StandardOpenOption.CREATE,
|
StandardOpenOption.APPEND
|
);
|
}
|
|
private DeviceDataLog buildManifestLogItem(LaneWriterContext context) {
|
if (context == null) {
|
return null;
|
}
|
DeviceDataLog logItem = new DeviceDataLog();
|
logItem.setType(context.type);
|
logItem.setDeviceNo(context.deviceNo);
|
logItem.setStationId(context.stationId);
|
return logItem;
|
}
|
|
private void closeBufferedWriterQuietly(LaneWriterContext context) {
|
if (context == null || context.bufferedWriter == null) {
|
return;
|
}
|
try {
|
context.bufferedWriter.close();
|
} catch (Exception e) {
|
log.warn("Close device log writer failed, laneKey={}, file={}", context.laneKey, context.currentFile, e);
|
}
|
}
|
|
private CloseOptions closeReasonFlushManifest() {
|
return new CloseOptions(true, true);
|
}
|
|
private CloseOptions closeReasonSkipManifest() {
|
return new CloseOptions(false, true);
|
}
|
|
private CloseOptions closeReasonCloseOnly() {
|
return new CloseOptions(false, false);
|
}
|
|
private static class CloseOptions {
|
|
private final boolean forceManifestSchedule;
|
private final boolean flushBeforeClose;
|
|
private CloseOptions(boolean forceManifestSchedule, boolean flushBeforeClose) {
|
this.forceManifestSchedule = forceManifestSchedule;
|
this.flushBeforeClose = flushBeforeClose;
|
}
|
}
|
|
private static class LaneWriterContext {
|
|
private String laneKey;
|
private String day;
|
private String type;
|
private Integer deviceNo;
|
private Integer stationId;
|
private Path deviceDir;
|
private String filePrefix;
|
private int currentFileIndex;
|
private Path currentFile;
|
private BufferedWriter bufferedWriter;
|
private long currentFileBytes;
|
private int pendingFlushLines;
|
private long lastWriteTimeMs;
|
private long lastFlushTimeMs;
|
private boolean manifestDirty;
|
private long lastManifestScheduleTimeMs;
|
}
|
|
private static class DeviceLogFileWriterThreadFactory implements ThreadFactory {
|
|
private final AtomicInteger index = new AtomicInteger(1);
|
|
@Override
|
public Thread newThread(Runnable runnable) {
|
Thread thread = new Thread(runnable, "device-log-file-writer-" + index.getAndIncrement());
|
thread.setDaemon(true);
|
return thread;
|
}
|
}
|
|
private static class NamedThreadFactory implements ThreadFactory {
|
|
private final String prefix;
|
private final AtomicInteger index = new AtomicInteger(1);
|
|
private NamedThreadFactory(String prefix) {
|
this.prefix = prefix;
|
}
|
|
@Override
|
public Thread newThread(Runnable runnable) {
|
Thread thread = new Thread(runnable, prefix + "-" + index.getAndIncrement());
|
thread.setDaemon(true);
|
return thread;
|
}
|
}
|
}
|