#
Administrator
2026-04-25 f7629d0de2d3dd8cd7f96a1f130bbc05b644c7c1
src/main/java/com/zy/core/task/MainProcessAsyncTaskScheduler.java
@@ -6,9 +6,9 @@
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -22,6 +22,8 @@
public class MainProcessAsyncTaskScheduler {
    private static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 5L;
    private static final long IDLE_LANE_TTL_MS = 60_000L;
    private static final long STALE_TASK_GUARD_TTL_MS = 60_000L;
    private final Map<String, SerialTaskLane> laneMap = new ConcurrentHashMap<>();
    private final Map<String, TaskGuard> taskGuardMap = new ConcurrentHashMap<>();
@@ -50,7 +52,10 @@
        }
        try {
            ensureLane(laneName).executorService.execute(() -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task));
            cleanupStaleState(now);
            SerialTaskLane lane = ensureLane(laneName);
            lane.touch(now);
            lane.executorService.execute(() -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task));
            return true;
        } catch (Exception e) {
            taskGuard.running.set(false);
@@ -76,6 +81,7 @@
                log.warn("MainProcess async task executed slowly, lane={}, task={}, cost={}ms, queueWaitMs={}ms", laneName, taskName, costMs, queueWaitMs);
            }
            taskGuard.running.set(false);
            cleanupTaskGuardIfIdle(buildTaskKey(laneName, taskName), taskGuard, System.currentTimeMillis());
        }
    }
@@ -87,10 +93,59 @@
        synchronized (laneMap) {
            lane = laneMap.get(laneName);
            if (lane == null || lane.executorService.isShutdown()) {
                lane = new SerialTaskLane(Executors.newSingleThreadExecutor(new NamedThreadFactory("main-process-" + laneName + "-")));
                lane = new SerialTaskLane(createLaneExecutor(laneName));
                laneMap.put(laneName, lane);
            }
            return lane;
        }
    }
    private ThreadPoolExecutor createLaneExecutor(String laneName) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1,
                1,
                IDLE_LANE_TTL_MS,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory("main-process-" + laneName + "-")
        );
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
    private void cleanupStaleState(long now) {
        cleanupStaleTaskGuards(now);
        cleanupIdleLanes(now);
    }
    private void cleanupStaleTaskGuards(long now) {
        for (Map.Entry<String, TaskGuard> entry : taskGuardMap.entrySet()) {
            cleanupTaskGuardIfIdle(entry.getKey(), entry.getValue(), now);
        }
    }
    private void cleanupTaskGuardIfIdle(String taskKey, TaskGuard taskGuard, long now) {
        if (taskGuard == null || taskGuard.running.get()) {
            return;
        }
        long idleMs = now - taskGuard.lastSubmitTimeMs;
        if (taskGuard.lastSubmitTimeMs <= 0L || idleMs < STALE_TASK_GUARD_TTL_MS) {
            return;
        }
        taskGuardMap.remove(taskKey, taskGuard);
    }
    private void cleanupIdleLanes(long now) {
        synchronized (laneMap) {
            for (Map.Entry<String, SerialTaskLane> entry : laneMap.entrySet()) {
                SerialTaskLane lane = entry.getValue();
                if (lane == null || !lane.isIdle(now)) {
                    continue;
                }
                if (laneMap.remove(entry.getKey(), lane)) {
                    lane.executorService.shutdown();
                }
            }
        }
    }
@@ -128,11 +183,29 @@
    private static class SerialTaskLane {
        private final ExecutorService executorService;
        private final ThreadPoolExecutor executorService;
        private volatile long lastTouchTimeMs = System.currentTimeMillis();
        private SerialTaskLane(ExecutorService executorService) {
        private SerialTaskLane(ThreadPoolExecutor executorService) {
            this.executorService = executorService;
        }
        private void touch(long now) {
            this.lastTouchTimeMs = now;
        }
        private boolean isIdle(long now) {
            if (executorService.isShutdown()) {
                return true;
            }
            if (executorService.getActiveCount() > 0) {
                return false;
            }
            if (!executorService.getQueue().isEmpty()) {
                return false;
            }
            return now - lastTouchTimeMs >= IDLE_LANE_TTL_MS;
        }
    }
    private static class TaskGuard {