package com.zy.core.task;
|
|
import com.zy.asrs.entity.DeviceDataLog;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Component;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
@Slf4j
|
@Component
|
public class DeviceAsyncLogPublisher {
|
|
private static final int REPLAY_SCHEMA_VERSION = 1;
|
|
private final AtomicLong sampleSequence = new AtomicLong(System.currentTimeMillis() * 1000L);
|
private final DeviceLogFileWriter deviceLogFileWriter;
|
|
public DeviceAsyncLogPublisher(DeviceLogFileWriter deviceLogFileWriter) {
|
this.deviceLogFileWriter = deviceLogFileWriter;
|
}
|
|
public void publishLatest(DeviceDataLog deviceDataLog) {
|
enrichReplayFields(deviceDataLog);
|
String laneKey = buildLaneKey(deviceDataLog);
|
if (laneKey == null) {
|
return;
|
}
|
// Writer owns lane state, buffering, flush, rotation and shutdown semantics.
|
deviceLogFileWriter.publishLatest(laneKey, deviceDataLog);
|
}
|
|
private String buildLaneKey(DeviceDataLog deviceDataLog) {
|
if (deviceDataLog == null || deviceDataLog.getType() == null || deviceDataLog.getDeviceNo() == null) {
|
return null;
|
}
|
if ("Devp".equals(deviceDataLog.getType())) {
|
if (deviceDataLog.getStationId() == null) {
|
return null;
|
}
|
return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo() + ":" + deviceDataLog.getStationId();
|
}
|
return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo();
|
}
|
|
private void enrichReplayFields(DeviceDataLog deviceDataLog) {
|
if (deviceDataLog == null) {
|
return;
|
}
|
if (deviceDataLog.getSampleSeq() == null || deviceDataLog.getSampleSeq() <= 0) {
|
deviceDataLog.setSampleSeq(sampleSequence.incrementAndGet());
|
}
|
if (deviceDataLog.getSampleTimeMs() == null || deviceDataLog.getSampleTimeMs() <= 0) {
|
long timestamp = deviceDataLog.getCreateTime() == null
|
? System.currentTimeMillis()
|
: deviceDataLog.getCreateTime().getTime();
|
deviceDataLog.setSampleTimeMs(timestamp);
|
}
|
if (deviceDataLog.getSchemaVersion() == null || deviceDataLog.getSchemaVersion() <= 0) {
|
deviceDataLog.setSchemaVersion(REPLAY_SCHEMA_VERSION);
|
}
|
}
|
|
}
|