From 007ee7bc1b63aa381b3a414952bbf41aeebed60e Mon Sep 17 00:00:00 2001
From: Administrator <XS@163.COM>
Date: 星期一, 27 四月 2026 16:32:19 +0800
Subject: [PATCH] #
---
src/main/java/com/zy/core/task/MainProcessAsyncTaskScheduler.java | 85 +++++++++++++++++++++++++++++++++++++++---
1 files changed, 79 insertions(+), 6 deletions(-)
diff --git a/src/main/java/com/zy/core/task/MainProcessAsyncTaskScheduler.java b/src/main/java/com/zy/core/task/MainProcessAsyncTaskScheduler.java
index a210337..a55166b 100644
--- a/src/main/java/com/zy/core/task/MainProcessAsyncTaskScheduler.java
+++ b/src/main/java/com/zy/core/task/MainProcessAsyncTaskScheduler.java
@@ -6,9 +6,9 @@
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -22,6 +22,8 @@
public class MainProcessAsyncTaskScheduler {
private static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 5L;
+ private static final long IDLE_LANE_TTL_MS = 60_000L;
+ private static final long STALE_TASK_GUARD_TTL_MS = 60_000L;
private final Map<String, SerialTaskLane> laneMap = new ConcurrentHashMap<>();
private final Map<String, TaskGuard> taskGuardMap = new ConcurrentHashMap<>();
@@ -50,7 +52,10 @@
}
try {
- ensureLane(laneName).executorService.execute(() -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task));
+ cleanupStaleState(now);
+ SerialTaskLane lane = ensureLane(laneName);
+ lane.touch(now);
+ lane.executorService.execute(() -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task));
return true;
} catch (Exception e) {
taskGuard.running.set(false);
@@ -76,6 +81,7 @@
log.warn("MainProcess async task executed slowly, lane={}, task={}, cost={}ms, queueWaitMs={}ms", laneName, taskName, costMs, queueWaitMs);
}
taskGuard.running.set(false);
+ cleanupTaskGuardIfIdle(buildTaskKey(laneName, taskName), taskGuard, System.currentTimeMillis());
}
}
@@ -87,10 +93,59 @@
synchronized (laneMap) {
lane = laneMap.get(laneName);
if (lane == null || lane.executorService.isShutdown()) {
- lane = new SerialTaskLane(Executors.newSingleThreadExecutor(new NamedThreadFactory("main-process-" + laneName + "-")));
+ lane = new SerialTaskLane(createLaneExecutor(laneName));
laneMap.put(laneName, lane);
}
return lane;
+ }
+ }
+
+ private ThreadPoolExecutor createLaneExecutor(String laneName) {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ 1,
+ 1,
+ IDLE_LANE_TTL_MS,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("main-process-" + laneName + "-")
+ );
+ executor.allowCoreThreadTimeOut(true);
+ return executor;
+ }
+
+ private void cleanupStaleState(long now) {
+ cleanupStaleTaskGuards(now);
+ cleanupIdleLanes(now);
+ }
+
+ private void cleanupStaleTaskGuards(long now) {
+ for (Map.Entry<String, TaskGuard> entry : taskGuardMap.entrySet()) {
+ cleanupTaskGuardIfIdle(entry.getKey(), entry.getValue(), now);
+ }
+ }
+
+ private void cleanupTaskGuardIfIdle(String taskKey, TaskGuard taskGuard, long now) {
+ if (taskGuard == null || taskGuard.running.get()) {
+ return;
+ }
+ long idleMs = now - taskGuard.lastSubmitTimeMs;
+ if (taskGuard.lastSubmitTimeMs <= 0L || idleMs < STALE_TASK_GUARD_TTL_MS) {
+ return;
+ }
+ taskGuardMap.remove(taskKey, taskGuard);
+ }
+
+ private void cleanupIdleLanes(long now) {
+ synchronized (laneMap) {
+ for (Map.Entry<String, SerialTaskLane> entry : laneMap.entrySet()) {
+ SerialTaskLane lane = entry.getValue();
+ if (lane == null || !lane.isIdle(now)) {
+ continue;
+ }
+ if (laneMap.remove(entry.getKey(), lane)) {
+ lane.executorService.shutdown();
+ }
+ }
}
}
@@ -128,11 +183,29 @@
private static class SerialTaskLane {
- private final ExecutorService executorService;
+ private final ThreadPoolExecutor executorService;
+ private volatile long lastTouchTimeMs = System.currentTimeMillis();
- private SerialTaskLane(ExecutorService executorService) {
+ private SerialTaskLane(ThreadPoolExecutor executorService) {
this.executorService = executorService;
}
+
+ private void touch(long now) {
+ this.lastTouchTimeMs = now;
+ }
+
+ private boolean isIdle(long now) {
+ if (executorService.isShutdown()) {
+ return true;
+ }
+ if (executorService.getActiveCount() > 0) {
+ return false;
+ }
+ if (!executorService.getQueue().isEmpty()) {
+ return false;
+ }
+ return now - lastTouchTimeMs >= IDLE_LANE_TTL_MS;
+ }
}
private static class TaskGuard {
--
Gitblit v1.9.1