Junjie
昨天 fa37288ba72c61578163950eca8d7e72a5e151a3
src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -23,8 +23,6 @@
import com.zy.core.News;
import com.zy.core.enums.RedisKeyType;
import com.zy.core.enums.SlaveType;
import com.zy.core.plugin.store.AsyncInTaskResult;
import com.zy.core.plugin.store.AsyncInTaskStatus;
import com.zy.core.plugin.store.InTaskApplyRequest;
import com.zy.system.entity.Config;
import com.zy.system.service.ConfigService;
@@ -37,36 +35,11 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Component
public class WmsOperateUtils {
    // Worker count for the async apply executor: the number of available processors,
    // clamped to the range [2, 4].
    private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2,
            Math.min(4, Runtime.getRuntime().availableProcessors()));
    // Maximum number of async apply tasks that may wait in the executor queue.
    private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200;
    // Expiry passed to redisUtil.set for the "processing" request marker (10 minutes;
    // presumably interpreted as seconds, per the constant name).
    private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60;
    // Expiry passed to redisUtil.set for a stored async result (presumably seconds).
    private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60;
    // Sequence used to give each executor thread a distinct name suffix.
    private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1);
    // Fixed-size pool (core size == max size) backed by a bounded queue. No rejection
    // handler is supplied, so the JDK default applies and execute() throws
    // RejectedExecutionException once the queue is full and all workers are busy.
    private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor(
            APPLY_IN_TASK_ASYNC_THREADS,
            APPLY_IN_TASK_ASYNC_THREADS,
            // Keep-alive only affects threads above the core size; with core == max there are none.
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY),
            runnable -> {
                // Named daemon threads: identifiable in thread dumps, and they do not block JVM exit.
                Thread thread = new Thread(runnable);
                thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
    );
    // Timeout value handed to HttpHandler.Builder#setTimeout (with TimeUnit.SECONDS) in applyInTask.
    private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5;
    // Injected configuration service (usage is outside the visible part of this file).
    @Autowired
    private ConfigService configService;
@@ -85,36 +58,6 @@
    // Redis access helper; this class uses it for the system config cache and for the
    // async request/response keys.
    @Autowired
    private RedisUtil redisUtil;
    // In-process registry of async request keys that are currently being handled.
    // applyInTaskAsync adds a key before submitting work and removes it when the task
    // finishes or is rejected; only key presence matters, the Boolean value is unused.
    private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>();
    private String buildAsyncInTaskBizKey(InTaskApplyRequest request) {
        if (!Cools.isEmpty(request.getBizKey())) {
            return request.getBizKey();
        }
        StringBuilder keyBuilder = new StringBuilder(request.getBarcode())
                .append("_")
                .append(request.getSourceStaNo());
        if (request.getTaskNo() != null && request.getTaskNo() > 0) {
            keyBuilder.append("_").append(request.getTaskNo());
        }
        return keyBuilder.toString();
    }
    private String buildAsyncInTaskKey(String prefix, InTaskApplyRequest request) {
        return prefix + buildAsyncInTaskBizKey(request);
    }
    private String buildAsyncInTaskKey(String prefix, String barcode, Integer stationId, Integer taskNo) {
        StringBuilder keyBuilder = new StringBuilder(prefix)
                .append(barcode)
                .append("_")
                .append(stationId);
        if (taskNo != null && taskNo > 0) {
            keyBuilder.append("_").append(taskNo);
        }
        return keyBuilder.toString();
    }
    // 申请入库任务
    public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
        InTaskApplyRequest request = new InTaskApplyRequest();
