package com.zy.core.thread.support;

import com.zy.common.utils.RedisUtil;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

| | | public class RecentStationArrivalTracker { |
| | | |
| | | static final long DEFAULT_RETAIN_MS = 10_000L; |
| | | static final String KEY_PREFIX = "station_recent_arrival_"; |
| | | |
| | | private final long retainMs; |
| | | private final LongSupplier nowSupplier; |
| | | private final Map<Long, Long> arrivalTimeMap = new ConcurrentHashMap<>(); |
| | | private final AtomicLong lastCleanupAt = new AtomicLong(0L); |
| | | private final RedisUtil redisUtil; |
| | | private final long retainSeconds; |
| | | private final Map<Integer, Integer> lastLoadedTaskByStation = new ConcurrentHashMap<>(); |
| | | |
| | | public RecentStationArrivalTracker() { |
| | | this(DEFAULT_RETAIN_MS, System::currentTimeMillis); |
| | | public RecentStationArrivalTracker(RedisUtil redisUtil) { |
| | | this(redisUtil, DEFAULT_RETAIN_MS); |
| | | } |
| | | |
| | | public RecentStationArrivalTracker(long retainMs, LongSupplier nowSupplier) { |
| | | this.retainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs; |
| | | this.nowSupplier = nowSupplier == null ? System::currentTimeMillis : nowSupplier; |
| | | public RecentStationArrivalTracker(RedisUtil redisUtil, long retainMs) { |
| | | this.redisUtil = redisUtil; |
| | | long effectiveRetainMs = retainMs <= 0L ? DEFAULT_RETAIN_MS : retainMs; |
| | | this.retainSeconds = Math.max(1L, (long) Math.ceil(effectiveRetainMs / 1000d)); |
| | | } |
| | | |
| | | public void observe(Integer stationId, Integer taskNo, boolean loading) { |
| | | if (!loading) { |
| | | if (stationId == null || stationId <= 0) { |
| | | return; |
| | | } |
| | | record(stationId, taskNo); |
| | | } |
| | | |
| | | public void record(Integer stationId, Integer taskNo) { |
| | | Long key = buildKey(stationId, taskNo); |
| | | if (key == null) { |
| | | if (!loading || taskNo == null || taskNo <= 0) { |
| | | lastLoadedTaskByStation.remove(stationId); |
| | | return; |
| | | } |
| | | long now = nowSupplier.getAsLong(); |
| | | arrivalTimeMap.put(key, now); |
| | | cleanupIfNeeded(now); |
| | | Integer previousTaskNo = lastLoadedTaskByStation.put(stationId, taskNo); |
| | | if (Objects.equals(previousTaskNo, taskNo) || redisUtil == null) { |
| | | return; |
| | | } |
| | | redisUtil.set(buildKey(stationId, taskNo), "1", retainSeconds); |
| | | } |
| | | |
| | | public boolean hasRecentArrival(Integer stationId, Integer taskNo) { |
| | | Long key = buildKey(stationId, taskNo); |
| | | if (key == null) { |
| | | if (redisUtil == null) { |
| | | return false; |
| | | } |
| | | long now = nowSupplier.getAsLong(); |
| | | Long arrivalAt = arrivalTimeMap.get(key); |
| | | if (arrivalAt == null) { |
| | | cleanupIfNeeded(now); |
| | | return false; |
| | | } |
| | | if (now - arrivalAt > retainMs) { |
| | | arrivalTimeMap.remove(key, arrivalAt); |
| | | cleanupIfNeeded(now); |
| | | return false; |
| | | } |
| | | cleanupIfNeeded(now); |
| | | return true; |
| | | String key = buildKey(stationId, taskNo); |
| | | return key != null && redisUtil.get(key) != null; |
| | | } |
| | | |
| | | private void cleanupIfNeeded(long now) { |
| | | long lastCleanup = lastCleanupAt.get(); |
| | | if (now - lastCleanup < retainMs || !lastCleanupAt.compareAndSet(lastCleanup, now)) { |
| | | return; |
| | | } |
| | | for (Map.Entry<Long, Long> entry : arrivalTimeMap.entrySet()) { |
| | | if (entry == null || now - entry.getValue() <= retainMs) { |
| | | continue; |
| | | } |
| | | arrivalTimeMap.remove(entry.getKey(), entry.getValue()); |
| | | } |
| | | } |
| | | |
| | | private Long buildKey(Integer stationId, Integer taskNo) { |
| | | private String buildKey(Integer stationId, Integer taskNo) { |
| | | if (stationId == null || stationId <= 0 || taskNo == null || taskNo <= 0) { |
| | | return null; |
| | | } |
| | | return (((long) stationId) << 32) | (taskNo.longValue() & 0xffffffffL); |
| | | return KEY_PREFIX + stationId + "_" + taskNo; |
| | | } |
| | | } |