Junjie
1 天以前 88892fc38d50297c5ac5edf3fed629236bb9fd9d
#入库同步串行请求
3个文件已修改
228 ■■■■■ 已修改文件
src/main/java/com/zy/core/plugin/GslProcess.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java 192 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/WmsOperateUtils.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/GslProcess.java
@@ -38,7 +38,7 @@
public class GslProcess implements MainProcessPluginApi, StoreInTaskPolicy {
    private static final String CRN_TASK_LANE = "crn";
    private static final String STATION_TASK_LANE = "station";
    private static final String GENERATE_STORE_TASK_LANE = "generate-store";
    private static final String GENERATE_STORE_TASK_LANE_PREFIX = "generate-store-";
    private static final long DISPATCH_INTERVAL_MS = 200L;
    private static final long MAINTENANCE_INTERVAL_MS = 500L;
    private static final long TASK_SLOW_LOG_THRESHOLD_MS = 1000L;
@@ -64,8 +64,8 @@
    public void run() {
        //检测入库站是否有任务生成,并启动入库
        checkInStationHasTask();
        //请求生成入库任务
        submitGenerateStoreTask("generateStoreWrkFile", DISPATCH_INTERVAL_MS, this::generateStoreWrkFile);
        //按站点拆分生成入库任务,避免单个站点阻塞整轮扫描
        submitGenerateStoreTasks();
        //堆垛机与输送站点都按单个任务提交到各自串行通道,逐个执行
        submitCrnTask("crnIoExecute", DISPATCH_INTERVAL_MS, crnOperateUtils::crnIoExecute);
@@ -76,14 +76,6 @@
        submitStationTask("watchCircleStation", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::watchCircleStation);
        submitStationTask("checkStationRunBlock", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationRunBlock);
        submitStationTask("checkStationIdleRecover", MAINTENANCE_INTERVAL_MS, stationOperateProcessUtils::checkStationIdleRecover);
    }
    /**
     * Requests generation of inbound (store-in) tasks.
     * Inbound stations: inbound work records are generated from barcode scans.
     * <p>
     * Delegates to {@code storeInTaskGenerationService.generate(this)}, passing this instance as
     * the policy argument. The method is {@code synchronized}, so calls on the same instance
     * do not overlap.
     */
    public synchronized void generateStoreWrkFile() {
        storeInTaskGenerationService.generate(this);
    }
    @Override
@@ -177,8 +169,23 @@
        submitProcessTask(STATION_TASK_LANE, taskName, minIntervalMs, task);
    }
    private void submitGenerateStoreTask(String taskName, long minIntervalMs, Runnable task) {
        submitProcessTask(GENERATE_STORE_TASK_LANE, taskName, minIntervalMs, task);
    private void submitGenerateStoreTasks() {
        List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
        for (BasDevp basDevp : basDevps) {
            List<StationObjModel> barcodeStations = getBarcodeStations(basDevp);
            for (StationObjModel stationObjModel : barcodeStations) {
                Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
                if (stationId == null) {
                    continue;
                }
                submitGenerateStoreTask(stationId, DISPATCH_INTERVAL_MS,
                        () -> storeInTaskGenerationService.generate(this, basDevp, stationObjModel));
            }
        }
    }
    private void submitGenerateStoreTask(Integer stationId, long minIntervalMs, Runnable task) {
        submitProcessTask(GENERATE_STORE_TASK_LANE_PREFIX + stationId, "generateStoreWrkFile", minIntervalMs, task);
    }
    private void submitCrnTask(String taskName, long minIntervalMs, Runnable task) {
src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
@@ -27,9 +27,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class StoreInTaskGenerationService {
    private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5;
    @Autowired
    private BasDevpService basDevpService;
@@ -44,81 +46,112 @@
    @Autowired
    private CommonService commonService;
    private final AtomicInteger inFlightGenerateCount = new AtomicInteger(0);
    public void generate(StoreInTaskPolicy policy) {
        try {
            if (!policy.isEnabled()) {
                return;
            }
            Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
            if (systemConfigMapObj == null) {
            HashMap<String, String> systemConfigMap = getSystemConfigMap();
            if (systemConfigMap == null) {
                return;
            }
            HashMap<String, String> systemConfigMap = (HashMap<String, String>) systemConfigMapObj;
            int conveyorStationTaskLimit = 30;
            String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit");
            if (conveyorStationTaskLimitStr != null) {
                conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr);
            }
            int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
            if (currentStationTaskCount > conveyorStationTaskLimit) {
                News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount);
            if (!hasAvailableStationTaskCapacity(systemConfigMap)) {
                return;
            }
            List<BasDevp> basDevps = basDevpService.list(new QueryWrapper<>());
            for (BasDevp basDevp : basDevps) {
                StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
                if (stationThread == null) {
                    continue;
                }
                Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
                List<StationObjModel> barcodeStations = policy.getBarcodeStations(basDevp);
                for (StationObjModel stationObjModel : barcodeStations) {
                    Integer stationId = stationObjModel.getStationId();
                    if (!stationMap.containsKey(stationId)) {
                        continue;
                    }
                    StationProtocol stationProtocol = stationMap.get(stationId);
                    if (stationProtocol == null) {
                        continue;
                    }
                    StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
                            stationProtocol);
                    if (!policy.matchCandidate(context)) {
                        continue;
                    }
                    if (!policy.beforeApply(context)) {
                        continue;
                    }
                    List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                            .eq("barcode", stationProtocol.getBarcode()));
                    if (!wrkMasts.isEmpty()) {
                        continue;
                    }
                    String generateLockKey = policy.getGenerateLockKey(context);
                    Object lock = redisUtil.get(generateLockKey);
                    if (lock != null) {
                        continue;
                    }
                    policy.onRequestPermitGranted(context);
                    InTaskApplyRequest request = policy.buildApplyRequest(context);
                    News.info("发起同步WMS入库请求,barcode={},stationId={}", request.getBarcode(),
                            request.getSourceStaNo());
                    InTaskApplyResult result = applySyncInTask(request);
                    handleApplyResult(policy, context, request, result);
                    generate(policy, basDevp, stationObjModel, systemConfigMap);
                }
            }
        } catch (Exception e) {
            News.error("生成入库任务异常,policy={}", policy.getPolicyName(), e);
        }
    }
    public void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel) {
        try {
            if (!policy.isEnabled()) {
                return;
            }
            HashMap<String, String> systemConfigMap = getSystemConfigMap();
            if (systemConfigMap == null) {
                return;
            }
            if (!hasAvailableStationTaskCapacity(systemConfigMap)) {
                return;
            }
            generate(policy, basDevp, stationObjModel, systemConfigMap);
        } catch (Exception e) {
            Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
            News.error("生成入库任务异常,policy={},stationId={}", policy.getPolicyName(), stationId, e);
        }
    }
    private void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel,
                          HashMap<String, String> systemConfigMap) {
        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
            return;
        }
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
        if (stationThread == null) {
            return;
        }
        Integer stationId = stationObjModel.getStationId();
        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
        if (!stationMap.containsKey(stationId)) {
            return;
        }
        StationProtocol stationProtocol = stationMap.get(stationId);
        if (stationProtocol == null) {
            return;
        }
        StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
                stationProtocol);
        if (!policy.matchCandidate(context)) {
            return;
        }
        if (!policy.beforeApply(context)) {
            return;
        }
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("barcode", stationProtocol.getBarcode()));
        if (!wrkMasts.isEmpty()) {
            return;
        }
        String generateLockKey = policy.getGenerateLockKey(context);
        Object lock = redisUtil.get(generateLockKey);
        if (lock != null) {
            return;
        }
        if (!tryReserveGenerateCapacity(systemConfigMap)) {
            return;
        }
        try {
            policy.onRequestPermitGranted(context);
            InTaskApplyRequest request = policy.buildApplyRequest(context);
            News.info("发起同步WMS入库请求,barcode={},stationId={},timeout={}s",
                    request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS);
            InTaskApplyResult result = applySyncInTask(request);
            handleApplyResult(policy, context, request, result);
        } finally {
            releaseGenerateCapacity();
        }
    }