@@ -125,7 +68,12 @@
    }
    public String applyInTask(InTaskApplyRequest request) {
        long startMs = System.currentTimeMillis();
        Map<String, Long> stepCostMap = new LinkedHashMap<>();
        long redisGetStartNs = System.nanoTime();
        Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
        addStepCost(stepCostMap, "redisUtil.get", elapsedMsFromNano(redisGetStartNs));
        if (systemConfigMapObj == null) {
            News.error("系统Config缓存失效");
            return null;
@@ -148,8 +96,10 @@
        String response = null;
        int result = 0;
        try {
            long stationQueryStartNs = System.nanoTime();
            BasStation basStation = basStationService
                    .getOne(new QueryWrapper<BasStation>().eq("station_id", request.getSourceStaNo()));
            addStepCost(stepCostMap, "basStationService.getOne", elapsedMsFromNano(stationQueryStartNs));
            if (basStation == null) {
                News.error("站点{}不存在", request.getSourceStaNo());
                return null;
@@ -163,35 +113,41 @@
            requestParam.put("barcode", request.getBarcode());
            requestParam.put("sourceStaNo", stationNo);
            requestParam.put("locType1", request.getLocType1() == null ? 1 : request.getLocType1());
            requestParam.put("row", Utils.getInTaskEnableRow(request.getSourceStaNo()));
            long getRowStartNs = System.nanoTime();
            requestParam.put("row", Utils.getInTaskEnableRowWithCache(request.getSourceStaNo()));
            addStepCost(stepCostMap, "Utils.getInTaskEnableRowWithCache", elapsedMsFromNano(getRowStartNs));
            if (request.getExtraParams() != null && !request.getExtraParams().isEmpty()) {
                requestParam.putAll(request.getExtraParams());
            }
            long httpPostStartNs = System.nanoTime();
            response = new HttpHandler.Builder()
                    .setUri(wmsUrl)
                    .setPath(wmsSystemInUrl)
                    .setJson(JSON.toJSONString(requestParam))
                    .setTimeout(30, TimeUnit.SECONDS)
                    .setTimeout(APPLY_IN_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                    .build()
                    .doPost();
            addStepCost(stepCostMap, "HttpHandler.doPost", elapsedMsFromNano(httpPostStartNs));
            if (!Cools.isEmpty(response)) {
                long parseResponseStartNs = System.nanoTime();
                JSONObject jsonObject = JSON.parseObject(response);
                addStepCost(stepCostMap, "JSON.parseObject", elapsedMsFromNano(parseResponseStartNs));
                if (jsonObject.getInteger("code") == 200) {
                    result = 1;
                    News.info("请求WMS入库接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl,
                            JSON.toJSONString(requestParam), response);
                    News.info("请求WMS入库接口成功!!!url:{};request:{};response:{};cost={}ms",
                            wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response, elapsedMs(startMs));
                } else {
                    News.info("请求WMS入库接口失败,接口返回Code异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl,
                            JSON.toJSONString(requestParam), response);
                    News.info("请求WMS入库接口失败,接口返回Code异常!!!url:{};request:{};response:{};cost={}ms",
                            wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response, elapsedMs(startMs));
                }
            } else {
                News.info("请求WMS入库接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl,
                        JSON.toJSONString(requestParam), response);
                News.info("请求WMS入库接口失败,接口未响应!!!url:{};request:{};response:{};cost={}ms",
                        wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response, elapsedMs(startMs));
            }
        } catch (Exception e) {
            News.error("请求WMS入库接口异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl,
                    JSON.toJSONString(requestParam), response, e);
            News.error("请求WMS入库接口异常!!!url:{};request:{};response:{};cost={}ms", wmsUrl + wmsSystemInUrl,
                    JSON.toJSONString(requestParam), response, elapsedMs(startMs), e);
        } finally {
            HttpRequestLog httpRequestLog = new HttpRequestLog();
            httpRequestLog.setName(wmsUrl + wmsSystemInUrl);
@@ -199,210 +155,17 @@
            httpRequestLog.setResponse(response);
            httpRequestLog.setCreateTime(new Date());
            httpRequestLog.setResult(result);
            httpRequestLogService.save(httpRequestLog);
            long saveRequestLogStartNs = System.nanoTime();
            try {
                httpRequestLogService.save(httpRequestLog);
            } finally {
                addStepCost(stepCostMap, "httpRequestLogService.save", elapsedMsFromNano(saveRequestLogStartNs));
                News.info("WMS入库申请耗时统计:url:{};barcode:{};sourceStaNo:{};stepCosts:{};slowestStep:{};totalCost={}ms",
                        wmsUrl + wmsSystemInUrl, request.getBarcode(), request.getSourceStaNo(),
                        formatStepCosts(stepCostMap), getSlowestStep(stepCostMap), elapsedMs(startMs));
            }
        }
        return response;
    }
    /**
     * 异步申请入库任务 - 非阻塞版本
     * 将请求提交到线程池异步执行,结果存储到Redis中
     *
     * @param barcode     托盘码
     * @param sourceStaNo 站点编号
     * @param locType1    托盘高度
     */
    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer locType1) {
        applyInTaskAsync(barcode, sourceStaNo, null, locType1);
    }
    public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer taskNo, Integer locType1) {
        InTaskApplyRequest request = new InTaskApplyRequest();
        request.setBarcode(barcode);
        request.setSourceStaNo(sourceStaNo);
        request.setTaskNo(taskNo);
        request.setLocType1(locType1);
        applyInTaskAsync(request);
    }
    public void applyInTaskAsync(InTaskApplyRequest request) {
        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request);
        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
        if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) {
            return;
        }
        // 检查是否已有请求在进行中
        Object existingRequest = redisUtil.get(requestKey);
        if (existingRequest != null) {
            asyncInTaskInflight.remove(requestKey);
            return; // 已有请求在进行中,跳过
        }
        // 标记请求进行中,避免请求在线程池排队时被重复提交
        redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS);
        try {
            APPLY_IN_TASK_EXECUTOR.execute(() -> {
                try {
                    String response = applyInTask(request);
                    AsyncInTaskResult result = buildAsyncInTaskResult(request, response, null);
                    redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
                    News.info("异步WMS入库请求完成,barcode={},stationId={},taskNo={},status={},response={}",
                            request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(),
                            result.getStatus(), response);
                } catch (Exception e) {
                    News.error("异步WMS入库请求异常,barcode={},stationId={},taskNo={},error={}",
                            request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(), e.getMessage());
                    AsyncInTaskResult result = buildAsyncInTaskResult(request, null, e);
                    redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
                } finally {
                    asyncInTaskInflight.remove(requestKey);
                    redisUtil.del(requestKey);
                }
            });
        } catch (RejectedExecutionException e) {
            asyncInTaskInflight.remove(requestKey);
            redisUtil.del(requestKey);
            AsyncInTaskResult result = new AsyncInTaskResult();
            result.setBizKey(buildAsyncInTaskBizKey(request));
            result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
            result.setMessage("ERROR:ASYNC_QUEUE_FULL");
            redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
            News.error("异步WMS入库请求被拒绝,线程池已满,barcode={},stationId={},taskNo={}",
                    request.getBarcode(), request.getSourceStaNo(), request.getTaskNo());
        }
    }
    /**
     * 查询异步入库任务请求结果
     *
     * @param barcode   托盘码
     * @param stationId 站点编号
     * @return 响应结果,null表示还未完成或未找到
     */
    public String queryAsyncInTaskResponse(String barcode, Integer stationId) {
        return queryAsyncInTaskResponse(barcode, stationId, null);
    }
    public String queryAsyncInTaskResponse(String barcode, Integer stationId, Integer taskNo) {
        InTaskApplyRequest request = new InTaskApplyRequest();
        request.setBarcode(barcode);
        request.setSourceStaNo(stationId);
        request.setTaskNo(taskNo);
        AsyncInTaskResult result = queryAsyncInTaskResponse(request);
        if (result == null) {
            return null;
        }
        clearAsyncInTaskResponse(request);
        if (result.isSuccess()) {
            return result.getResponse();
        }
        if (!Cools.isEmpty(result.getMessage())) {
            return result.getMessage();
        }
        return result.getResponse();
    }
    public AsyncInTaskResult queryAsyncInTaskResponse(InTaskApplyRequest request) {
        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
        Object response = redisUtil.get(responseKey);
        if (response == null) {
            return null;
        }
        return parseAsyncInTaskResult(buildAsyncInTaskBizKey(request), response.toString());
    }
    public void clearAsyncInTaskResponse(InTaskApplyRequest request) {
        String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request);
        redisUtil.del(responseKey);
    }
    private AsyncInTaskResult parseAsyncInTaskResult(String bizKey, String responseValue) {
        AsyncInTaskResult result = null;
        try {
            JSONObject jsonObject = JSON.parseObject(responseValue);
            if (jsonObject != null && jsonObject.containsKey("status")) {
                result = jsonObject.toJavaObject(AsyncInTaskResult.class);
            } else if (jsonObject != null && jsonObject.containsKey("code")) {
                result = new AsyncInTaskResult();
                result.setBizKey(bizKey);
                result.setResponse(responseValue);
                if (Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                    result.setStatus(AsyncInTaskStatus.SUCCESS);
                } else {
                    result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
                    result.setMessage(responseValue);
                }
            }
        } catch (Exception ignored) {
        }
        if (result != null) {
            return result;
        }
        result = new AsyncInTaskResult();
        result.setBizKey(bizKey);
        result.setResponse(responseValue);
        result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
        result.setMessage(responseValue);
        return result;
    }
    /**
     * 检查是否有异步请求正在进行中
     *
     * @param barcode   托盘码
     * @param stationId 站点编号
     * @return true表示正在请求中
     */
    public boolean isAsyncRequestInProgress(String barcode, Integer stationId) {
        return isAsyncRequestInProgress(barcode, stationId, null);
    }
    public boolean isAsyncRequestInProgress(String barcode, Integer stationId, Integer taskNo) {
        InTaskApplyRequest request = new InTaskApplyRequest();
        request.setBarcode(barcode);
        request.setSourceStaNo(stationId);
        request.setTaskNo(taskNo);
        return isAsyncRequestInProgress(request);
    }
    public boolean isAsyncRequestInProgress(InTaskApplyRequest request) {
        String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request);
        return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null;
    }
    private AsyncInTaskResult buildAsyncInTaskResult(InTaskApplyRequest request, String response, Exception exception) {
        AsyncInTaskResult result = new AsyncInTaskResult();
        result.setBizKey(buildAsyncInTaskBizKey(request));
        result.setResponse(response);
        if (exception != null) {
            result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
            result.setMessage("ERROR:" + exception.getMessage());
            return result;
        }
        if (response == null) {
            result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
            result.setMessage("FAILED");
            return result;
        }
        try {
            JSONObject jsonObject = JSON.parseObject(response);
            if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) {
                result.setStatus(AsyncInTaskStatus.SUCCESS);
                return result;
            }
        } catch (Exception ignored) {
        }
        result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL);
        result.setMessage(response);
        return result;
    }
    // 申请任务重新分配库位
