#
Junjie
6 小时以前 14cc8925be94a6c07e8e48278afc8f2d4aa284f1
#
5个文件已修改
87 ■■■■■ 已修改文件
src/main/java/com/zy/core/plugin/FakeProcess.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/GslProcess.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/NormalProcess.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/XiaosongProcess.java 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/WmsOperateUtils.java 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/FakeProcess.java
@@ -492,9 +492,10 @@
                        String barcode = stationProtocol.getBarcode();
                        Integer stationIdVal = stationProtocol.getStationId();
                        Integer taskNo = stationProtocol.getTaskNo();
                        // 1. 首先查询是否有已完成的异步响应
                        String response = wmsOperateUtils.queryAsyncInTaskResponse(barcode, stationIdVal);
                        String response = wmsOperateUtils.queryAsyncInTaskResponse(barcode, stationIdVal, taskNo);
                        if (!Cools.isEmpty(response)) {
                            // 2. 有响应结果,处理响应
@@ -502,7 +503,7 @@
                                // 请求失败,重新发起异步请求
                                News.error("WMS入库请求失败,重新发起请求,barcode={},stationId={},response={}", barcode,
                                        stationIdVal, response);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
@@ -535,7 +536,7 @@
                                // 接口返回非200,重新发起请求
                                News.error("WMS入库接口返回非200,重新发起请求,barcode={},stationId={},response={}", barcode,
                                        stationIdVal, response);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
@@ -543,10 +544,10 @@
                            }
                        } else {
                            // 3. 没有响应结果,检查是否有请求正在进行中
                            if (!wmsOperateUtils.isAsyncRequestInProgress(barcode, stationIdVal)) {
                            if (!wmsOperateUtils.isAsyncRequestInProgress(barcode, stationIdVal, taskNo)) {
                                // 没有请求进行中,发起新的异步请求
                                News.info("发起异步WMS入库请求,barcode={},stationId={}", barcode, stationIdVal);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
src/main/java/com/zy/core/plugin/GslProcess.java
@@ -167,9 +167,10 @@
                        String barcode = stationProtocol.getBarcode();
                        Integer stationIdVal = stationProtocol.getStationId();
                        Integer taskNo = stationProtocol.getTaskNo();
                        // 1. 首先查询是否有已完成的异步响应
                        String response = wmsOperateUtils.queryAsyncInTaskResponse(barcode, stationIdVal);
                        String response = wmsOperateUtils.queryAsyncInTaskResponse(barcode, stationIdVal, taskNo);
                        if (!Cools.isEmpty(response)) {
                            // 2. 有响应结果,处理响应
@@ -177,7 +178,7 @@
                                // 请求失败,重新发起异步请求
                                News.error("WMS入库请求失败,重新发起请求,barcode={},stationId={},response={}", barcode,
                                        stationIdVal, response);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS返回=" + response);
@@ -200,17 +201,17 @@
                                // 接口返回非200,重新发起请求
                                News.error("WMS入库接口返回非200,重新发起请求,barcode={},stationId={},response={}", barcode,
                                        stationIdVal, response);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS返回=" + response);
                            }
                        } else {
                            // 3. 没有响应结果,检查是否有请求正在进行中
                            if (!wmsOperateUtils.isAsyncRequestInProgress(barcode, stationIdVal)) {
                            if (!wmsOperateUtils.isAsyncRequestInProgress(barcode, stationIdVal, taskNo)) {
                                // 没有请求进行中,发起新的异步请求
                                News.info("发起异步WMS入库请求,barcode={},stationId={}", barcode, stationIdVal);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS无返回");
src/main/java/com/zy/core/plugin/NormalProcess.java
@@ -145,9 +145,10 @@
                        String barcode = stationProtocol.getBarcode();
                        Integer stationIdVal = stationProtocol.getStationId();
                        Integer taskNo = stationProtocol.getTaskNo();
                        // 1. 首先查询是否有已完成的异步响应
                        String response = wmsOperateUtils.queryAsyncInTaskResponse(barcode, stationIdVal);
                        String response = wmsOperateUtils.queryAsyncInTaskResponse(barcode, stationIdVal, taskNo);
                        if (!Cools.isEmpty(response)) {
                            // 2. 有响应结果,处理响应
@@ -155,7 +156,7 @@
                                // 请求失败,重新发起异步请求
                                News.error("WMS入库请求失败,重新发起请求,barcode={},stationId={},response={}", barcode,
                                        stationIdVal, response);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS返回=" + response);
@@ -178,17 +179,17 @@
                                // 接口返回非200,重新发起请求
                                News.error("WMS入库接口返回非200,重新发起请求,barcode={},stationId={},response={}", barcode,
                                        stationIdVal, response);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS返回=" + response);
                            }
                        } else {
                            // 3. 没有响应结果,检查是否有请求正在进行中
                            if (!wmsOperateUtils.isAsyncRequestInProgress(barcode, stationIdVal)) {
                            if (!wmsOperateUtils.isAsyncRequestInProgress(barcode, stationIdVal, taskNo)) {
                                // 没有请求进行中,发起新的异步请求
                                News.info("发起异步WMS入库请求,barcode={},stationId={}", barcode, stationIdVal);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS无返回");
src/main/java/com/zy/core/plugin/XiaosongProcess.java
@@ -172,9 +172,10 @@
                        String barcode = stationProtocol.getBarcode();
                        Integer stationIdVal = stationProtocol.getStationId();
                        Integer taskNo = stationProtocol.getTaskNo();
                        // 1. 首先查询是否有已完成的异步响应
                        String response = wmsOperateUtils.queryAsyncInTaskResponse(barcode, stationIdVal);
                        String response = wmsOperateUtils.queryAsyncInTaskResponse(barcode, stationIdVal, taskNo);
                        if (!Cools.isEmpty(response)) {
                            // 2. 有响应结果,处理响应
@@ -182,7 +183,7 @@
                                // 请求失败,重新发起异步请求
                                News.error("WMS入库请求失败,重新发起请求,barcode={},stationId={},response={}", barcode,
                                        stationIdVal, response);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS返回=" + response);
@@ -205,17 +206,17 @@
                                // 接口返回非200,重新发起请求
                                News.error("WMS入库接口返回非200,重新发起请求,barcode={},stationId={},response={}", barcode,
                                        stationIdVal, response);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS返回=" + response);
                            }
                        } else {
                            // 3. 没有响应结果,检查是否有请求正在进行中
                            if (!wmsOperateUtils.isAsyncRequestInProgress(barcode, stationIdVal)) {
                            if (!wmsOperateUtils.isAsyncRequestInProgress(barcode, stationIdVal, taskNo)) {
                                // 没有请求进行中,发起新的异步请求
                                News.info("发起异步WMS入库请求,barcode={},stationId={}", barcode, stationIdVal);
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal,
                                wmsOperateUtils.applyInTaskAsync(barcode, stationIdVal, taskNo,
                                        stationProtocol.getPalletHeight());
                                redisUtil.set(RedisKeyType.GENERATE_IN_TASK_LIMIT.key + stationId, "lock", 2);
                                stationProtocol.setSystemWarning("请求入库失败,WMS无返回");
src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -82,6 +82,17 @@
    private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>();
    /**
     * Builds the key string used to track an asynchronous in-task (inbound) WMS request
     * or its response.
     *
     * <p>The result is {@code prefix + barcode + "_" + stationId}; when {@code taskNo}
     * is non-null and greater than zero, {@code "_" + taskNo} is appended as well.
     * With a null or non-positive {@code taskNo} the key therefore has the same shape
     * as the two-part key (barcode and station only).
     *
     * <p>A null {@code barcode} or {@code stationId} is not rejected here; it is
     * appended as the literal text {@code "null"} by {@link StringBuilder}.
     *
     * @param prefix    key prefix placed at the start of the result
     * @param barcode   barcode component of the key
     * @param stationId station id component of the key
     * @param taskNo    optional task number; ignored when null or not positive
     * @return the assembled key
     */
    private String buildAsyncInTaskKey(String prefix, String barcode, Integer stationId, Integer taskNo) {
        StringBuilder keyBuilder = new StringBuilder(prefix)
                .append(barcode)
                .append("_")
                .append(stationId);
        // Only a real (positive) task number becomes part of the key.
        if (taskNo != null && taskNo > 0) {
            keyBuilder.append("_").append(taskNo);
        }
        return keyBuilder.toString();
    }
    // 申请入库任务
    public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
        Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
@@ -169,8 +180,12 @@
     * @param locType1    托盘高度
     */
    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer locType1) {
        // NOTE(review): requestKey and responseKey are never read in this method as shown;
        // they look like the pre-change lines of this commit view — confirm and drop.
        String requestKey = RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key + barcode + "_" + sourceStaNo;
        String responseKey = RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key + barcode + "_" + sourceStaNo;
        // Delegate to the overload that also takes a task number, passing null for it.
        applyInTaskAsync(barcode, sourceStaNo, null, locType1);
    }
    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer taskNo, Integer locType1) {
        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, barcode, sourceStaNo, taskNo);
        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, barcode, sourceStaNo, taskNo);
        if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) {
            return;
@@ -192,13 +207,16 @@
                    String response = applyInTask(barcode, sourceStaNo, locType1);
                    if (response != null) {
                        redisUtil.set(responseKey, response, APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
                        News.info("异步WMS入库请求完成,barcode={},stationId={},response={}", barcode, sourceStaNo, response);
                        News.info("异步WMS入库请求完成,barcode={},stationId={},taskNo={},response={}",
                                barcode, sourceStaNo, taskNo, response);
                    } else {
                        redisUtil.set(responseKey, "FAILED", 10);
                        News.error("异步WMS入库请求失败,barcode={},stationId={}", barcode, sourceStaNo);
                        News.error("异步WMS入库请求失败,barcode={},stationId={},taskNo={}",
                                barcode, sourceStaNo, taskNo);
                    }
                } catch (Exception e) {
                    News.error("异步WMS入库请求异常,barcode={},stationId={},error={}", barcode, sourceStaNo, e.getMessage());
                    News.error("异步WMS入库请求异常,barcode={},stationId={},taskNo={},error={}",
                            barcode, sourceStaNo, taskNo, e.getMessage());
                    redisUtil.set(responseKey, "ERROR:" + e.getMessage(), 10);
                } finally {
                    asyncInTaskInflight.remove(requestKey);
@@ -209,7 +227,8 @@
            asyncInTaskInflight.remove(requestKey);
            redisUtil.del(requestKey);
            redisUtil.set(responseKey, "ERROR:ASYNC_QUEUE_FULL", 10);
            News.error("异步WMS入库请求被拒绝,线程池已满,barcode={},stationId={}", barcode, sourceStaNo);
            News.error("异步WMS入库请求被拒绝,线程池已满,barcode={},stationId={},taskNo={}",
                    barcode, sourceStaNo, taskNo);
        }
    }
@@ -221,7 +240,11 @@
     * @return 响应结果,null表示还未完成或未找到
     */
    public String queryAsyncInTaskResponse(String barcode, Integer stationId) {
        // NOTE(review): responseKey is never read in this method as shown; it looks like
        // the pre-change line of this commit view — confirm and drop.
        String responseKey = RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key + barcode + "_" + stationId;
        // Delegate to the overload that also takes a task number, passing null for it.
        return queryAsyncInTaskResponse(barcode, stationId, null);
    }
    public String queryAsyncInTaskResponse(String barcode, Integer stationId, Integer taskNo) {
        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, barcode, stationId, taskNo);
        Object response = redisUtil.get(responseKey);
        if (response != null) {
            // 获取后删除,避免重复处理
@@ -239,7 +262,11 @@
     * @return true表示正在请求中
     */
    public boolean isAsyncRequestInProgress(String barcode, Integer stationId) {
        // NOTE(review): requestKey is never read in this method as shown; it looks like
        // the pre-change line of this commit view — confirm and drop.
        String requestKey = RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key + barcode + "_" + stationId;
        // Delegate to the overload that also takes a task number, passing null for it.
        return isAsyncRequestInProgress(barcode, stationId, null);
    }
    /**
     * Reports whether an asynchronous in-task request for the given barcode, station
     * and (optionally) task number is currently considered in progress.
     *
     * <p>The request key is built with the {@code ASYNC_WMS_IN_TASK_REQUEST} prefix.
     * The request counts as in progress when that key is present in the
     * {@code asyncInTaskInflight} map, or when {@code redisUtil.get} returns a
     * non-null value for it.
     *
     * @param barcode   barcode component of the request key
     * @param stationId station id component of the request key
     * @param taskNo    optional task number component of the request key
     * @return {@code true} if either lookup finds the request key
     */
    public boolean isAsyncRequestInProgress(String barcode, Integer stationId, Integer taskNo) {
        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, barcode, stationId, taskNo);
        // The map is checked first; redisUtil is only consulted when the map has no entry.
        return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null;
    }