package com.zy.core.task;
|
|
import jakarta.annotation.PreDestroy;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
/**
|
* 主流程异步任务调度器。
|
* 按执行通道串行执行任务,并对同名任务提供最小提交间隔和不重入保护。
|
*/
|
@Slf4j
|
@Component
|
public class MainProcessAsyncTaskScheduler {
|
|
private static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 5L;
|
|
private final Map<String, SerialTaskLane> laneMap = new ConcurrentHashMap<>();
|
private final Map<String, TaskGuard> taskGuardMap = new ConcurrentHashMap<>();
|
|
public boolean submit(String laneName,
|
String taskName,
|
long minIntervalMs,
|
long slowLogThresholdMs,
|
Runnable task) {
|
if (isBlank(laneName) || isBlank(taskName) || task == null) {
|
return false;
|
}
|
|
long now = System.currentTimeMillis();
|
String taskKey = buildTaskKey(laneName, taskName);
|
TaskGuard taskGuard = taskGuardMap.computeIfAbsent(taskKey, key -> new TaskGuard());
|
synchronized (taskGuard) {
|
if (taskGuard.lastSubmitTimeMs > 0L && now - taskGuard.lastSubmitTimeMs < minIntervalMs) {
|
return false;
|
}
|
if (!taskGuard.running.compareAndSet(false, true)) {
|
return false;
|
}
|
taskGuard.lastSubmitTimeMs = now;
|
}
|
|
try {
|
ensureLane(laneName).executorService.execute(() -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task));
|
return true;
|
} catch (Exception e) {
|
taskGuard.running.set(false);
|
log.error("MainProcess async task submit error, lane={}, task={}", laneName, taskName, e);
|
return false;
|
}
|
}
|
|
private void executeTask(String laneName,
|
String taskName,
|
long slowLogThresholdMs,
|
TaskGuard taskGuard,
|
Runnable task) {
|
long startMs = System.currentTimeMillis();
|
try {
|
task.run();
|
} catch (Exception e) {
|
log.error("MainProcess async task execute error, lane={}, task={}", laneName, taskName, e);
|
} finally {
|
long costMs = System.currentTimeMillis() - startMs;
|
if (slowLogThresholdMs > 0L && costMs > slowLogThresholdMs) {
|
log.warn("MainProcess async task executed slowly, lane={}, task={}, cost={}ms", laneName, taskName, costMs);
|
}
|
taskGuard.running.set(false);
|
}
|
}
|
|
private SerialTaskLane ensureLane(String laneName) {
|
SerialTaskLane lane = laneMap.get(laneName);
|
if (lane != null && !lane.executorService.isShutdown()) {
|
return lane;
|
}
|
synchronized (laneMap) {
|
lane = laneMap.get(laneName);
|
if (lane == null || lane.executorService.isShutdown()) {
|
lane = new SerialTaskLane(Executors.newSingleThreadExecutor(new NamedThreadFactory("main-process-" + laneName + "-")));
|
laneMap.put(laneName, lane);
|
}
|
return lane;
|
}
|
}
|
|
@PreDestroy
|
public void shutdown() {
|
for (SerialTaskLane lane : laneMap.values()) {
|
if (lane == null || lane.executorService == null) {
|
continue;
|
}
|
lane.executorService.shutdownNow();
|
}
|
for (Map.Entry<String, SerialTaskLane> entry : laneMap.entrySet()) {
|
SerialTaskLane lane = entry.getValue();
|
if (lane == null || lane.executorService == null) {
|
continue;
|
}
|
try {
|
if (!lane.executorService.awaitTermination(DEFAULT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
|
log.warn("MainProcess async task lane did not terminate in time, lane={}", entry.getKey());
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
return;
|
}
|
}
|
}
|
|
private String buildTaskKey(String laneName, String taskName) {
|
return laneName + "::" + taskName;
|
}
|
|
private boolean isBlank(String value) {
|
return value == null || value.trim().isEmpty();
|
}
|
|
private static class SerialTaskLane {
|
|
private final ExecutorService executorService;
|
|
private SerialTaskLane(ExecutorService executorService) {
|
this.executorService = executorService;
|
}
|
}
|
|
private static class TaskGuard {
|
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
private volatile long lastSubmitTimeMs = 0L;
|
}
|
|
private static class NamedThreadFactory implements ThreadFactory {
|
|
private final String prefix;
|
private final AtomicInteger index = new AtomicInteger(1);
|
|
private NamedThreadFactory(String prefix) {
|
this.prefix = prefix;
|
}
|
|
@Override
|
public Thread newThread(Runnable runnable) {
|
Thread thread = new Thread(runnable, prefix + index.getAndIncrement());
|
thread.setDaemon(true);
|
return thread;
|
}
|
}
|
}
|