@@ -439,6 +202,7 @@
        HashMap<String, Object> requestParam = new HashMap<>();
        String response = null;
        int result = 0;
        long startMs = System.currentTimeMillis();
        try {
            List<Integer> excludeCrnList = new ArrayList<>();
            List<Integer> excludeDualCrnList = new ArrayList<>();
@@ -463,19 +227,23 @@
                JSONObject jsonObject = JSON.parseObject(response);
                if (jsonObject.getInteger("code") == 200) {
                    result = 1;
                    News.info("请求申请任务重新分配入库接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemReassignInTaskUrl,
                            JSON.toJSONString(requestParam), response);
                    News.info("请求申请任务重新分配入库接口成功!!!url:{};request:{};response:{};cost={}ms",
                            wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response,
                            elapsedMs(startMs));
                } else {
                    News.info("请求申请任务重新分配入库接口失败,接口返回Code异常!!!url:{};request:{};response:{}",
                            wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response);
                    News.info("请求申请任务重新分配入库接口失败,接口返回Code异常!!!url:{};request:{};response:{};cost={}ms",
                            wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response,
                            elapsedMs(startMs));
                }
            } else {
                News.info("请求申请任务重新分配入库接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemReassignInTaskUrl,
                        JSON.toJSONString(requestParam), response);
                News.info("请求申请任务重新分配入库接口失败,接口未响应!!!url:{};request:{};response:{};cost={}ms",
                        wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response,
                        elapsedMs(startMs));
            }
        } catch (Exception e) {
            News.error("请求申请任务重新分配入库接口异常!!!url:{};request:{}; response:{}", wmsUrl + wmsSystemReassignInTaskUrl,
                    JSON.toJSONString(requestParam), response, e);
            News.error("请求申请任务重新分配入库接口异常!!!url:{};request:{}; response:{};cost={}ms",
                    wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response,
                    elapsedMs(startMs), e);
        } finally {
            HttpRequestLog httpRequestLog = new HttpRequestLog();
            httpRequestLog.setName(wmsUrl + wmsSystemReassignInTaskUrl);
@@ -546,6 +314,7 @@
        HashMap<String, Object> requestParam = new HashMap<>();
        String response = null;
        int result = 0;
        long startMs = System.currentTimeMillis();
        try {
            requestParam.put("locNo", locNo);
            requestParam.put("row", crnRows);
@@ -562,19 +331,23 @@
                JSONObject jsonObject = JSON.parseObject(response);
                if (jsonObject.getInteger("code") == 200) {
                    result = 1;
                    News.info("请求WMS申请更换库位接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl,
                            JSON.toJSONString(requestParam), response);
                    News.info("请求WMS申请更换库位接口成功!!!url:{};request:{};response:{};cost={}ms",
                            wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response,
                            elapsedMs(startMs));
                } else {
                    News.info("请求WMS申请更换库位接口失败,接口返回Code异常!!!url:{};request:{};response:{}",
                            wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response);
                    News.info("请求WMS申请更换库位接口失败,接口返回Code异常!!!url:{};request:{};response:{};cost={}ms",
                            wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response,
                            elapsedMs(startMs));
                }
            } else {
                News.info("请求WMS申请更换库位接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl,
                        JSON.toJSONString(requestParam), response);
                News.info("请求WMS申请更换库位接口失败,接口未响应!!!url:{};request:{};response:{};cost={}ms",
                        wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response,
                        elapsedMs(startMs));
            }
        } catch (Exception e) {
            News.error("请求WMS申请更换库位接口异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl,
                    JSON.toJSONString(requestParam), response, e);
            News.error("请求WMS申请更换库位接口异常!!!url:{};request:{};response:{};cost={}ms",
                    wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response,
                    elapsedMs(startMs), e);
        } finally {
            HttpRequestLog httpRequestLog = new HttpRequestLog();
            httpRequestLog.setName(wmsUrl + wmsSystemChangeLocNoUrl);
@@ -587,4 +360,35 @@
        return response;
    }
    private long elapsedMs(long startMs) {
        return System.currentTimeMillis() - startMs;
    }
    private long elapsedMsFromNano(long startNs) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
    }
    private void addStepCost(Map<String, Long> stepCostMap, String stepName, long costMs) {
        stepCostMap.put(stepName, costMs);
    }
    private String formatStepCosts(Map<String, Long> stepCostMap) {
        return JSON.toJSONString(stepCostMap);
    }
    private String getSlowestStep(Map<String, Long> stepCostMap) {
        String slowestStepName = "-";
        long maxCostMs = -1L;
        for (Map.Entry<String, Long> entry : stepCostMap.entrySet()) {
            if (entry.getValue() != null && entry.getValue() > maxCostMs) {
                slowestStepName = entry.getKey();
                maxCostMs = entry.getValue();
            }
        }
        if (maxCostMs < 0) {
            return "-";
        }
        return slowestStepName + "=" + maxCostMs + "ms";
    }
}