@@ -199,4 +232,51 @@
        return result;
    }
    private HashMap<String, String> getSystemConfigMap() {
        Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
        if (systemConfigMapObj == null) {
            return null;
        }
        return (HashMap<String, String>) systemConfigMapObj;
    }
    private boolean hasAvailableStationTaskCapacity(HashMap<String, String> systemConfigMap) {
        int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
        int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
        if (currentStationTaskCount > conveyorStationTaskLimit) {
            News.error("输送站点任务已达到上限,上限值:{},站点任务数:{}", conveyorStationTaskLimit, currentStationTaskCount);
            return false;
        }
        return true;
    }
    private boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) {
        int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
        while (true) {
            int reservedCount = inFlightGenerateCount.get();
            int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
            if (currentStationTaskCount + reservedCount >= conveyorStationTaskLimit) {
                News.error("输送站点任务已达到上限,上限值:{},站点任务数:{},生成中任务数:{}",
                        conveyorStationTaskLimit, currentStationTaskCount, reservedCount);
                return false;
            }
            if (inFlightGenerateCount.compareAndSet(reservedCount, reservedCount + 1)) {
                return true;
            }
        }
    }
    private void releaseGenerateCapacity() {
        inFlightGenerateCount.updateAndGet(current -> current > 0 ? current - 1 : 0);
    }
    private int getConveyorStationTaskLimit(HashMap<String, String> systemConfigMap) {
        int conveyorStationTaskLimit = 30;
        String conveyorStationTaskLimitStr = systemConfigMap.get("conveyorStationTaskLimit");
        if (conveyorStationTaskLimitStr != null) {
            conveyorStationTaskLimit = Integer.parseInt(conveyorStationTaskLimitStr);
        }
        return conveyorStationTaskLimit;
    }
}
src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -39,6 +39,7 @@
@Component
public class WmsOperateUtils {
    private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5;
    @Autowired
    private ConfigService configService;
@@ -114,7 +115,7 @@
                    .setUri(wmsUrl)
                    .setPath(wmsSystemInUrl)
                    .setJson(JSON.toJSONString(requestParam))
                    .setTimeout(30, TimeUnit.SECONDS)
                    .setTimeout(APPLY_IN_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                    .build()
                    .doPost();
            if (!Cools.isEmpty(response)) {