package com.zy.core.task;
|
|
import com.zy.asrs.entity.DeviceDataLog;
|
import com.zy.common.utils.RedisUtil;
|
import com.zy.core.utils.DeviceLogRedisKeyBuilder;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.atomic.AtomicReference;
|
|
@Slf4j
|
@Component
|
public class DeviceAsyncLogPublisher {
|
|
private static final String LANE_PREFIX = "device-log-publish-";
|
private static final String TASK_NAME = "publish-device-log";
|
private static final long LOG_TTL_SECONDS = 60L * 60L * 24L;
|
private static final long MIN_INTERVAL_MS = 0L;
|
|
private final ConcurrentHashMap<String, AtomicReference<DeviceDataLog>> pendingByLane = new ConcurrentHashMap<>();
|
|
@Autowired
|
private MainProcessTaskSubmitter mainProcessTaskSubmitter;
|
@Autowired
|
private RedisUtil redisUtil;
|
|
public void publishLatest(DeviceDataLog deviceDataLog) {
|
String laneKey = buildLaneKey(deviceDataLog);
|
if (laneKey == null) {
|
return;
|
}
|
pendingByLane
|
.computeIfAbsent(laneKey, key -> new AtomicReference<>())
|
.set(deviceDataLog);
|
boolean submitted = mainProcessTaskSubmitter.submitKeyedSerialTask(
|
LANE_PREFIX,
|
laneKey,
|
TASK_NAME,
|
MIN_INTERVAL_MS,
|
() -> drain(laneKey)
|
);
|
if (!submitted) {
|
log.debug("Skip duplicate device async log publish submit, laneKey={}", laneKey);
|
}
|
}
|
|
private void drain(String laneKey) {
|
AtomicReference<DeviceDataLog> pending = pendingByLane.get(laneKey);
|
if (pending == null) {
|
return;
|
}
|
while (true) {
|
DeviceDataLog deviceDataLog = pending.getAndSet(null);
|
if (deviceDataLog == null) {
|
return;
|
}
|
try {
|
boolean success = redisUtil.set(DeviceLogRedisKeyBuilder.build(deviceDataLog), deviceDataLog, LOG_TTL_SECONDS);
|
if (!success) {
|
pending.compareAndSet(null, deviceDataLog);
|
log.warn("Device async log publish failed, keep latest pending snapshot, type={}, deviceNo={}, stationId={}",
|
deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId());
|
return;
|
}
|
} catch (Exception e) {
|
pending.compareAndSet(null, deviceDataLog);
|
log.error("Device async log publish error, type={}, deviceNo={}, stationId={}",
|
deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId(), e);
|
return;
|
}
|
}
|
}
|
|
private String buildLaneKey(DeviceDataLog deviceDataLog) {
|
if (deviceDataLog == null || deviceDataLog.getType() == null || deviceDataLog.getDeviceNo() == null) {
|
return null;
|
}
|
return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo();
|
}
|
}
|