From dc3f9cc91759823ce59486f19b138be4b296a0f1 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 28 四月 2026 09:43:28 +0800
Subject: [PATCH] #

---
 src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java |   89 +++++++++++++++++---------------------------
 1 files changed, 35 insertions(+), 54 deletions(-)

diff --git a/src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java b/src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java
index 01642fb..d6bb07e 100644
--- a/src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java
+++ b/src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java
@@ -1,82 +1,63 @@
 package com.zy.core.task;
 
 import com.zy.asrs.entity.DeviceDataLog;
-import com.zy.common.utils.RedisUtil;
-import com.zy.core.utils.DeviceLogRedisKeyBuilder;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
 
 @Slf4j
 @Component
 public class DeviceAsyncLogPublisher {
 
-    private static final String LANE_PREFIX = "device-log-publish-";
-    private static final String TASK_NAME = "publish-device-log";
-    private static final long LOG_TTL_SECONDS = 60L * 60L * 24L;
-    private static final long MIN_INTERVAL_MS = 0L;
+    private static final int REPLAY_SCHEMA_VERSION = 1;
 
-    private final ConcurrentHashMap<String, AtomicReference<DeviceDataLog>> pendingByLane = new ConcurrentHashMap<>();
+    private final AtomicLong sampleSequence = new AtomicLong(System.currentTimeMillis() * 1000L);
+    private final DeviceLogFileWriter deviceLogFileWriter;
 
-    @Autowired
-    private MainProcessTaskSubmitter mainProcessTaskSubmitter;
-    @Autowired
-    private RedisUtil redisUtil;
+    public DeviceAsyncLogPublisher(DeviceLogFileWriter deviceLogFileWriter) {
+        this.deviceLogFileWriter = deviceLogFileWriter;
+    }
 
     public void publishLatest(DeviceDataLog deviceDataLog) {
+        enrichReplayFields(deviceDataLog);
         String laneKey = buildLaneKey(deviceDataLog);
         if (laneKey == null) {
             return;
         }
-        pendingByLane
-                .computeIfAbsent(laneKey, key -> new AtomicReference<>())
-                .set(deviceDataLog);
-        boolean submitted = mainProcessTaskSubmitter.submitKeyedSerialTask(
-                LANE_PREFIX,
-                laneKey,
-                TASK_NAME,
-                MIN_INTERVAL_MS,
-                () -> drain(laneKey)
-        );
-        if (!submitted) {
-            log.debug("Skip duplicate device async log publish submit, laneKey={}", laneKey);
-        }
-    }
-
-    private void drain(String laneKey) {
-        AtomicReference<DeviceDataLog> pending = pendingByLane.get(laneKey);
-        if (pending == null) {
-            return;
-        }
-        while (true) {
-            DeviceDataLog deviceDataLog = pending.getAndSet(null);
-            if (deviceDataLog == null) {
-                return;
-            }
-            try {
-                boolean success = redisUtil.set(DeviceLogRedisKeyBuilder.build(deviceDataLog), deviceDataLog, LOG_TTL_SECONDS);
-                if (!success) {
-                    pending.compareAndSet(null, deviceDataLog);
-                    log.warn("Device async log publish failed, keep latest pending snapshot, type={}, deviceNo={}, stationId={}",
-                            deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId());
-                    return;
-                }
-            } catch (Exception e) {
-                pending.compareAndSet(null, deviceDataLog);
-                log.error("Device async log publish error, type={}, deviceNo={}, stationId={}",
-                        deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId(), e);
-                return;
-            }
-        }
+        // Writer owns lane state, buffering, flush, rotation and shutdown semantics.
+        deviceLogFileWriter.publishLatest(laneKey, deviceDataLog);
     }
 
     private String buildLaneKey(DeviceDataLog deviceDataLog) {
         if (deviceDataLog == null || deviceDataLog.getType() == null || deviceDataLog.getDeviceNo() == null) {
             return null;
         }
+        if ("Devp".equals(deviceDataLog.getType())) {
+            if (deviceDataLog.getStationId() == null) {
+                return null;
+            }
+            return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo() + ":" + deviceDataLog.getStationId();
+        }
         return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo();
     }
+
+    private void enrichReplayFields(DeviceDataLog deviceDataLog) {
+        if (deviceDataLog == null) {
+            return;
+        }
+        if (deviceDataLog.getSampleSeq() == null || deviceDataLog.getSampleSeq() <= 0) {
+            deviceDataLog.setSampleSeq(sampleSequence.incrementAndGet());
+        }
+        if (deviceDataLog.getSampleTimeMs() == null || deviceDataLog.getSampleTimeMs() <= 0) {
+            long timestamp = deviceDataLog.getCreateTime() == null
+                    ? System.currentTimeMillis()
+                    : deviceDataLog.getCreateTime().getTime();
+            deviceDataLog.setSampleTimeMs(timestamp);
+        }
+        if (deviceDataLog.getSchemaVersion() == null || deviceDataLog.getSchemaVersion() <= 0) {
+            deviceDataLog.setSchemaVersion(REPLAY_SCHEMA_VERSION);
+        }
+    }
+
 }

--
Gitblit v1.9.1