| | |
| | | import javax.annotation.PreDestroy; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.ThreadFactory; |
| | | import java.util.concurrent.ThreadPoolExecutor; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | |
| | | public class MainProcessAsyncTaskScheduler { |
| | | |
| | | private static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 5L; |
| | | private static final long IDLE_LANE_TTL_MS = 60_000L; |
| | | private static final long STALE_TASK_GUARD_TTL_MS = 60_000L; |
| | | |
| | | private final Map<String, SerialTaskLane> laneMap = new ConcurrentHashMap<>(); |
| | | private final Map<String, TaskGuard> taskGuardMap = new ConcurrentHashMap<>(); |
| | |
| | | } |
| | | |
| | | try { |
| | | ensureLane(laneName).executorService.execute(() -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task)); |
| | | cleanupStaleState(now); |
| | | SerialTaskLane lane = ensureLane(laneName); |
| | | lane.touch(now); |
| | | lane.executorService.execute(() -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task)); |
| | | return true; |
| | | } catch (Exception e) { |
| | | taskGuard.running.set(false); |
| | |
| | | log.warn("MainProcess async task executed slowly, lane={}, task={}, cost={}ms, queueWaitMs={}ms", laneName, taskName, costMs, queueWaitMs); |
| | | } |
| | | taskGuard.running.set(false); |
| | | cleanupTaskGuardIfIdle(buildTaskKey(laneName, taskName), taskGuard, System.currentTimeMillis()); |
| | | } |
| | | } |
| | | |
| | |
| | | synchronized (laneMap) { |
| | | lane = laneMap.get(laneName); |
| | | if (lane == null || lane.executorService.isShutdown()) { |
| | | lane = new SerialTaskLane(Executors.newSingleThreadExecutor(new NamedThreadFactory("main-process-" + laneName + "-"))); |
| | | lane = new SerialTaskLane(createLaneExecutor(laneName)); |
| | | laneMap.put(laneName, lane); |
| | | } |
| | | return lane; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Builds the executor backing one lane: exactly one worker thread (core and maximum |
| | | * pool size are both 1) fed from an unbounded {@link LinkedBlockingQueue}. |
| | | * |
| | | * @param laneName lane name, embedded in the worker thread name prefix |
| | | * @return a new executor whose single core thread is allowed to time out after |
| | | * {@code IDLE_LANE_TTL_MS} milliseconds without work |
| | | */ |
| | | private ThreadPoolExecutor createLaneExecutor(String laneName) { |
| | | String threadNamePrefix = "main-process-" + laneName + "-"; |
| | | ThreadPoolExecutor laneExecutor = new ThreadPoolExecutor( |
| | | 1, 1, |
| | | IDLE_LANE_TTL_MS, TimeUnit.MILLISECONDS, |
| | | new LinkedBlockingQueue<>(), |
| | | new NamedThreadFactory(threadNamePrefix) |
| | | ); |
| | | /* Let the core thread exit when the lane has been idle for the keep-alive time. */ |
| | | laneExecutor.allowCoreThreadTimeOut(true); |
| | | return laneExecutor; |
| | | } |
| | | |
| | | /** |
| | | * Runs both cleanup passes against the same timestamp: stale task guards first, |
| | | * then idle lanes. |
| | | * |
| | | * @param now current time in milliseconds, used as the reference for both passes |
| | | */ |
| | | private void cleanupStaleState(long now) { |
| | | this.cleanupStaleTaskGuards(now); |
| | | this.cleanupIdleLanes(now); |
| | | } |
| | | |
| | | /** |
| | | * Offers every entry currently in {@code taskGuardMap} to |
| | | * {@link #cleanupTaskGuardIfIdle(String, TaskGuard, long)}. |
| | | * |
| | | * @param now current time in milliseconds, passed through to the per-guard check |
| | | */ |
| | | private void cleanupStaleTaskGuards(long now) { |
| | | taskGuardMap.forEach((taskKey, guard) -> cleanupTaskGuardIfIdle(taskKey, guard, now)); |
| | | } |
| | | |
| | | /** |
| | | * Removes a task guard from {@code taskGuardMap} when it is not running, has a |
| | | * positive last-submit timestamp, and that timestamp is at least |
| | | * {@code STALE_TASK_GUARD_TTL_MS} milliseconds before {@code now}. |
| | | * |
| | | * @param taskKey key the guard is stored under |
| | | * @param taskGuard guard to inspect; {@code null} is ignored |
| | | * @param now current time in milliseconds |
| | | */ |
| | | private void cleanupTaskGuardIfIdle(String taskKey, TaskGuard taskGuard, long now) { |
| | | if (taskGuard != null && !taskGuard.running.get()) { |
| | | long idleMs = now - taskGuard.lastSubmitTimeMs; |
| | | boolean everSubmitted = taskGuard.lastSubmitTimeMs > 0L; |
| | | if (everSubmitted && idleMs >= STALE_TASK_GUARD_TTL_MS) { |
| | | /* Two-argument remove: only drops the mapping if it still points at this guard. */ |
| | | taskGuardMap.remove(taskKey, taskGuard); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Scans {@code laneMap} while holding its monitor and retires every lane that |
| | | * reports itself idle at {@code now}. |
| | | * |
| | | * @param now current time in milliseconds, passed to {@code SerialTaskLane.isIdle} |
| | | */ |
| | | private void cleanupIdleLanes(long now) { |
| | | synchronized (laneMap) { |
| | | for (Map.Entry<String, SerialTaskLane> laneEntry : laneMap.entrySet()) { |
| | | SerialTaskLane candidate = laneEntry.getValue(); |
| | | boolean idle = candidate != null && candidate.isIdle(now); |
| | | /* Shut the executor down only if this call actually removed the mapping. */ |
| | | if (idle && laneMap.remove(laneEntry.getKey(), candidate)) { |
| | | candidate.executorService.shutdown(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | private static class SerialTaskLane { |
| | | |
| | | private final ExecutorService executorService; |
| | | private final ThreadPoolExecutor executorService; |
| | | private volatile long lastTouchTimeMs = System.currentTimeMillis(); |
| | | |
| | | private SerialTaskLane(ExecutorService executorService) { |
| | | private SerialTaskLane(ThreadPoolExecutor executorService) { |
| | | this.executorService = executorService; |
| | | } |
| | | |
| | | private void touch(long now) { |
| | | this.lastTouchTimeMs = now; |
| | | } |
| | | |
| | | private boolean isIdle(long now) { |
| | | if (executorService.isShutdown()) { |
| | | return true; |
| | | } |
| | | if (executorService.getActiveCount() > 0) { |
| | | return false; |
| | | } |
| | | if (!executorService.getQueue().isEmpty()) { |
| | | return false; |
| | | } |
| | | return now - lastTouchTimeMs >= IDLE_LANE_TTL_MS; |
| | | } |
| | | } |
| | | |
| | | private static class TaskGuard { |