| | |
| | | package com.zy.core.task; |
| | | |
| | | import com.zy.asrs.entity.DeviceDataLog; |
| | | import com.zy.common.utils.RedisUtil; |
| | | import com.zy.core.utils.DeviceLogRedisKeyBuilder; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | @Slf4j |
| | | @Component |
| | | public class DeviceAsyncLogPublisher { |
| | | |
| | | private static final String LANE_PREFIX = "device-log-publish-"; |
| | | private static final String TASK_NAME = "publish-device-log"; |
| | | private static final long LOG_TTL_SECONDS = 60L * 60L * 24L; |
| | | private static final long MIN_INTERVAL_MS = 0L; |
| | | private static final int REPLAY_SCHEMA_VERSION = 1; |
| | | |
| | | private final ConcurrentHashMap<String, AtomicReference<DeviceDataLog>> pendingByLane = new ConcurrentHashMap<>(); |
| | | private final AtomicLong sampleSequence = new AtomicLong(System.currentTimeMillis() * 1000L); |
| | | private final DeviceLogFileWriter deviceLogFileWriter; |
| | | |
| | | @Autowired |
| | | private MainProcessTaskSubmitter mainProcessTaskSubmitter; |
| | | @Autowired |
| | | private RedisUtil redisUtil; |
| | | public DeviceAsyncLogPublisher(DeviceLogFileWriter deviceLogFileWriter) { |
| | | this.deviceLogFileWriter = deviceLogFileWriter; |
| | | } |
| | | |
| | | public void publishLatest(DeviceDataLog deviceDataLog) { |
| | | enrichReplayFields(deviceDataLog); |
| | | String laneKey = buildLaneKey(deviceDataLog); |
| | | if (laneKey == null) { |
| | | return; |
| | | } |
| | | pendingByLane |
| | | .computeIfAbsent(laneKey, key -> new AtomicReference<>()) |
| | | .set(deviceDataLog); |
| | | boolean submitted = mainProcessTaskSubmitter.submitKeyedSerialTask( |
| | | LANE_PREFIX, |
| | | laneKey, |
| | | TASK_NAME, |
| | | MIN_INTERVAL_MS, |
| | | () -> drain(laneKey) |
| | | ); |
| | | if (!submitted) { |
| | | log.debug("Skip duplicate device async log publish submit, laneKey={}", laneKey); |
| | | } |
| | | } |
| | | |
| | | private void drain(String laneKey) { |
| | | AtomicReference<DeviceDataLog> pending = pendingByLane.get(laneKey); |
| | | if (pending == null) { |
| | | return; |
| | | } |
| | | while (true) { |
| | | DeviceDataLog deviceDataLog = pending.getAndSet(null); |
| | | if (deviceDataLog == null) { |
| | | return; |
| | | } |
| | | try { |
| | | boolean success = redisUtil.set(DeviceLogRedisKeyBuilder.build(deviceDataLog), deviceDataLog, LOG_TTL_SECONDS); |
| | | if (!success) { |
| | | pending.compareAndSet(null, deviceDataLog); |
| | | log.warn("Device async log publish failed, keep latest pending snapshot, type={}, deviceNo={}, stationId={}", |
| | | deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId()); |
| | | return; |
| | | } |
| | | } catch (Exception e) { |
| | | pending.compareAndSet(null, deviceDataLog); |
| | | log.error("Device async log publish error, type={}, deviceNo={}, stationId={}", |
| | | deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId(), e); |
| | | return; |
| | | } |
| | | } |
| | | // Writer owns lane state, buffering, flush, rotation and shutdown semantics. |
| | | deviceLogFileWriter.publishLatest(laneKey, deviceDataLog); |
| | | } |
| | | |
| | | private String buildLaneKey(DeviceDataLog deviceDataLog) { |
| | | if (deviceDataLog == null || deviceDataLog.getType() == null || deviceDataLog.getDeviceNo() == null) { |
| | | return null; |
| | | } |
| | | if ("Devp".equals(deviceDataLog.getType())) { |
| | | if (deviceDataLog.getStationId() == null) { |
| | | return null; |
| | | } |
| | | return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo() + ":" + deviceDataLog.getStationId(); |
| | | } |
| | | return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo(); |
| | | } |
| | | |
| | | private void enrichReplayFields(DeviceDataLog deviceDataLog) { |
| | | if (deviceDataLog == null) { |
| | | return; |
| | | } |
| | | if (deviceDataLog.getSampleSeq() == null || deviceDataLog.getSampleSeq() <= 0) { |
| | | deviceDataLog.setSampleSeq(sampleSequence.incrementAndGet()); |
| | | } |
| | | if (deviceDataLog.getSampleTimeMs() == null || deviceDataLog.getSampleTimeMs() <= 0) { |
| | | long timestamp = deviceDataLog.getCreateTime() == null |
| | | ? System.currentTimeMillis() |
| | | : deviceDataLog.getCreateTime().getTime(); |
| | | deviceDataLog.setSampleTimeMs(timestamp); |
| | | } |
| | | if (deviceDataLog.getSchemaVersion() == null || deviceDataLog.getSchemaVersion() <= 0) { |
| | | deviceDataLog.setSchemaVersion(REPLAY_SCHEMA_VERSION); |
| | | } |
| | | } |
| | | |
| | | } |