package com.zy.core.utils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.core.common.Cools; import com.core.exception.CoolException; import com.zy.asrs.entity.BasCrnp; import com.zy.asrs.entity.BasDualCrnp; import com.zy.asrs.entity.BasStation; import com.zy.asrs.entity.HttpRequestLog; import com.zy.asrs.entity.WrkMast; import com.zy.asrs.service.BasCrnpService; import com.zy.asrs.service.BasDualCrnpService; import com.zy.asrs.service.BasStationService; import com.zy.asrs.service.HttpRequestLogService; import com.zy.asrs.service.WrkMastService; import com.zy.asrs.utils.Utils; import com.zy.common.entity.FindCrnNoResult; import com.zy.common.service.CommonService; import com.zy.common.utils.HttpHandler; import com.zy.common.utils.RedisUtil; import com.zy.core.News; import com.zy.core.enums.RedisKeyType; import com.zy.core.enums.SlaveType; import com.zy.core.plugin.store.AsyncInTaskResult; import com.zy.core.plugin.store.AsyncInTaskStatus; import com.zy.core.plugin.store.InTaskApplyRequest; import com.zy.system.entity.Config; import com.zy.system.service.ConfigService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @Component public class WmsOperateUtils { private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2, Math.min(4, Runtime.getRuntime().availableProcessors())); private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200; private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60; private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60; private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1); private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor( APPLY_IN_TASK_ASYNC_THREADS, APPLY_IN_TASK_ASYNC_THREADS, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY), runnable -> { Thread thread = new Thread(runnable); thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement()); thread.setDaemon(true); return thread; } ); @Autowired private ConfigService configService; @Autowired private HttpRequestLogService httpRequestLogService; @Autowired private WrkMastService wrkMastService; @Autowired private CommonService commonService; @Autowired private BasCrnpService basCrnpService; @Autowired private BasDualCrnpService basDualCrnpService; @Autowired private BasStationService basStationService; @Autowired private RedisUtil redisUtil; private final ConcurrentMap asyncInTaskInflight = new ConcurrentHashMap<>(); private String buildAsyncInTaskBizKey(InTaskApplyRequest request) { if (!Cools.isEmpty(request.getBizKey())) { return request.getBizKey(); } StringBuilder keyBuilder = new StringBuilder(request.getBarcode()) .append("_") .append(request.getSourceStaNo()); if (request.getTaskNo() != null && request.getTaskNo() > 0) { keyBuilder.append("_").append(request.getTaskNo()); } return keyBuilder.toString(); } private String buildAsyncInTaskKey(String prefix, InTaskApplyRequest request) { return prefix + buildAsyncInTaskBizKey(request); } private String buildAsyncInTaskKey(String prefix, String barcode, Integer stationId, Integer taskNo) { StringBuilder keyBuilder = new StringBuilder(prefix) .append(barcode) .append("_") .append(stationId); if (taskNo != null && taskNo > 0) { keyBuilder.append("_").append(taskNo); } return keyBuilder.toString(); } // 申请入库任务 public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) { InTaskApplyRequest request = new InTaskApplyRequest(); request.setBarcode(barcode); request.setSourceStaNo(sourceStaNo); request.setLocType1(locType1); return applyInTask(request); } public String applyInTask(InTaskApplyRequest request) { Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key); if (systemConfigMapObj == null) { News.error("系统Config缓存失效"); return null; } HashMap systemConfigMap = (HashMap) systemConfigMapObj; String wmsUrl = systemConfigMap.get("wmsSystemUri"); if (wmsUrl == null) { News.error("未配置WMS系统URI,配置文件Code编码:wmsSystemUri"); return null; } String wmsSystemInUrl = systemConfigMap.get("wmsSystemInUrl"); if (wmsSystemInUrl == null) { News.error("未配置WMS入库接口地址,配置文件Code编码:wmsSystemInUrl"); return null; } Map requestParam = new LinkedHashMap<>(); String response = null; int result = 0; try { BasStation basStation = basStationService .getOne(new QueryWrapper().eq("station_id", request.getSourceStaNo())); if (basStation == null) { News.error("站点{}不存在", request.getSourceStaNo()); return null; } String stationNo = String.valueOf(request.getSourceStaNo()); if (!Cools.isEmpty(basStation.getStationAlias())) { stationNo = basStation.getStationAlias(); } requestParam.put("barcode", request.getBarcode()); requestParam.put("sourceStaNo", stationNo); requestParam.put("locType1", request.getLocType1() == null ? 1 : request.getLocType1()); requestParam.put("row", Utils.getInTaskEnableRow(request.getSourceStaNo())); if (request.getExtraParams() != null && !request.getExtraParams().isEmpty()) { requestParam.putAll(request.getExtraParams()); } response = new HttpHandler.Builder() .setUri(wmsUrl) .setPath(wmsSystemInUrl) .setJson(JSON.toJSONString(requestParam)) .setTimeout(30, TimeUnit.SECONDS) .build() .doPost(); if (!Cools.isEmpty(response)) { JSONObject jsonObject = JSON.parseObject(response); if (jsonObject.getInteger("code") == 200) { result = 1; News.info("请求WMS入库接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response); } else { News.info("请求WMS入库接口失败,接口返回Code异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response); } } else { News.info("请求WMS入库接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response); } } catch (Exception e) { News.error("请求WMS入库接口异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response, e); } finally { HttpRequestLog httpRequestLog = new HttpRequestLog(); httpRequestLog.setName(wmsUrl + wmsSystemInUrl); httpRequestLog.setRequest(JSON.toJSONString(requestParam)); httpRequestLog.setResponse(response); httpRequestLog.setCreateTime(new Date()); httpRequestLog.setResult(result); httpRequestLogService.save(httpRequestLog); } return response; } /** * 异步申请入库任务 - 非阻塞版本 * 将请求提交到线程池异步执行,结果存储到Redis中 * * @param barcode 托盘码 * @param sourceStaNo 站点编号 * @param locType1 托盘高度 */ public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer locType1) { applyInTaskAsync(barcode, sourceStaNo, null, locType1); } public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer taskNo, Integer locType1) { InTaskApplyRequest request = new InTaskApplyRequest(); request.setBarcode(barcode); request.setSourceStaNo(sourceStaNo); request.setTaskNo(taskNo); request.setLocType1(locType1); applyInTaskAsync(request); } public void applyInTaskAsync(InTaskApplyRequest request) { String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request); String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request); if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) { return; } // 检查是否已有请求在进行中 Object existingRequest = redisUtil.get(requestKey); if (existingRequest != null) { asyncInTaskInflight.remove(requestKey); return; // 已有请求在进行中,跳过 } // 标记请求进行中,避免请求在线程池排队时被重复提交 redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS); try { APPLY_IN_TASK_EXECUTOR.execute(() -> { try { String response = applyInTask(request); AsyncInTaskResult result = buildAsyncInTaskResult(request, response, null); redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS); News.info("异步WMS入库请求完成,barcode={},stationId={},taskNo={},status={},response={}", request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(), result.getStatus(), response); } catch (Exception e) { News.error("异步WMS入库请求异常,barcode={},stationId={},taskNo={},error={}", request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(), e.getMessage()); AsyncInTaskResult result = buildAsyncInTaskResult(request, null, e); redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS); } finally { asyncInTaskInflight.remove(requestKey); redisUtil.del(requestKey); } }); } catch (RejectedExecutionException e) { asyncInTaskInflight.remove(requestKey); redisUtil.del(requestKey); AsyncInTaskResult result = new AsyncInTaskResult(); result.setBizKey(buildAsyncInTaskBizKey(request)); result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); result.setMessage("ERROR:ASYNC_QUEUE_FULL"); redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS); News.error("异步WMS入库请求被拒绝,线程池已满,barcode={},stationId={},taskNo={}", request.getBarcode(), request.getSourceStaNo(), request.getTaskNo()); } } /** * 查询异步入库任务请求结果 * * @param barcode 托盘码 * @param stationId 站点编号 * @return 响应结果,null表示还未完成或未找到 */ public String queryAsyncInTaskResponse(String barcode, Integer stationId) { return queryAsyncInTaskResponse(barcode, stationId, null); } public String queryAsyncInTaskResponse(String barcode, Integer stationId, Integer taskNo) { InTaskApplyRequest request = new InTaskApplyRequest(); request.setBarcode(barcode); request.setSourceStaNo(stationId); request.setTaskNo(taskNo); AsyncInTaskResult result = queryAsyncInTaskResponse(request); if (result == null) { return null; } clearAsyncInTaskResponse(request); if (result.isSuccess()) { return result.getResponse(); } if (!Cools.isEmpty(result.getMessage())) { return result.getMessage(); } return result.getResponse(); } public AsyncInTaskResult queryAsyncInTaskResponse(InTaskApplyRequest request) { String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request); Object response = redisUtil.get(responseKey); if (response == null) { return null; } return parseAsyncInTaskResult(buildAsyncInTaskBizKey(request), response.toString()); } public void clearAsyncInTaskResponse(InTaskApplyRequest request) { String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request); redisUtil.del(responseKey); } private AsyncInTaskResult parseAsyncInTaskResult(String bizKey, String responseValue) { AsyncInTaskResult result = null; try { JSONObject jsonObject = JSON.parseObject(responseValue); if (jsonObject != null && jsonObject.containsKey("status")) { result = jsonObject.toJavaObject(AsyncInTaskResult.class); } else if (jsonObject != null && jsonObject.containsKey("code")) { result = new AsyncInTaskResult(); result.setBizKey(bizKey); result.setResponse(responseValue); if (Integer.valueOf(200).equals(jsonObject.getInteger("code"))) { result.setStatus(AsyncInTaskStatus.SUCCESS); } else { result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); result.setMessage(responseValue); } } } catch (Exception ignored) { } if (result != null) { return result; } result = new AsyncInTaskResult(); result.setBizKey(bizKey); result.setResponse(responseValue); result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); result.setMessage(responseValue); return result; } /** * 检查是否有异步请求正在进行中 * * @param barcode 托盘码 * @param stationId 站点编号 * @return true表示正在请求中 */ public boolean isAsyncRequestInProgress(String barcode, Integer stationId) { return isAsyncRequestInProgress(barcode, stationId, null); } public boolean isAsyncRequestInProgress(String barcode, Integer stationId, Integer taskNo) { InTaskApplyRequest request = new InTaskApplyRequest(); request.setBarcode(barcode); request.setSourceStaNo(stationId); request.setTaskNo(taskNo); return isAsyncRequestInProgress(request); } public boolean isAsyncRequestInProgress(InTaskApplyRequest request) { String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request); return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null; } private AsyncInTaskResult buildAsyncInTaskResult(InTaskApplyRequest request, String response, Exception exception) { AsyncInTaskResult result = new AsyncInTaskResult(); result.setBizKey(buildAsyncInTaskBizKey(request)); result.setResponse(response); if (exception != null) { result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); result.setMessage("ERROR:" + exception.getMessage()); return result; } if (response == null) { result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); result.setMessage("FAILED"); return result; } try { JSONObject jsonObject = JSON.parseObject(response); if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) { result.setStatus(AsyncInTaskStatus.SUCCESS); return result; } } catch (Exception ignored) { } result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); result.setMessage(response); return result; } // 申请任务重新分配库位 public synchronized String applyReassignTaskLocNo(Integer taskNo, Integer stationId) { String wmsUrl = null; Config wmsSystemUriConfig = configService.getOne(new QueryWrapper().eq("code", "wmsSystemUri")); if (wmsSystemUriConfig != null) { wmsUrl = wmsSystemUriConfig.getValue(); } if (wmsUrl == null) { News.error("未配置WMS系统URI,配置文件Code编码:wmsSystemUri"); return null; } String wmsSystemReassignInTaskUrl = null; Config wmsSystemReassignInTaskUrlConfig = configService .getOne(new QueryWrapper().eq("code", "wmsSystemReassignInTaskUrl")); if (wmsSystemReassignInTaskUrlConfig != null) { wmsSystemReassignInTaskUrl = wmsSystemReassignInTaskUrlConfig.getValue(); } if (wmsSystemReassignInTaskUrl == null) { News.error("未配置WMS任务重新分配入库库位接口地址,配置文件Code编码:wmsSystemReassignInTaskUrl"); return null; } WrkMast wrkMast = wrkMastService.selectByWorkNo(taskNo); if (wrkMast == null) { News.info("无法找到对应任务,工作号={}", taskNo); return null; } HashMap requestParam = new HashMap<>(); String response = null; int result = 0; try { List excludeCrnList = new ArrayList<>(); List excludeDualCrnList = new ArrayList<>(); if (!Cools.isEmpty(wrkMast.getCrnNo())) { excludeCrnList.add(wrkMast.getCrnNo()); } if (!Cools.isEmpty(wrkMast.getDualCrnNo())) { excludeDualCrnList.add(wrkMast.getDualCrnNo()); } requestParam.put("taskNo", wrkMast.getWmsWrkNo()); requestParam.put("row", Utils.getInTaskEnableRow(stationId, excludeCrnList, excludeDualCrnList, false)); response = new HttpHandler.Builder() .setUri(wmsUrl) .setPath(wmsSystemReassignInTaskUrl) .setJson(JSON.toJSONString(requestParam)) .setTimeout(30, TimeUnit.SECONDS) .build() .doPost(); if (!Cools.isEmpty(response)) { JSONObject jsonObject = JSON.parseObject(response); if (jsonObject.getInteger("code") == 200) { result = 1; News.info("请求申请任务重新分配入库接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response); } else { News.info("请求申请任务重新分配入库接口失败,接口返回Code异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response); } } else { News.info("请求申请任务重新分配入库接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response); } } catch (Exception e) { News.error("请求申请任务重新分配入库接口异常!!!url:{};request:{}; response:{}", wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response, e); } finally { HttpRequestLog httpRequestLog = new HttpRequestLog(); httpRequestLog.setName(wmsUrl + wmsSystemReassignInTaskUrl); httpRequestLog.setRequest(JSON.toJSONString(requestParam)); httpRequestLog.setResponse(response); httpRequestLog.setCreateTime(new Date()); httpRequestLog.setResult(result); httpRequestLogService.save(httpRequestLog); } return response; } // 申请在库库位更换库位 public synchronized String applyChangeLocNo(String locNo) { String wmsUrl = null; Config wmsSystemUriConfig = configService.getOne(new QueryWrapper().eq("code", "wmsSystemUri")); if (wmsSystemUriConfig != null) { wmsUrl = wmsSystemUriConfig.getValue(); } if (wmsUrl == null) { News.error("未配置WMS系统URI,配置文件Code编码:wmsSystemUri"); return null; } String wmsSystemChangeLocNoUrl = null; Config wmsSystemChangeLocNoUrlConfig = configService .getOne(new QueryWrapper().eq("code", "wmsSystemChangeLocNoUrl")); if (wmsSystemChangeLocNoUrlConfig != null) { wmsSystemChangeLocNoUrl = wmsSystemChangeLocNoUrlConfig.getValue(); } if (wmsSystemChangeLocNoUrl == null) { News.error("未配置申请在库库位更换库位接口地址,配置文件Code编码:wmsSystemChangeLocNoUrl"); return null; } FindCrnNoResult findCrnNoResult = commonService.findCrnNoByLocNo(locNo); if (findCrnNoResult == null) { return null; } Integer crnNo = findCrnNoResult.getCrnNo(); List crnRows = new ArrayList<>(); if (findCrnNoResult.getCrnType().equals(SlaveType.Crn)) { BasCrnp basCrnp = basCrnpService.getOne(new QueryWrapper().eq("crn_no", crnNo)); if (basCrnp == null) { return null; } List> rowList = basCrnp.getControlRows$(); for (List list : rowList) { crnRows.addAll(list); } } else if (findCrnNoResult.getCrnType().equals(SlaveType.DualCrn)) { BasDualCrnp basDualCrnp = basDualCrnpService .getOne(new QueryWrapper().eq("crn_no", crnNo)); if (basDualCrnp == null) { return null; } List> rowList = basDualCrnp.getControlRows$(); for (List list : rowList) { crnRows.addAll(list); } } else { throw new CoolException("未知设备类型"); } HashMap requestParam = new HashMap<>(); String response = null; int result = 0; try { requestParam.put("locNo", locNo); requestParam.put("row", crnRows); response = new HttpHandler.Builder() .setUri(wmsUrl) .setPath(wmsSystemChangeLocNoUrl) .setJson(JSON.toJSONString(requestParam)) .setTimeout(30, TimeUnit.SECONDS) .build() .doPost(); if (!Cools.isEmpty(response)) { JSONObject jsonObject = JSON.parseObject(response); if (jsonObject.getInteger("code") == 200) { result = 1; News.info("请求WMS申请更换库位接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response); } else { News.info("请求WMS申请更换库位接口失败,接口返回Code异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response); } } else { News.info("请求WMS申请更换库位接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response); } } catch (Exception e) { News.error("请求WMS申请更换库位接口异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response, e); } finally { HttpRequestLog httpRequestLog = new HttpRequestLog(); httpRequestLog.setName(wmsUrl + wmsSystemChangeLocNoUrl); httpRequestLog.setRequest(JSON.toJSONString(requestParam)); httpRequestLog.setResponse(response); httpRequestLog.setCreateTime(new Date()); httpRequestLog.setResult(result); httpRequestLogService.save(httpRequestLog); } return response; } }