Junjie
23 小时以前 641bf75f1b6684ee5b6d13497ad1106b82c59043
#入库同步串行请求
1个文件已删除
1个文件已添加
1 文件已重命名
6个文件已修改
392 ■■■■ 已修改文件
src/main/java/com/zy/core/enums/RedisKeyType.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/GslProcess.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/store/AsyncInTaskResult.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/store/InTaskApplyResult.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/store/InTaskApplyStatus.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/plugin/store/StoreInTaskPolicy.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/utils/WmsOperateUtils.java 259 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/test/java/com/zy/core/plugin/store/StoreInTaskPolicyTest.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zy/core/enums/RedisKeyType.java
@@ -68,8 +68,6 @@
    STATION_TASK_TRACE_REGISTRY("station_task_trace_registry"),
    CURRENT_CIRCLE_TASK_CRN_NO("current_circle_task_crn_no_"),
    ASYNC_WMS_IN_TASK_REQUEST("async_wms_in_task_request_"),
    ASYNC_WMS_IN_TASK_RESPONSE("async_wms_in_task_response_"),
    MAIN_PROCESS_PSEUDOCODE("main_process_pseudocode"),
    PLANNER_SCHEDULE("planner_schedule_"),
    ;
src/main/java/com/zy/core/plugin/GslProcess.java
@@ -38,6 +38,7 @@
public class GslProcess implements MainProcessPluginApi, StoreInTaskPolicy {
    private static final String CRN_TASK_LANE = "crn";
    private static final String STATION_TASK_LANE = "station";
    private static final String GENERATE_STORE_TASK_LANE = "generate-store";
    private static final long DISPATCH_INTERVAL_MS = 200L;
    private static final long MAINTENANCE_INTERVAL_MS = 500L;
    private static final long TASK_SLOW_LOG_THRESHOLD_MS = 1000L;
@@ -64,7 +65,7 @@
        //检测入库站是否有任务生成,并启动入库
        checkInStationHasTask();
        //请求生成入库任务
        generateStoreWrkFile();
        submitGenerateStoreTask("generateStoreWrkFile", DISPATCH_INTERVAL_MS, this::generateStoreWrkFile);
        //堆垛机与输送站点都按单个任务提交到各自串行通道,逐个执行
        submitCrnTask("crnIoExecute", DISPATCH_INTERVAL_MS, crnOperateUtils::crnIoExecute);
@@ -176,6 +177,10 @@
        submitProcessTask(STATION_TASK_LANE, taskName, minIntervalMs, task);
    }
    private void submitGenerateStoreTask(String taskName, long minIntervalMs, Runnable task) {
        submitProcessTask(GENERATE_STORE_TASK_LANE, taskName, minIntervalMs, task);
    }
    private void submitCrnTask(String taskName, long minIntervalMs, Runnable task) {
        submitProcessTask(CRN_TASK_LANE, taskName, minIntervalMs, task);
    }
