package com.zy.core.task;

import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Asynchronous task scheduler for the main process.
 *
 * <p>Tasks are executed serially per execution lane (each lane owns one single-thread executor).
 * For tasks sharing the same lane and task name, the scheduler enforces a minimum interval
 * between accepted submissions and refuses to accept a new submission while a previous one is
 * still queued or running (no re-entry).
 */
@Slf4j
@Component
public class MainProcessAsyncTaskScheduler {

    private static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 5L;

    /** Lane name -> lane holding that lane's single-thread executor. */
    private final Map<String, SerialTaskLane> laneMap = new ConcurrentHashMap<>();

    /** Task key ({@code lane::task}) -> throttle / re-entry guard for that task. */
    private final Map<String, TaskGuard> taskGuardMap = new ConcurrentHashMap<>();

    /**
     * Tries to enqueue {@code task} on the given lane.
     *
     * @param laneName           lane whose executor runs the task; blank values are rejected
     * @param taskName           logical task name, used together with the lane as the guard key;
     *                           blank values are rejected
     * @param minIntervalMs      minimum time in milliseconds between two accepted submissions of
     *                           the same task; values {@code <= 0} disable the interval check
     * @param slowLogThresholdMs a warning is logged when the task runs longer than this many
     *                           milliseconds; values {@code <= 0} disable the warning
     * @param task               the work to run; {@code null} is rejected
     * @return {@code true} if the task was handed to the lane executor; {@code false} if an
     *         argument was invalid, the minimum interval has not elapsed, a previous submission
     *         of the same task is still queued or running, or the executor refused the task
     */
    public boolean submit(String laneName, String taskName, long minIntervalMs, long slowLogThresholdMs, Runnable task) {
        if (isBlank(laneName) || isBlank(taskName) || task == null) {
            return false;
        }
        String taskKey = buildTaskKey(laneName, taskName);
        TaskGuard taskGuard = taskGuardMap.computeIfAbsent(taskKey, key -> new TaskGuard());
        synchronized (taskGuard) {
            // Monotonic clock: the interval check must not be affected by wall-clock adjustments.
            long nowNanos = System.nanoTime();
            if (taskGuard.submittedBefore
                    && nowNanos - taskGuard.lastSubmitNanos < TimeUnit.MILLISECONDS.toNanos(minIntervalMs)) {
                return false;
            }
            if (!taskGuard.running.compareAndSet(false, true)) {
                return false;
            }
            taskGuard.submittedBefore = true;
            taskGuard.lastSubmitNanos = nowNanos;
        }
        try {
            Runnable body = () -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task);
            // Wrapped so that shutdown() can find and release the guard of tasks that never start.
            ensureLane(laneName).executorService.execute(new GuardedTask(taskGuard, body));
            return true;
        } catch (Exception e) {
            // The task will never run, so its finally-block cannot release the guard: do it here.
            taskGuard.running.set(false);
            log.error("MainProcess async task submit error, lane={}, task={}", laneName, taskName, e);
            return false;
        }
    }

    /**
     * Runs the task on the lane thread, logs failures and slow executions, and always releases
     * the re-entry guard afterwards.
     */
    private void executeTask(String laneName, String taskName, long slowLogThresholdMs, TaskGuard taskGuard, Runnable task) {
        long startNanos = System.nanoTime();
        try {
            task.run();
        } catch (Exception e) {
            log.error("MainProcess async task execute error, lane={}, task={}", laneName, taskName, e);
        } finally {
            long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            if (slowLogThresholdMs > 0L && costMs > slowLogThresholdMs) {
                log.warn("MainProcess async task executed slowly, lane={}, task={}, cost={}ms", laneName, taskName, costMs);
            }
            taskGuard.running.set(false);
        }
    }

    /**
     * Returns the lane for {@code laneName}, creating it on first use and replacing it when its
     * executor has already been shut down.
     */
    private SerialTaskLane ensureLane(String laneName) {
        SerialTaskLane lane = laneMap.get(laneName);
        if (lane != null && !lane.executorService.isShutdown()) {
            return lane;
        }
        synchronized (laneMap) {
            // Re-check under the lock so that concurrent callers create at most one executor.
            lane = laneMap.get(laneName);
            if (lane == null || lane.executorService.isShutdown()) {
                lane = new SerialTaskLane(
                        Executors.newSingleThreadExecutor(new NamedThreadFactory("main-process-" + laneName + "-")));
                laneMap.put(laneName, lane);
            }
            return lane;
        }
    }

    /**
     * Stops all lane executors and waits up to {@value #DEFAULT_SHUTDOWN_TIMEOUT_SECONDS} seconds
     * per lane for the running task to finish.
     *
     * <p>Queued tasks that never started are discarded; their guards are released so the same
     * task can be submitted again if a lane is recreated later.
     */
    @PreDestroy
    public void shutdown() {
        for (SerialTaskLane lane : laneMap.values()) {
            for (Runnable dropped : lane.executorService.shutdownNow()) {
                // Dropped tasks never reach executeTask's finally-block; without this their guard
                // would stay "running" forever and block every later submit of that task.
                if (dropped instanceof GuardedTask) {
                    ((GuardedTask) dropped).taskGuard.running.set(false);
                }
            }
        }
        for (Map.Entry<String, SerialTaskLane> entry : laneMap.entrySet()) {
            try {
                if (!entry.getValue().executorService.awaitTermination(DEFAULT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    log.warn("MainProcess async task lane did not terminate in time, lane={}", entry.getKey());
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status and stop waiting for the remaining lanes.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Builds the guard-map key for a task within a lane. */
    private String buildTaskKey(String laneName, String taskName) {
        return laneName + "::" + taskName;
    }

    /** Returns {@code true} when {@code value} is {@code null} or empty after {@code trim()}. */
    private boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /** Holder for the executor that runs one lane's tasks. */
    private static class SerialTaskLane {

        private final ExecutorService executorService;

        private SerialTaskLane(ExecutorService executorService) {
            this.executorService = executorService;
        }
    }

    /** Per-task state used for throttling and re-entry protection. */
    private static class TaskGuard {

        /** {@code true} from an accepted submission until the task finishes or is discarded. */
        private final AtomicBoolean running = new AtomicBoolean(false);

        /** Whether any submission has been accepted yet; accessed only while holding this guard's monitor. */
        private boolean submittedBefore = false;

        /** {@link System#nanoTime()} of the last accepted submission; accessed only while holding this guard's monitor. */
        private long lastSubmitNanos = 0L;
    }

    /** Queued unit of work that remembers its guard so a discarded task can still release it. */
    private static final class GuardedTask implements Runnable {

        private final TaskGuard taskGuard;
        private final Runnable body;

        private GuardedTask(TaskGuard taskGuard, Runnable body) {
            this.taskGuard = taskGuard;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }
    }

    /** Creates daemon threads named {@code <prefix><n>}, with {@code n} starting at 1. */
    private static class NamedThreadFactory implements ThreadFactory {

        private final String prefix;
        private final AtomicInteger index = new AtomicInteger(1);

        private NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, prefix + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    }
}