#
Junjie
8 小时以前 7b87595a7379c7b250233e2bfcbf8b44ab4a539d
#
1个文件已添加
1个文件已修改
260 ■■■■ 已修改文件
src/main/java/com/zy/core/plugin/GslProcess.java 102 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/task/MainProcessAsyncTaskScheduler.java 158 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/GslProcess.java
@@ -22,29 +22,25 @@
import com.zy.core.plugin.store.StoreInTaskContext;
import com.zy.core.plugin.store.StoreInTaskGenerationService;
import com.zy.core.plugin.store.StoreInTaskPolicy;
import com.zy.core.task.MainProcessAsyncTaskScheduler;
import com.zy.core.thread.StationThread;
import com.zy.core.utils.CrnOperateProcessUtils;
import com.zy.core.utils.StationOperateProcessUtils;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
public class GslProcess implements MainProcessPluginApi, StoreInTaskPolicy {
    private static final long STATION_DISPATCH_INTERVAL_MS = 200L;
    private static final long STATION_MAINTENANCE_INTERVAL_MS = 500L;
    private static final long STATION_TASK_SLOW_LOG_THRESHOLD_MS = 1000L;
    private static final long STATION_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 5L;
    private static final String CRN_TASK_LANE = "crn";
    private static final String STATION_TASK_LANE = "station";
    private static final long DISPATCH_INTERVAL_MS = 200L;
    private static final long MAINTENANCE_INTERVAL_MS = 500L;
    private static final long TASK_SLOW_LOG_THRESHOLD_MS = 1000L;
    @Autowired
    private CrnOperateProcessUtils crnOperateUtils;
@@ -60,15 +56,8 @@
    private StoreInTaskGenerationService storeInTaskGenerationService;
    @Autowired
    private StationCommandDispatcher stationCommandDispatcher;
    private final ExecutorService stationTaskExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r);
        thread.setName("StationTask");
        thread.setDaemon(true);
        return thread;
    });
    private final Map<String, AtomicBoolean> stationTaskRunningMap = new ConcurrentHashMap<>();
    private final Map<String, Long> stationTaskLastSubmitTimeMap = new ConcurrentHashMap<>();
    @Autowired
    private MainProcessAsyncTaskScheduler mainProcessAsyncTaskScheduler;
    @Override
    public void run() {
@@ -77,17 +66,15 @@
        //请求生成入库任务
        generateStoreWrkFile();
        //执行堆垛机任务
        crnOperateUtils.crnIoExecute();
        //堆垛机任务执行完成
        crnOperateUtils.crnIoExecuteFinish();
        //输送站点逻辑切到后台串行执行,避免阻塞主流程里的堆垛机发任务
        submitStationTask("stationInExecute", STATION_DISPATCH_INTERVAL_MS, stationOperateProcessUtils::stationInExecute);
        submitStationTask("crnStationOutExecute", STATION_DISPATCH_INTERVAL_MS, stationOperateProcessUtils::crnStationOutExecute);
        submitStationTask("checkStationOutOrder", STATION_MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationOutOrder);
        submitStationTask("watchCircleStation", STATION_MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::watchCircleStation);
        submitStationTask("checkStationRunBlock", STATION_MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationRunBlock);
        submitStationTask("checkStationIdleRecover", STATION_MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationIdleRecover);
        //堆垛机与输送站点都按单个任务提交到各自串行通道,逐个执行
        submitCrnTask("crnIoExecute", DISPATCH_INTERVAL_MS, crnOperateUtils::crnIoExecute);
        submitCrnTask("crnIoExecuteFinish", DISPATCH_INTERVAL_MS, crnOperateUtils::crnIoExecuteFinish);
        submitStationTask("stationInExecute", DISPATCH_INTERVAL_MS, stationOperateProcessUtils::stationInExecute);
        submitStationTask("crnStationOutExecute", DISPATCH_INTERVAL_MS, stationOperateProcessUtils::crnStationOutExecute);
        submitStationTask("checkStationOutOrder", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationOutOrder);
        submitStationTask("watchCircleStation", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::watchCircleStation);
        submitStationTask("checkStationRunBlock", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationRunBlock);
        submitStationTask("checkStationIdleRecover", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationIdleRecover);
    }
    /**
@@ -186,50 +173,21 @@
    }
    private void submitStationTask(String taskName, long minIntervalMs, Runnable task) {
        long now = System.currentTimeMillis();
        Long lastSubmitTime = stationTaskLastSubmitTimeMap.get(taskName);
        if (lastSubmitTime != null && now - lastSubmitTime < minIntervalMs) {
            return;
        }
        AtomicBoolean running = stationTaskRunningMap.computeIfAbsent(taskName, key -> new AtomicBoolean(false));
        if (!running.compareAndSet(false, true)) {
            return;
        }
        stationTaskLastSubmitTimeMap.put(taskName, now);
        try {
            stationTaskExecutor.execute(() -> {
                long startMs = System.currentTimeMillis();
                try {
                    task.run();
                } catch (Exception e) {
                    log.error("GslProcess station task {} execute error", taskName, e);
                } finally {
                    long costMs = System.currentTimeMillis() - startMs;
                    if (costMs > STATION_TASK_SLOW_LOG_THRESHOLD_MS) {
                        log.warn("GslProcess station task {} executed slowly, cost={}ms", taskName, costMs);
                    }
                    running.set(false);
                }
            });
        } catch (Exception e) {
            running.set(false);
            stationTaskLastSubmitTimeMap.remove(taskName);
            log.error("GslProcess station task {} submit error", taskName, e);
        }
        submitProcessTask(STATION_TASK_LANE, taskName, minIntervalMs, task);
    }
    @PreDestroy
    public void shutDown() {
        stationTaskExecutor.shutdownNow();
        try {
            if (!stationTaskExecutor.awaitTermination(STATION_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.warn("GslProcess station task executor did not terminate within {}s", STATION_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    private void submitCrnTask(String taskName, long minIntervalMs, Runnable task) {
        submitProcessTask(CRN_TASK_LANE, taskName, minIntervalMs, task);
    }
    private void submitProcessTask(String laneName, String taskName, long minIntervalMs, Runnable task) {
        mainProcessAsyncTaskScheduler.submit(
                laneName,
                taskName,
                minIntervalMs,
                TASK_SLOW_LOG_THRESHOLD_MS,
                task
        );
    }
}
src/main/java/com/zy/core/task/MainProcessAsyncTaskScheduler.java
New file
@@ -0,0 +1,158 @@
package com.zy.core.task;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 主流程异步任务调度器。
 * 按执行通道串行执行任务,并对同名任务提供最小提交间隔和不重入保护。
 */
