Junjie
11 小时以前 61a792a8ba5532132bef32721f055c619057ab28
src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
@@ -27,7 +27,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class StoreInTaskGenerationService {
@@ -46,7 +45,10 @@
    @Autowired
    private CommonService commonService;
    private final AtomicInteger inFlightGenerateCount = new AtomicInteger(0);
    /**
     * Number of generation attempts that have reserved capacity and not yet released it.
     * Keeps the existing per-station-lane concurrency while using a simple counter so that
     * concurrent generation cannot push the station task count past its configured limit.
     * In the visible code it is read and updated inside the synchronized
     * tryReserveGenerateCapacity / releaseGenerateCapacity methods.
     */
    private int inFlightGenerateCount = 0;
    public void generate(StoreInTaskPolicy policy) {
        try {
@@ -66,7 +68,7 @@
            for (BasDevp basDevp : basDevps) {
                List<StationObjModel> barcodeStations = policy.getBarcodeStations(basDevp);
                for (StationObjModel stationObjModel : barcodeStations) {
                    generate(policy, basDevp, stationObjModel, systemConfigMap);
                    generateByStation(policy, basDevp, stationObjModel, systemConfigMap);
                }
            }
        } catch (Exception e) {
@@ -88,37 +90,19 @@
                return;
            }
            generate(policy, basDevp, stationObjModel, systemConfigMap);
            generateByStation(policy, basDevp, stationObjModel, systemConfigMap);
        } catch (Exception e) {
            Integer stationId = stationObjModel == null ? null : stationObjModel.getStationId();
            News.error("生成入库任务异常,policy={},stationId={}", policy.getPolicyName(), stationId, e);
        }
    }
    private void generate(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel,
                          HashMap<String, String> systemConfigMap) {
        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
    private void generateByStation(StoreInTaskPolicy policy, BasDevp basDevp, StationObjModel stationObjModel,
                                   HashMap<String, String> systemConfigMap) {
        StoreInTaskContext context = buildContext(basDevp, stationObjModel);
        if (context == null) {
            return;
        }
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
        if (stationThread == null) {
            return;
        }
        Integer stationId = stationObjModel.getStationId();
        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
        if (!stationMap.containsKey(stationId)) {
            return;
        }
        StationProtocol stationProtocol = stationMap.get(stationId);
        if (stationProtocol == null) {
            return;
        }
        StoreInTaskContext context = new StoreInTaskContext(basDevp, stationThread, stationObjModel,
                stationProtocol);
        if (!policy.matchCandidate(context)) {
            return;
        }
@@ -126,15 +110,11 @@
            return;
        }
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>()
                .eq("barcode", stationProtocol.getBarcode()));
        if (!wrkMasts.isEmpty()) {
        if (hasCreatedTask(context.getStationProtocol().getBarcode())) {
            return;
        }
        String generateLockKey = policy.getGenerateLockKey(context);
        Object lock = redisUtil.get(generateLockKey);
        if (lock != null) {
        if (redisUtil.get(policy.getGenerateLockKey(context)) != null) {
            return;
        }
@@ -142,58 +122,65 @@
            return;
        }
        InTaskApplyRequest request = policy.buildApplyRequest(context);
        try {
            policy.onRequestPermitGranted(context);
            InTaskApplyRequest request = policy.buildApplyRequest(context);
            policy.setSystemWarning(context, "请求WMS中");
            News.info("发起同步WMS入库请求,barcode={},stationId={},timeout={}s",
                    request.getBarcode(), request.getSourceStaNo(), APPLY_IN_TASK_TIMEOUT_SECONDS);
            InTaskApplyResult result = applySyncInTask(request);
            handleApplyResult(policy, context, request, result);
            String response = wmsOperateUtils.applyInTask(request);
            handleSyncApplyResponse(policy, context, request, response);
        } finally {
            releaseGenerateCapacity();
        }
    }
    private void handleApplyResult(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                   InTaskApplyResult result) {
        if (result.isSuccess()) {
            handleApplySuccess(policy, context, request, result);
            return;
    private StoreInTaskContext buildContext(BasDevp basDevp, StationObjModel stationObjModel) {
        if (basDevp == null || stationObjModel == null || stationObjModel.getStationId() == null) {
            return null;
        }
        if (result.isRetryableFailure()) {
            News.error("WMS入库请求失败,barcode={},stationId={},response={}",
                    request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result));
            redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context));
            policy.onApplyFailed(context, result);
            return;
        StationThread stationThread = (StationThread) SlaveConnection.get(SlaveType.Devp, basDevp.getDevpNo());
        if (stationThread == null) {
            return null;
        }
        policy.onApplyFailed(context, result);
        Integer stationId = stationObjModel.getStationId();
        Map<Integer, StationProtocol> stationMap = stationThread.getStatusMap();
        if (stationMap == null || !stationMap.containsKey(stationId)) {
            return null;
        }
        StationProtocol stationProtocol = stationMap.get(stationId);
        if (stationProtocol == null) {
            return null;
        }
        return new StoreInTaskContext(basDevp, stationThread, stationObjModel, stationProtocol);
    }
    private void handleApplySuccess(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                    InTaskApplyResult result) {
    private boolean hasCreatedTask(String barcode) {
        List<WrkMast> wrkMasts = wrkMastService.list(new QueryWrapper<WrkMast>().eq("barcode", barcode));
        return !wrkMasts.isEmpty();
    }
    private void handleSyncApplyResponse(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                         String response) {
        if (Cools.isEmpty(response)) {
            markApplyFailed(policy, context, request, null, "FAILED");
            return;
        }
        try {
            JSONObject jsonObject = JSON.parseObject(result.getResponse());
            JSONObject jsonObject = JSON.parseObject(response);
            if (jsonObject == null || !Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                InTaskApplyResult failResult = new InTaskApplyResult();
                failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
                failResult.setResponse(result.getResponse());
                failResult.setMessage("WMS返回非200");
                handleApplyResult(policy, context, request, failResult);
                markApplyFailed(policy, context, request, response, "WMS返回非200");
                return;
            }
            StartupDto dto = jsonObject.getObject("data", StartupDto.class);
            if (dto == null) {
                InTaskApplyResult failResult = new InTaskApplyResult();
                failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
                failResult.setResponse(result.getResponse());
                failResult.setMessage("WMS返回data为空");
                handleApplyResult(policy, context, request, failResult);
                markApplyFailed(policy, context, request, response, "WMS返回data为空");
                return;
            }
@@ -202,35 +189,23 @@
            policy.afterTaskCreated(context, wrkMast);
            policy.clearSystemWarning(context);
        } catch (Exception e) {
            News.error("处理WMS入库成功响应失败,barcode={},stationId={}", request.getBarcode(),
            News.error("处理WMS入库响应异常,barcode={},stationId={}", request.getBarcode(),
                    request.getSourceStaNo(), e);
            markApplyFailed(policy, context, request, response, e.getMessage());
        }
    }
    private InTaskApplyResult applySyncInTask(InTaskApplyRequest request) {
    private void markApplyFailed(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                 String response, String message) {
        InTaskApplyResult result = new InTaskApplyResult();
        result.setBizKey(request.getBizKey());
        String response = wmsOperateUtils.applyInTask(request);
        result.setResponse(response);
        if (Cools.isEmpty(response)) {
            result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
            result.setMessage("FAILED");
            return result;
        }
        try {
            JSONObject jsonObject = JSON.parseObject(response);
            if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                result.setStatus(InTaskApplyStatus.SUCCESS);
                return result;
            }
        } catch (Exception ignored) {
        }
        result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
        result.setMessage(response);
        return result;
        result.setResponse(response);
        result.setMessage(message);
        News.error("WMS入库请求失败,barcode={},stationId={},response={}",
                request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result));
        redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context));
        policy.onApplyFailed(context, result);
    }
    private HashMap<String, String> getSystemConfigMap() {
@@ -251,24 +226,22 @@
        return true;
    }
    private boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) {
    private synchronized boolean tryReserveGenerateCapacity(HashMap<String, String> systemConfigMap) {
        int conveyorStationTaskLimit = getConveyorStationTaskLimit(systemConfigMap);
        while (true) {
            int reservedCount = inFlightGenerateCount.get();
            int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
            if (currentStationTaskCount + reservedCount >= conveyorStationTaskLimit) {
                News.error("输送站点任务已达到上限,上限值:{},站点任务数:{},生成中任务数:{}",
                        conveyorStationTaskLimit, currentStationTaskCount, reservedCount);
                return false;
            }
            if (inFlightGenerateCount.compareAndSet(reservedCount, reservedCount + 1)) {
                return true;
            }
        int currentStationTaskCount = stationOperateProcessUtils.getCurrentStationTaskCount();
        if (currentStationTaskCount + inFlightGenerateCount >= conveyorStationTaskLimit) {
            News.error("输送站点任务已达到上限,上限值:{},站点任务数:{},生成中任务数:{}",
                    conveyorStationTaskLimit, currentStationTaskCount, inFlightGenerateCount);
            return false;
        }
        inFlightGenerateCount++;
        return true;
    }
    private void releaseGenerateCapacity() {
        inFlightGenerateCount.updateAndGet(current -> current > 0 ? current - 1 : 0);
    private synchronized void releaseGenerateCapacity() {
        if (inFlightGenerateCount > 0) {
            inFlightGenerateCount--;
        }
    }
    private int getConveyorStationTaskLimit(HashMap<String, String> systemConfigMap) {