From dc3f9cc91759823ce59486f19b138be4b296a0f1 Mon Sep 17 00:00:00 2001
From: Junjie <fallin.jie@qq.com>
Date: 星期二, 28 四月 2026 09:43:28 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java | 89 +++++++++++++++++---------------------------
1 files changed, 35 insertions(+), 54 deletions(-)
diff --git a/src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java b/src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java
index 01642fb..d6bb07e 100644
--- a/src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java
+++ b/src/main/java/com/zy/core/task/DeviceAsyncLogPublisher.java
@@ -1,82 +1,63 @@
package com.zy.core.task;
import com.zy.asrs.entity.DeviceDataLog;
-import com.zy.common.utils.RedisUtil;
-import com.zy.core.utils.DeviceLogRedisKeyBuilder;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@Component
public class DeviceAsyncLogPublisher {
- private static final String LANE_PREFIX = "device-log-publish-";
- private static final String TASK_NAME = "publish-device-log";
- private static final long LOG_TTL_SECONDS = 60L * 60L * 24L;
- private static final long MIN_INTERVAL_MS = 0L;
+ private static final int REPLAY_SCHEMA_VERSION = 1;
- private final ConcurrentHashMap<String, AtomicReference<DeviceDataLog>> pendingByLane = new ConcurrentHashMap<>();
+ private final AtomicLong sampleSequence = new AtomicLong(System.currentTimeMillis() * 1000L);
+ private final DeviceLogFileWriter deviceLogFileWriter;
- @Autowired
- private MainProcessTaskSubmitter mainProcessTaskSubmitter;
- @Autowired
- private RedisUtil redisUtil;
+ public DeviceAsyncLogPublisher(DeviceLogFileWriter deviceLogFileWriter) {
+ this.deviceLogFileWriter = deviceLogFileWriter;
+ }
public void publishLatest(DeviceDataLog deviceDataLog) {
+ enrichReplayFields(deviceDataLog);
String laneKey = buildLaneKey(deviceDataLog);
if (laneKey == null) {
return;
}
- pendingByLane
- .computeIfAbsent(laneKey, key -> new AtomicReference<>())
- .set(deviceDataLog);
- boolean submitted = mainProcessTaskSubmitter.submitKeyedSerialTask(
- LANE_PREFIX,
- laneKey,
- TASK_NAME,
- MIN_INTERVAL_MS,
- () -> drain(laneKey)
- );
- if (!submitted) {
- log.debug("Skip duplicate device async log publish submit, laneKey={}", laneKey);
- }
- }
-
- private void drain(String laneKey) {
- AtomicReference<DeviceDataLog> pending = pendingByLane.get(laneKey);
- if (pending == null) {
- return;
- }
- while (true) {
- DeviceDataLog deviceDataLog = pending.getAndSet(null);
- if (deviceDataLog == null) {
- return;
- }
- try {
- boolean success = redisUtil.set(DeviceLogRedisKeyBuilder.build(deviceDataLog), deviceDataLog, LOG_TTL_SECONDS);
- if (!success) {
- pending.compareAndSet(null, deviceDataLog);
- log.warn("Device async log publish failed, keep latest pending snapshot, type={}, deviceNo={}, stationId={}",
- deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId());
- return;
- }
- } catch (Exception e) {
- pending.compareAndSet(null, deviceDataLog);
- log.error("Device async log publish error, type={}, deviceNo={}, stationId={}",
- deviceDataLog.getType(), deviceDataLog.getDeviceNo(), deviceDataLog.getStationId(), e);
- return;
- }
- }
+ // Writer owns lane state, buffering, flush, rotation and shutdown semantics.
+ deviceLogFileWriter.publishLatest(laneKey, deviceDataLog);
}
private String buildLaneKey(DeviceDataLog deviceDataLog) {
if (deviceDataLog == null || deviceDataLog.getType() == null || deviceDataLog.getDeviceNo() == null) {
return null;
}
+ if ("Devp".equals(deviceDataLog.getType())) {
+ if (deviceDataLog.getStationId() == null) {
+ return null;
+ }
+ return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo() + ":" + deviceDataLog.getStationId();
+ }
return deviceDataLog.getType() + ":" + deviceDataLog.getDeviceNo();
}
+
+ private void enrichReplayFields(DeviceDataLog deviceDataLog) {
+ if (deviceDataLog == null) {
+ return;
+ }
+ if (deviceDataLog.getSampleSeq() == null || deviceDataLog.getSampleSeq() <= 0) {
+ deviceDataLog.setSampleSeq(sampleSequence.incrementAndGet());
+ }
+ if (deviceDataLog.getSampleTimeMs() == null || deviceDataLog.getSampleTimeMs() <= 0) {
+ long timestamp = deviceDataLog.getCreateTime() == null
+ ? System.currentTimeMillis()
+ : deviceDataLog.getCreateTime().getTime();
+ deviceDataLog.setSampleTimeMs(timestamp);
+ }
+ if (deviceDataLog.getSchemaVersion() == null || deviceDataLog.getSchemaVersion() <= 0) {
+ deviceDataLog.setSchemaVersion(REPLAY_SCHEMA_VERSION);
+ }
+ }
+
}
--
Gitblit v1.9.1