package com.zy.core.task; import com.zy.asrs.entity.DeviceDataLog; import com.zy.common.utils.RedisUtil; import com.zy.core.utils.DeviceLogRedisKeyBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; @Slf4j @Component public class DeviceAsyncLogPublisher { private static final String LANE_PREFIX = "device-log-publish-"; private static final String TASK_NAME = "publish-device-log"; private static final long LOG_TTL_SECONDS = 60L * 60L * 24L; private static final long MIN_INTERVAL_MS = 0L; private final ConcurrentHashMap> pendingByLane = new ConcurrentHashMap<>(); @Autowired private MainProcessTaskSubmitter mainProcessTaskSubmitter; @Autowired private RedisUtil redisUtil; public void publishLatest(DeviceDataLog deviceDataLog) { String laneKey = buildLaneKey(deviceDataLog); if (laneKey == null) { return; } pendingByLane .computeIfAbsent(laneKey, key -> new AtomicReference<>()) .set(deviceDataLog); boolean submitted = mainProcessTaskSubmitter.submitKeyedSerialTask( LANE_PREFIX, laneKey, TASK_NAME, MIN_INTERVAL_MS, () -> drain(laneKey) ); if (!submitted) { log.debug("Skip duplicate device async log publish submit, laneKey={}", laneKey); } } private void drain(String laneKey) { AtomicReference pending = pendingByLane.get(laneKey); if (pending == null) { return; } while (true) { DeviceDataLog deviceDataLog = pending.getAndSet(null); if (deviceDataLog == null) { return; } try { boolean success = redisUtil.set(DeviceLogRedisKeyBuilder.build(deviceDataLog), deviceDataLog, LOG_TTL_SECONDS); if (!success) { pending.compareAndSet(null, deviceDataLog); log.warn("Device async log publish failed, keep latest pending snapshot, type={}, deviceNo={}, stationId={}", deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId()); return; } } catch (Exception e) { pending.compareAndSet(null, deviceDataLog); log.error("Device async log publish error, type={}, deviceNo={}, stationId={}", deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId(), e); return; } } } private String buildLaneKey(DeviceDataLog deviceDataLog) { if (deviceDataLog == null || deviceDataLog.getType() == null || deviceDataLog.getDeviceNo() == null) { return null; } return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo(); } }