package com.zy.core.task;

import com.zy.asrs.entity.DeviceDataLog;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Stamps replay metadata onto {@link DeviceDataLog} samples and hands them to the
 * {@link DeviceLogFileWriter} under a lane key derived from the sample itself.
 *
 * <p>Samples for which no lane key can be derived are not forwarded to the writer.
 */
@Slf4j
@Component
public class DeviceAsyncLogPublisher {

    /** Schema version written to samples that do not already carry a positive one. */
    private static final int REPLAY_SCHEMA_VERSION = 1;

    /** Device type whose lane key additionally contains the station id. */
    private static final String STATION_SCOPED_TYPE = "Devp";

    /**
     * Source of generated sample sequence numbers. Seeded with the wall-clock time in
     * milliseconds multiplied by 1000 when this publisher is constructed.
     * NOTE(review): presumably seeded this way so that sequences issued after a restart
     * are larger than earlier ones - confirm.
     */
    private final AtomicLong sampleSequence = new AtomicLong(System.currentTimeMillis() * 1000L);

    private final DeviceLogFileWriter deviceLogFileWriter;

    /**
     * @param deviceLogFileWriter writer that receives every sample with a valid lane key
     */
    public DeviceAsyncLogPublisher(DeviceLogFileWriter deviceLogFileWriter) {
        this.deviceLogFileWriter = deviceLogFileWriter;
    }

    /**
     * Fills in missing replay fields on the sample and forwards it to the writer.
     *
     * <p>Replay fields are filled in before the lane key is derived, so a non-null sample
     * is enriched even when it ends up not being forwarded.
     *
     * @param deviceDataLog sample to publish; {@code null} is ignored
     */
    public void publishLatest(DeviceDataLog deviceDataLog) {
        enrichReplayFields(deviceDataLog);

        final String laneKey = buildLaneKey(deviceDataLog);
        if (laneKey != null) {
            // Writer owns lane state, buffering, flush, rotation and shutdown semantics.
            deviceLogFileWriter.publishLatest(laneKey, deviceDataLog);
        }
    }

    /**
     * Derives the lane key for a sample.
     *
     * @param deviceDataLog sample to derive the key from; may be {@code null}
     * @return {@code type:deviceNo}, or {@code type:deviceNo:stationId} when the type is
     *         {@code "Devp"}; {@code null} when the sample, its type or its device number
     *         is {@code null}, or when a {@code "Devp"} sample has no station id
     */
    private String buildLaneKey(DeviceDataLog deviceDataLog) {
        if (deviceDataLog == null) {
            return null;
        }
        if (deviceDataLog.getType() == null) {
            return null;
        }
        if (deviceDataLog.getDeviceNo() == null) {
            return null;
        }

        final boolean stationScoped = STATION_SCOPED_TYPE.equals(deviceDataLog.getType());
        if (stationScoped && deviceDataLog.getStationId() == null) {
            return null;
        }

        final StringBuilder laneKey = new StringBuilder()
                .append(deviceDataLog.getType())
                .append(':')
                .append(deviceDataLog.getDeviceNo());
        if (stationScoped) {
            laneKey.append(':').append(deviceDataLog.getStationId());
        }
        return laneKey.toString();
    }

    /**
     * Fills in the sample sequence, sample time and schema version when they are
     * {@code null} or not positive. Values that are already positive are left untouched.
     *
     * @param deviceDataLog sample to enrich in place; {@code null} is ignored
     */
    private void enrichReplayFields(DeviceDataLog deviceDataLog) {
        if (deviceDataLog == null) {
            return;
        }

        if (isUnset(deviceDataLog.getSampleSeq())) {
            deviceDataLog.setSampleSeq(sampleSequence.incrementAndGet());
        }
        if (isUnset(deviceDataLog.getSampleTimeMs())) {
            deviceDataLog.setSampleTimeMs(resolveSampleTimeMs(deviceDataLog));
        }
        if (isUnset(deviceDataLog.getSchemaVersion())) {
            deviceDataLog.setSchemaVersion(REPLAY_SCHEMA_VERSION);
        }
    }

    /**
     * Picks the sample time: the creation time in epoch milliseconds when the sample has
     * one, otherwise the current wall-clock time.
     */
    private static long resolveSampleTimeMs(DeviceDataLog deviceDataLog) {
        if (deviceDataLog.getCreateTime() != null) {
            return deviceDataLog.getCreateTime().getTime();
        }
        return System.currentTimeMillis();
    }

    /** Returns {@code true} when the value is {@code null} or less than or equal to zero. */
    private static boolean isUnset(Number value) {
        return value == null || value.longValue() <= 0;
    }
}