src/main/java/com/zy/core/plugin/store/AsyncInTaskResult.java
File was deleted
src/main/java/com/zy/core/plugin/store/InTaskApplyResult.java
New file
@@ -0,0 +1,24 @@
package com.zy.core.plugin.store;
import lombok.Data;
@Data
public class InTaskApplyResult {
    private String bizKey;
    private InTaskApplyStatus status;
    private String response;
    private String message;
    public boolean isSuccess() {
        return InTaskApplyStatus.SUCCESS.equals(status);
    }
    public boolean isRetryableFailure() {
        return InTaskApplyStatus.RETRYABLE_FAIL.equals(status);
    }
}
src/main/java/com/zy/core/plugin/store/InTaskApplyStatus.java
File was renamed from src/main/java/com/zy/core/plugin/store/AsyncInTaskStatus.java
@@ -1,6 +1,6 @@
package com.zy.core.plugin.store;
public enum AsyncInTaskStatus {
public enum InTaskApplyStatus {
    SUCCESS,
    RETRYABLE_FAIL,
src/main/java/com/zy/core/plugin/store/StoreInTaskGenerationService.java
@@ -111,21 +111,10 @@
                    policy.onRequestPermitGranted(context);
                    InTaskApplyRequest request = policy.buildApplyRequest(context);
                    AsyncInTaskResult result = wmsOperateUtils.queryAsyncInTaskResponse(request);
                    if (result != null) {
                        handleApplyResult(policy, context, request, result);
                        continue;
                    }
                    if (wmsOperateUtils.isAsyncRequestInProgress(request)) {
                        continue;
                    }
                    News.info("发起异步WMS入库请求,barcode={},stationId={}", request.getBarcode(),
                    News.info("发起同步WMS入库请求,barcode={},stationId={}", request.getBarcode(),
                            request.getSourceStaNo());
                    wmsOperateUtils.applyInTaskAsync(request);
                    redisUtil.set(generateLockKey, "lock", policy.getSubmitLockSeconds(context));
//                    policy.onApplySubmitted(context);
                    InTaskApplyResult result = applySyncInTask(request);
                    handleApplyResult(policy, context, request, result);
                }
            }
        } catch (Exception e) {
@@ -134,17 +123,15 @@
    }
    private void handleApplyResult(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                   AsyncInTaskResult result) {
                                   InTaskApplyResult result) {
        if (result.isSuccess()) {
            handleApplySuccess(policy, context, request, result);
            return;
        }
        if (result.isRetryableFailure()) {
            News.error("WMS入库请求失败,重新发起请求,barcode={},stationId={},response={}",
            News.error("WMS入库请求失败,barcode={},stationId={},response={}",
                    request.getBarcode(), request.getSourceStaNo(), policy.buildFailureMessage(result));
            wmsOperateUtils.clearAsyncInTaskResponse(request);
            wmsOperateUtils.applyInTaskAsync(request);
            redisUtil.set(policy.getGenerateLockKey(context), "lock", policy.getRetryLockSeconds(context));
            policy.onApplyFailed(context, result);
            return;
@@ -154,12 +141,12 @@
    }
    private void handleApplySuccess(StoreInTaskPolicy policy, StoreInTaskContext context, InTaskApplyRequest request,
                                    AsyncInTaskResult result) {
                                    InTaskApplyResult result) {
        try {
            JSONObject jsonObject = JSON.parseObject(result.getResponse());
            if (jsonObject == null || !Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                AsyncInTaskResult failResult = new AsyncInTaskResult();
                failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
                InTaskApplyResult failResult = new InTaskApplyResult();
                failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
                failResult.setResponse(result.getResponse());
                failResult.setMessage("WMS返回非200");
                handleApplyResult(policy, context, request, failResult);
@@ -168,8 +155,8 @@
            StartupDto dto = jsonObject.getObject("data", StartupDto.class);
            if (dto == null) {
                AsyncInTaskResult failResult = new AsyncInTaskResult();
                failResult.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
                InTaskApplyResult failResult = new InTaskApplyResult();
                failResult.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
                failResult.setResponse(result.getResponse());
                failResult.setMessage("WMS返回data为空");
                handleApplyResult(policy, context, request, failResult);
@@ -180,11 +167,36 @@
            WrkMast wrkMast = commonService.createInTask(taskParam);
            policy.afterTaskCreated(context, wrkMast);
            context.getStationProtocol().setSystemWarning("");
            wmsOperateUtils.clearAsyncInTaskResponse(request);
        } catch (Exception e) {
            News.error("处理WMS入库成功响应失败,barcode={},stationId={}", request.getBarcode(),
                    request.getSourceStaNo(), e);
        }
    }
    private InTaskApplyResult applySyncInTask(InTaskApplyRequest request) {
        InTaskApplyResult result = new InTaskApplyResult();
        result.setBizKey(request.getBizKey());
        String response = wmsOperateUtils.applyInTask(request);
        result.setResponse(response);
        if (Cools.isEmpty(response)) {
            result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
            result.setMessage("FAILED");
            return result;
        }
        try {
            JSONObject jsonObject = JSON.parseObject(response);
            if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                result.setStatus(InTaskApplyStatus.SUCCESS);
                return result;
            }
        } catch (Exception ignored) {
        }
        result.setStatus(InTaskApplyStatus.RETRYABLE_FAIL);
        result.setMessage(response);
        return result;
    }
}
src/main/java/com/zy/core/plugin/store/StoreInTaskPolicy.java
@@ -38,10 +38,6 @@
        return RedisKeyType.GENERATE_IN_TASK_LIMIT.key + context.getStationObjModel().getStationId();
    }
    default int getSubmitLockSeconds(StoreInTaskContext context) {
        return 2;
    }
    default int getRetryLockSeconds(StoreInTaskContext context) {
        return 2;
    }
@@ -67,17 +63,13 @@
    default void afterTaskCreated(StoreInTaskContext context, WrkMast wrkMast) {
    }
    default void onApplySubmitted(StoreInTaskContext context) {
        context.getStationProtocol().setSystemWarning("请求入库中");
    }
    default void onApplyFailed(StoreInTaskContext context, AsyncInTaskResult result) {
    default void onApplyFailed(StoreInTaskContext context, InTaskApplyResult result) {
        String warning = "请求入库失败,WMS返回=" + buildFailureMessage(result);
        context.getStationProtocol().setSystemWarning(warning);
        syncWarningToBackStation(context, warning);
    }
    default String buildFailureMessage(AsyncInTaskResult result) {
    default String buildFailureMessage(InTaskApplyResult result) {
        if (!Cools.isEmpty(result.getResponse())) {
            return result.getResponse();
        }
src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -23,8 +23,6 @@
import com.zy.core.News;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
import com.zy.core.plugin.store.AsyncInTaskResult;
import com.zy.core.plugin.store.AsyncInTaskStatus;
import com.zy.core.plugin.store.InTaskApplyRequest;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -37,36 +35,10 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Component
public class WmsOperateUtils {
    private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2,
            Math.min(4, Runtime.getRuntime().availableProcessors()));
    private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200;
    private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60;
    private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60;
    private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1);
    private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor(
            APPLY_IN_TASK_ASYNC_THREADS,
            APPLY_IN_TASK_ASYNC_THREADS,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY),
            runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
    );
    @Autowired
    private ConfigService configService;
@@ -84,36 +56,6 @@
    private BasStationService basStationService;
    @Autowired
    private RedisUtil redisUtil;
    private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>();
    private String buildAsyncInTaskBizKey(InTaskApplyRequest request) {
        if (!Cools.isEmpty(request.getBizKey())) {
            return request.getBizKey();
        }
        StringBuilder keyBuilder = new StringBuilder(request.getBarcode())
                .append("_")
                .append(request.getSourceStaNo());
        if (request.getTaskNo() != null && request.getTaskNo() > 0) {
            keyBuilder.append("_").append(request.getTaskNo());
        }
        return keyBuilder.toString();
    }
    private String buildAsyncInTaskKey(String prefix, InTaskApplyRequest request) {
        return prefix + buildAsyncInTaskBizKey(request);
    }
    private String buildAsyncInTaskKey(String prefix, String barcode, Integer stationId, Integer taskNo) {
        StringBuilder keyBuilder = new StringBuilder(prefix)
                .append(barcode)
                .append("_")
                .append(stationId);
        if (taskNo != null && taskNo > 0) {
            keyBuilder.append("_").append(taskNo);
        }
        return keyBuilder.toString();
    }
    // 申请入库任务
    public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
@@ -202,207 +144,6 @@
            httpRequestLogService.save(httpRequestLog);
        }
        return response;
    }
    /**
     * 异步申请入库任务 - 非阻塞版本
     * 将请求提交到线程池异步执行,结果存储到Redis中
     *
     * @param barcode     托盘码
     * @param sourceStaNo 站点编号
     * @param locType1    托盘高度
     */
    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer locType1) {
        applyInTaskAsync(barcode, sourceStaNo, null, locType1);
    }
    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer taskNo, Integer locType1) {
        InTaskApplyRequest request = new InTaskApplyRequest();
        request.setBarcode(barcode);
        request.setSourceStaNo(sourceStaNo);
        request.setTaskNo(taskNo);
        request.setLocType1(locType1);
        applyInTaskAsync(request);
    }
    public void applyInTaskAsync(InTaskApplyRequest request) {
        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request);
        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
        if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) {
            return;
        }
        // 检查是否已有请求在进行中
        Object existingRequest = redisUtil.get(requestKey);
        if (existingRequest != null) {
            asyncInTaskInflight.remove(requestKey);
            return; // 已有请求在进行中,跳过
        }
        // 标记请求进行中,避免请求在线程池排队时被重复提交
        redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS);
        try {
            APPLY_IN_TASK_EXECUTOR.execute(() -> {
                try {
                    String response = applyInTask(request);
                    AsyncInTaskResult result = buildAsyncInTaskResult(request, response, null);
                    redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
                    News.info("异步WMS入库请求完成,barcode={},stationId={},taskNo={},status={},response={}",
                            request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(),
                            result.getStatus(), response);
                } catch (Exception e) {
                    News.error("异步WMS入库请求异常,barcode={},stationId={},taskNo={},error={}",
                            request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(), e.getMessage());
                    AsyncInTaskResult result = buildAsyncInTaskResult(request, null, e);
                    redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
                } finally {
                    asyncInTaskInflight.remove(requestKey);
                    redisUtil.del(requestKey);
                }
            });
        } catch (RejectedExecutionException e) {
            asyncInTaskInflight.remove(requestKey);
            redisUtil.del(requestKey);
            AsyncInTaskResult result = new AsyncInTaskResult();
            result.setBizKey(buildAsyncInTaskBizKey(request));
            result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
            result.setMessage("ERROR:ASYNC_QUEUE_FULL");
            redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
            News.error("异步WMS入库请求被拒绝,线程池已满,barcode={},stationId={},taskNo={}",
                    request.getBarcode(), request.getSourceStaNo(), request.getTaskNo());
        }
    }
    /**
     * 查询异步入库任务请求结果
     *
     * @param barcode   托盘码
     * @param stationId 站点编号
     * @return 响应结果,null表示还未完成或未找到
     */
    public String queryAsyncInTaskResponse(String barcode, Integer stationId) {
        return queryAsyncInTaskResponse(barcode, stationId, null);
    }
    public String queryAsyncInTaskResponse(String barcode, Integer stationId, Integer taskNo) {
        InTaskApplyRequest request = new InTaskApplyRequest();
        request.setBarcode(barcode);
        request.setSourceStaNo(stationId);
        request.setTaskNo(taskNo);
        AsyncInTaskResult result = queryAsyncInTaskResponse(request);
        if (result == null) {
            return null;
        }
        clearAsyncInTaskResponse(request);
        if (result.isSuccess()) {
            return result.getResponse();
        }
        if (!Cools.isEmpty(result.getMessage())) {
            return result.getMessage();
        }
        return result.getResponse();
    }
    public AsyncInTaskResult queryAsyncInTaskResponse(InTaskApplyRequest request) {
        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
        Object response = redisUtil.get(responseKey);
        if (response == null) {
            return null;
        }
        return parseAsyncInTaskResult(buildAsyncInTaskBizKey(request), response.toString());
    }
    public void clearAsyncInTaskResponse(InTaskApplyRequest request) {
        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
        redisUtil.del(responseKey);
    }
    private AsyncInTaskResult parseAsyncInTaskResult(String bizKey, String responseValue) {
        AsyncInTaskResult result = null;
        try {
            JSONObject jsonObject = JSON.parseObject(responseValue);
            if (jsonObject != null && jsonObject.containsKey("status")) {
                result = jsonObject.toJavaObject(AsyncInTaskResult.class);
            } else if (jsonObject != null && jsonObject.containsKey("code")) {
                result = new AsyncInTaskResult();
                result.setBizKey(bizKey);
                result.setResponse(responseValue);
                if (Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                    result.setStatus(AsyncInTaskStatus.SUCCESS);
                } else {
                    result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
                    result.setMessage(responseValue);
                }
            }
        } catch (Exception ignored) {
        }
        if (result != null) {
            return result;
        }
        result = new AsyncInTaskResult();
        result.setBizKey(bizKey);
        result.setResponse(responseValue);
        result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
        result.setMessage(responseValue);
        return result;
    }
    /**
     * 检查是否有异步请求正在进行中
     *
     * @param barcode   托盘码
     * @param stationId 站点编号
     * @return true表示正在请求中
     */
    public boolean isAsyncRequestInProgress(String barcode, Integer stationId) {
        return isAsyncRequestInProgress(barcode, stationId, null);
    }
    public boolean isAsyncRequestInProgress(String barcode, Integer stationId, Integer taskNo) {
        InTaskApplyRequest request = new InTaskApplyRequest();
        request.setBarcode(barcode);
        request.setSourceStaNo(stationId);
        request.setTaskNo(taskNo);
        return isAsyncRequestInProgress(request);
    }
    public boolean isAsyncRequestInProgress(InTaskApplyRequest request) {
        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request);
        return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null;
    }
    private AsyncInTaskResult buildAsyncInTaskResult(InTaskApplyRequest request, String response, Exception exception) {
        AsyncInTaskResult result = new AsyncInTaskResult();
        result.setBizKey(buildAsyncInTaskBizKey(request));
        result.setResponse(response);
        if (exception != null) {
            result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
            result.setMessage("ERROR:" + exception.getMessage());
            return result;
        }
        if (response == null) {
            result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
            result.setMessage("FAILED");
            return result;
        }
        try {
            JSONObject jsonObject = JSON.parseObject(response);
            if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                result.setStatus(AsyncInTaskStatus.SUCCESS);
                return result;
            }
        } catch (Exception ignored) {
        }
        result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
        result.setMessage(response);
        return result;
    }
    // 申请任务重新分配库位
src/test/java/com/zy/core/plugin/store/StoreInTaskPolicyTest.java
@@ -35,7 +35,7 @@
        barcodeStationModel.setBackStation(backStationModel);
        StoreInTaskContext context = new StoreInTaskContext(new BasDevp(), stationThread, barcodeStationModel, barcodeStation);
        AsyncInTaskResult result = new AsyncInTaskResult();
        InTaskApplyResult result = new InTaskApplyResult();
        result.setMessage("WMS异常");
        policy.onApplyFailed(context, result);