Junjie
22 小时以前 c97c04770c17c36c554963bf8bb8d8fafc6a8d43
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package com.zy.core.thread.support;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
 
@Slf4j
@Component
public class StationTaskLocationRegistry {
 
    private static final long DEFAULT_STALE_THRESHOLD_MS = 2_000L;
    private static final long LOOKUP_WARN_INTERVAL_MS = 5_000L;
 
    private final ConcurrentHashMap<Integer, TaskLocationSnapshot> loadingTaskLocationMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, Long> lookupWarnAtMap = new ConcurrentHashMap<>();
 
    public void update(Integer taskNo,
                       Integer deviceNo,
                       Integer stationId,
                       boolean loading,
                       boolean runBlock,
                       long updateTime) {
        if (taskNo == null || taskNo <= 0) {
            return;
        }
        if (!loading || deviceNo == null || stationId == null) {
            remove(taskNo, null, null);
            return;
        }
        loadingTaskLocationMap.put(taskNo, new TaskLocationSnapshot(taskNo, deviceNo, stationId, true, runBlock, updateTime));
    }
 
    public void remove(Integer taskNo, Integer deviceNo, Integer stationId) {
        if (taskNo == null || taskNo <= 0) {
            return;
        }
        loadingTaskLocationMap.computeIfPresent(taskNo, (key, snapshot) -> {
            if (snapshot == null) {
                return null;
            }
            if (deviceNo != null && !Objects.equals(deviceNo, snapshot.getDeviceNo())) {
                return snapshot;
            }
            if (stationId != null && !Objects.equals(stationId, snapshot.getStationId())) {
                return snapshot;
            }
            return null;
        });
    }
 
    public TaskLocationSnapshot findActive(Integer taskNo) {
        return findActive(taskNo, DEFAULT_STALE_THRESHOLD_MS);
    }
 
    public TaskLocationSnapshot findActive(Integer taskNo, long staleThresholdMs) {
        if (taskNo == null || taskNo <= 0) {
            return null;
        }
        TaskLocationSnapshot snapshot = loadingTaskLocationMap.get(taskNo);
        if (snapshot == null) {
            warnLookup(taskNo, "miss", null);
            return null;
        }
        long ageMs = Math.max(0L, System.currentTimeMillis() - snapshot.getUpdateTime());
        if (ageMs > staleThresholdMs) {
            warnLookup(taskNo, "stale", ageMs);
            return null;
        }
        return snapshot;
    }
 
    public void cleanupByDevice(Integer deviceNo, Iterable<Integer> activeTaskNoList) {
        if (deviceNo == null) {
            return;
        }
        ConcurrentHashMap<Integer, Boolean> activeMap = new ConcurrentHashMap<>();
        if (activeTaskNoList != null) {
            for (Integer taskNo : activeTaskNoList) {
                if (taskNo != null && taskNo > 0) {
                    activeMap.put(taskNo, Boolean.TRUE);
                }
            }
        }
        for (Map.Entry<Integer, TaskLocationSnapshot> entry : loadingTaskLocationMap.entrySet()) {
            TaskLocationSnapshot snapshot = entry.getValue();
            if (snapshot == null || !Objects.equals(deviceNo, snapshot.getDeviceNo())) {
                continue;
            }
            if (!activeMap.containsKey(entry.getKey())) {
                loadingTaskLocationMap.remove(entry.getKey(), snapshot);
            }
        }
    }
 
    private void warnLookup(Integer taskNo, String reason, Long ageMs) {
        long now = System.currentTimeMillis();
        Long lastWarnAt = lookupWarnAtMap.get(taskNo);
        if (lastWarnAt != null && now - lastWarnAt < LOOKUP_WARN_INTERVAL_MS) {
            return;
        }
        lookupWarnAtMap.put(taskNo, now);
        if (ageMs == null) {
            log.warn("task-location-registry miss, taskNo={}", taskNo);
            return;
        }
        log.warn("task-location-registry stale, taskNo={}, ageMs={}", taskNo, ageMs);
    }
 
    public static class TaskLocationSnapshot {
        private final Integer taskNo;
        private final Integer deviceNo;
        private final Integer stationId;
        private final boolean loading;
        private final boolean runBlock;
        private final long updateTime;
 
        public TaskLocationSnapshot(Integer taskNo,
                                    Integer deviceNo,
                                    Integer stationId,
                                    boolean loading,
                                    boolean runBlock,
                                    long updateTime) {
            this.taskNo = taskNo;
            this.deviceNo = deviceNo;
            this.stationId = stationId;
            this.loading = loading;
            this.runBlock = runBlock;
            this.updateTime = updateTime;
        }
 
        public Integer getTaskNo() {
            return taskNo;
        }
 
        public Integer getDeviceNo() {
            return deviceNo;
        }
 
        public Integer getStationId() {
            return stationId;
        }
 
        public boolean isLoading() {
            return loading;
        }
 
        public boolean isRunBlock() {
            return runBlock;
        }
 
        public long getUpdateTime() {
            return updateTime;
        }
    }
}