lsh
2026-04-21 720e0926fa1c94b952c26e111206c5d6e1ed5ba2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.zy.core.task;
 
import com.zy.asrs.entity.BasStationOpt;
import com.zy.asrs.service.BasStationOptService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.concurrent.ArrayBlockingQueue;
 
@Slf4j
@Component
public class BasStationOptAsyncPublisher {
 
    private static final String LANE_NAME = "bas-station-opt-save";
    private static final String TASK_NAME = "publish-bas-station-opt";
    private static final long MIN_INTERVAL_MS = 0L;
    private static final int DEFAULT_QUEUE_CAPACITY = 2048;
 
    private final ArrayBlockingQueue<BasStationOpt> pendingQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
 
    @Autowired
    private MainProcessTaskSubmitter mainProcessTaskSubmitter;
    @Autowired
    private BasStationOptService basStationOptService;
 
    public boolean publish(BasStationOpt basStationOpt) {
        if (basStationOpt == null) {
            return true;
        }
        if (!pendingQueue.offer(basStationOpt)) {
            log.error("BasStationOpt async publish queue full, fallback to sync save, taskNo={}, stationId={}, targetStationId={}",
                    basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId());
            return false;
        }
        mainProcessTaskSubmitter.submitSerialTask(LANE_NAME, TASK_NAME, MIN_INTERVAL_MS, this::drain);
        return true;
    }
 
    private void drain() {
        while (true) {
            BasStationOpt basStationOpt = pendingQueue.poll();
            if (basStationOpt == null) {
                return;
            }
            try {
                basStationOptService.save(basStationOpt);
            } catch (Exception e) {
                log.error("BasStationOpt async publish error, fallback to sync save next time, taskNo={}, stationId={}, targetStationId={}",
                        basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), e);
                try {
                    basStationOptService.save(basStationOpt);
                } catch (Exception ex) {
                    log.error("BasStationOpt sync fallback save error, taskNo={}, stationId={}, targetStationId={}",
                            basStationOpt.getTaskNo(), basStationOpt.getStationId(), basStationOpt.getTargetStationId(), ex);
                }
            }
        }
    }
}