@Slf4j
@Component
public class MainProcessAsyncTaskScheduler {
    private static final long DEFAULT_SHUTDOWN_TIMEOUT_SECONDS = 5L;
    private final Map<String, SerialTaskLane> laneMap = new ConcurrentHashMap<>();
    private final Map<String, TaskGuard> taskGuardMap = new ConcurrentHashMap<>();
    public boolean submit(String laneName,
                          String taskName,
                          long minIntervalMs,
                          long slowLogThresholdMs,
                          Runnable task) {
        if (isBlank(laneName) || isBlank(taskName) || task == null) {
            return false;
        }
        long now = System.currentTimeMillis();
        String taskKey = buildTaskKey(laneName, taskName);
        TaskGuard taskGuard = taskGuardMap.computeIfAbsent(taskKey, key -> new TaskGuard());
        synchronized (taskGuard) {
            if (taskGuard.lastSubmitTimeMs > 0L && now - taskGuard.lastSubmitTimeMs < minIntervalMs) {
                return false;
            }
            if (!taskGuard.running.compareAndSet(false, true)) {
                return false;
            }
            taskGuard.lastSubmitTimeMs = now;
        }
        try {
            ensureLane(laneName).executorService.execute(() -> executeTask(laneName, taskName, slowLogThresholdMs, taskGuard, task));
            return true;
        } catch (Exception e) {
            taskGuard.running.set(false);
            log.error("MainProcess async task submit error, lane={}, task={}", laneName, taskName, e);
            return false;
        }
    }
    private void executeTask(String laneName,
                             String taskName,
                             long slowLogThresholdMs,
                             TaskGuard taskGuard,
                             Runnable task) {
        long startMs = System.currentTimeMillis();
        try {
            task.run();
        } catch (Exception e) {
            log.error("MainProcess async task execute error, lane={}, task={}", laneName, taskName, e);
        } finally {
            long costMs = System.currentTimeMillis() - startMs;
            if (slowLogThresholdMs > 0L && costMs > slowLogThresholdMs) {
                log.warn("MainProcess async task executed slowly, lane={}, task={}, cost={}ms", laneName, taskName, costMs);
            }
            taskGuard.running.set(false);
        }
    }
    private SerialTaskLane ensureLane(String laneName) {
        SerialTaskLane lane = laneMap.get(laneName);
        if (lane != null && !lane.executorService.isShutdown()) {
            return lane;
        }
        synchronized (laneMap) {
            lane = laneMap.get(laneName);
            if (lane == null || lane.executorService.isShutdown()) {
                lane = new SerialTaskLane(Executors.newSingleThreadExecutor(new NamedThreadFactory("main-process-" + laneName + "-")));
                laneMap.put(laneName, lane);
            }
            return lane;
        }
    }
    @PreDestroy
    public void shutdown() {
        for (SerialTaskLane lane : laneMap.values()) {
            if (lane == null || lane.executorService == null) {
                continue;
            }
            lane.executorService.shutdownNow();
        }
        for (Map.Entry<String, SerialTaskLane> entry : laneMap.entrySet()) {
            SerialTaskLane lane = entry.getValue();
            if (lane == null || lane.executorService == null) {
                continue;
            }
            try {
                if (!lane.executorService.awaitTermination(DEFAULT_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    log.warn("MainProcess async task lane did not terminate in time, lane={}", entry.getKey());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
    private String buildTaskKey(String laneName, String taskName) {
        return laneName + "::" + taskName;
    }
    private boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }
    private static class SerialTaskLane {
        private final ExecutorService executorService;
        private SerialTaskLane(ExecutorService executorService) {
            this.executorService = executorService;
        }
    }
    private static class TaskGuard {
        private final AtomicBoolean running = new AtomicBoolean(false);
        private volatile long lastSubmitTimeMs = 0L;
    }
    private static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger index = new AtomicInteger(1);
        private NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, prefix + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        }
    }
}