package com.zy.core.plugin; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.core.common.Cools; import com.zy.asrs.entity.BasDevp; import com.zy.asrs.service.BasDevpService; import com.zy.common.service.CommonService; import com.zy.common.utils.RedisUtil; import com.zy.core.News; import com.zy.core.cache.SlaveConnection; import com.zy.core.dispatch.StationCommandDispatcher; import com.zy.core.enums.RedisKeyType; import com.zy.core.enums.SlaveType; import com.zy.core.enums.StationCommandType; import com.zy.core.enums.WrkIoType; import com.zy.core.model.StationObjModel; import com.zy.core.model.command.StationCommand; import com.zy.core.model.protocol.StationProtocol; import com.zy.core.plugin.api.MainProcessPluginApi; import com.zy.core.plugin.store.InTaskApplyRequest; import com.zy.core.plugin.store.StoreInTaskContext; import com.zy.core.plugin.store.StoreInTaskGenerationService; import com.zy.core.plugin.store.StoreInTaskPolicy; import com.zy.core.thread.StationThread; import com.zy.core.utils.CrnOperateProcessUtils; import com.zy.core.utils.StationOperateProcessUtils; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @Component public class GslProcess implements MainProcessPluginApi, StoreInTaskPolicy { private static final long STATION_DISPATCH_INTERVAL_MS = 200L; private static final long STATION_MAINTENANCE_INTERVAL_MS = 500L; private static final long STATION_TASK_SLOW_LOG_THRESHOLD_MS = 1000L; private static final long STATION_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 5L; @Autowired private CrnOperateProcessUtils crnOperateUtils; @Autowired private StationOperateProcessUtils stationOperateProcessUtils; @Autowired private CommonService commonService; @Autowired private BasDevpService basDevpService; @Autowired private RedisUtil redisUtil; @Autowired private StoreInTaskGenerationService storeInTaskGenerationService; @Autowired private StationCommandDispatcher stationCommandDispatcher; private final ExecutorService stationTaskExecutor = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r); thread.setName("StationTask"); thread.setDaemon(true); return thread; }); private final Map stationTaskRunningMap = new ConcurrentHashMap<>(); private final Map stationTaskLastSubmitTimeMap = new ConcurrentHashMap<>(); @Override public void run() { //检测入库站是否有任务生成,并启动入库 checkInStationHasTask(); //请求生成入库任务 generateStoreWrkFile(); //执行堆垛机任务 crnOperateUtils.crnIoExecute(); //堆垛机任务执行完成 crnOperateUtils.crnIoExecuteFinish(); //输送站点逻辑切到后台串行执行,避免阻塞主流程里的堆垛机发任务 submitStationTask("stationInExecute", STATION_DISPATCH_INTERVAL_MS, stationOperateProcessUtils::stationInExecute); submitStationTask("crnStationOutExecute", STATION_DISPATCH_INTERVAL_MS, stationOperateProcessUtils::crnStationOutExecute); submitStationTask("checkStationOutOrder", STATION_MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationOutOrder); submitStationTask("watchCircleStation", STATION_MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::watchCircleStation); submitStationTask("checkStationRunBlock", STATION_MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationRunBlock); submitStationTask("checkStationIdleRecover", STATION_MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationIdleRecover); } /** * 请求生成入库任务 * 入库站,根据条码扫描生成入库工作档 */ public synchronized void generateStoreWrkFile() { storeInTaskGenerationService.generate(this); } @Override public boolean matchCandidate(StoreInTaskContext context) { StationProtocol stationProtocol = context.getStationProtocol(); return stationProtocol.isAutoing() && stationProtocol.isLoading() && stationProtocol.isInEnable() && stationProtocol.getTaskNo() > 0 && !Cools.isEmpty(stationProtocol.getBarcode()); } @Override public boolean beforeApply(StoreInTaskContext context) { StationProtocol stationProtocol = context.getStationProtocol(); if (stationProtocol.getError() <= 0) { return true; } Object lock = redisUtil.get(RedisKeyType.GENERATE_STATION_BACK_LIMIT.key + stationProtocol.getStationId()); if (lock != null) { return false; } StationObjModel backStation = context.getStationObjModel().getBackStation(); StationCommand command = context.getStationThread().getCommand(StationCommandType.MOVE, commonService.getWorkNo(WrkIoType.STATION_BACK.id), context.getStationObjModel().getStationId(), backStation.getStationId(), 0); if (command == null) { News.taskInfo(stationProtocol.getTaskNo(), "{}工作,获取输送线命令失败", stationProtocol.getTaskNo()); return false; } stationCommandDispatcher.dispatch(context.getBasDevp().getDevpNo(), command, "gsl-process", "station-back"); News.taskInfo(stationProtocol.getTaskNo(), "{}扫码异常,已退回至{}", backStation.getStationId()); redisUtil.set(RedisKeyType.GENERATE_STATION_BACK_LIMIT.key + stationProtocol.getStationId(), "lock", 10); return true; } @Override public InTaskApplyRequest buildApplyRequest(StoreInTaskContext context) { InTaskApplyRequest request = StoreInTaskPolicy.super.buildApplyRequest(context); request.getExtraParams().put("weight", context.getStationProtocol().getWeight()); return request; } //检测入库站是否有任务生成,并启动入库 private synchronized void checkInStationHasTask() { List basDevps = basDevpService.list(new QueryWrapper<>()); for (BasDevp basDevp : basDevps) { StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo()); if(stationThread == null){ continue; } Map stationMap = stationThread.getStatusMap(); List list = basDevp.getInStationList$(); for (StationObjModel entity : list) { Integer stationId = entity.getStationId(); if(!stationMap.containsKey(stationId)){ continue; } StationProtocol stationProtocol = stationMap.get(stationId); if (stationProtocol == null) { continue; } Object lock = redisUtil.get(RedisKeyType.GENERATE_ENABLE_IN_STATION_DATA_LIMIT.key + stationId); if(lock != null){ continue; } //满足自动、无物、工作号0,生成入库数据 if (stationProtocol.isAutoing() && stationProtocol.isLoading() && stationProtocol.getTaskNo() == 0 && stationProtocol.isEnableIn() ) { StationCommand command = stationThread.getCommand(StationCommandType.MOVE, commonService.getWorkNo(WrkIoType.ENABLE_IN.id), stationId, entity.getBarcodeStation().getStationId(), 0); stationCommandDispatcher.dispatch(basDevp.getDevpNo(), command, "gsl-process", "enable-in"); redisUtil.set(RedisKeyType.GENERATE_ENABLE_IN_STATION_DATA_LIMIT.key + stationId, "lock", 15); News.info("{}站点启动入库成功,数据包:{}", stationId, JSON.toJSONString(command)); } } } } private void submitStationTask(String taskName, long minIntervalMs, Runnable task) { long now = System.currentTimeMillis(); Long lastSubmitTime = stationTaskLastSubmitTimeMap.get(taskName); if (lastSubmitTime != null && now - lastSubmitTime < minIntervalMs) { return; } AtomicBoolean running = stationTaskRunningMap.computeIfAbsent(taskName, key -> new AtomicBoolean(false)); if (!running.compareAndSet(false, true)) { return; } stationTaskLastSubmitTimeMap.put(taskName, now); try { stationTaskExecutor.execute(() -> { long startMs = System.currentTimeMillis(); try { task.run(); } catch (Exception e) { log.error("GslProcess station task {} execute error", taskName, e); } finally { long costMs = System.currentTimeMillis() - startMs; if (costMs > STATION_TASK_SLOW_LOG_THRESHOLD_MS) { log.warn("GslProcess station task {} executed slowly, cost={}ms", taskName, costMs); } running.set(false); } }); } catch (Exception e) { running.set(false); stationTaskLastSubmitTimeMap.remove(taskName); log.error("GslProcess station task {} submit error", taskName, e); } } @PreDestroy public void shutDown() { stationTaskExecutor.shutdownNow(); try { if (!stationTaskExecutor.awaitTermination(STATION_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.warn("GslProcess station task executor did not terminate within {}s", STATION_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }