| | |
| | | import com.zy.core.News; |
| | | import com.zy.core.enums.RedisKeyType; |
| | | import com.zy.core.enums.SlaveType; |
| | | import com.zy.core.plugin.store.AsyncInTaskResult; |
| | | import com.zy.core.plugin.store.AsyncInTaskStatus; |
| | | import com.zy.core.plugin.store.InTaskApplyRequest; |
| | | import com.zy.system.entity.Config; |
| | | import com.zy.system.service.ConfigService; |
| | |
| | | import java.util.LinkedHashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.RejectedExecutionException; |
| | | import java.util.concurrent.ThreadPoolExecutor; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | @Component |
| | | public class WmsOperateUtils { |
| | | |
| | | private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2, |
| | | Math.min(4, Runtime.getRuntime().availableProcessors())); |
| | | private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200; |
| | | private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60; |
| | | private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60; |
| | | private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1); |
| | | private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor( |
| | | APPLY_IN_TASK_ASYNC_THREADS, |
| | | APPLY_IN_TASK_ASYNC_THREADS, |
| | | 60L, |
| | | TimeUnit.SECONDS, |
| | | new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY), |
| | | runnable -> { |
| | | Thread thread = new Thread(runnable); |
| | | thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement()); |
| | | thread.setDaemon(true); |
| | | return thread; |
| | | } |
| | | ); |
| | | private static final int APPLY_IN_TASK_TIMEOUT_SECONDS = 5; |
| | | |
| | | @Autowired |
| | | private ConfigService configService; |
| | |
| | | private BasStationService basStationService; |
| | | @Autowired |
| | | private RedisUtil redisUtil; |
| | | |
| | | private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>(); |
| | | |
| | | private String buildAsyncInTaskBizKey(InTaskApplyRequest request) { |
| | | if (!Cools.isEmpty(request.getBizKey())) { |
| | | return request.getBizKey(); |
| | | } |
| | | StringBuilder keyBuilder = new StringBuilder(request.getBarcode()) |
| | | .append("_") |
| | | .append(request.getSourceStaNo()); |
| | | if (request.getTaskNo() != null && request.getTaskNo() > 0) { |
| | | keyBuilder.append("_").append(request.getTaskNo()); |
| | | } |
| | | return keyBuilder.toString(); |
| | | } |
| | | |
| | | private String buildAsyncInTaskKey(String prefix, InTaskApplyRequest request) { |
| | | return prefix + buildAsyncInTaskBizKey(request); |
| | | } |
| | | |
| | | private String buildAsyncInTaskKey(String prefix, String barcode, Integer stationId, Integer taskNo) { |
| | | StringBuilder keyBuilder = new StringBuilder(prefix) |
| | | .append(barcode) |
| | | .append("_") |
| | | .append(stationId); |
| | | if (taskNo != null && taskNo > 0) { |
| | | keyBuilder.append("_").append(taskNo); |
| | | } |
| | | return keyBuilder.toString(); |
| | | } |
| | | |
| | | // 申请入库任务 |
| | | public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) { |
| | |
| | | Map<String, Object> requestParam = new LinkedHashMap<>(); |
| | | String response = null; |
| | | int result = 0; |
| | | long startMs = System.currentTimeMillis(); |
| | | try { |
| | | BasStation basStation = basStationService |
| | | .getOne(new QueryWrapper<BasStation>().eq("station_id", request.getSourceStaNo())); |
| | |
| | | .setUri(wmsUrl) |
| | | .setPath(wmsSystemInUrl) |
| | | .setJson(JSON.toJSONString(requestParam)) |
| | | .setTimeout(30, TimeUnit.SECONDS) |
| | | .setTimeout(APPLY_IN_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS) |
| | | .build() |
| | | .doPost(); |
| | | if (!Cools.isEmpty(response)) { |
| | | JSONObject jsonObject = JSON.parseObject(response); |
| | | if (jsonObject.getInteger("code") == 200) { |
| | | result = 1; |
| | | News.info("请求WMS入库接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl, |
| | | JSON.toJSONString(requestParam), response); |
| | | News.info("请求WMS入库接口成功!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response, elapsedMs(startMs)); |
| | | } else { |
| | | News.info("请求WMS入库接口失败,接口返回Code异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl, |
| | | JSON.toJSONString(requestParam), response); |
| | | News.info("请求WMS入库接口失败,接口返回Code异常!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response, elapsedMs(startMs)); |
| | | } |
| | | } else { |
| | | News.info("请求WMS入库接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl, |
| | | JSON.toJSONString(requestParam), response); |
| | | News.info("请求WMS入库接口失败,接口未响应!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemInUrl, JSON.toJSONString(requestParam), response, elapsedMs(startMs)); |
| | | } |
| | | } catch (Exception e) { |
| | | News.error("请求WMS入库接口异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemInUrl, |
| | | JSON.toJSONString(requestParam), response, e); |
| | | News.error("请求WMS入库接口异常!!!url:{};request:{};response:{};cost={}ms", wmsUrl + wmsSystemInUrl, |
| | | JSON.toJSONString(requestParam), response, elapsedMs(startMs), e); |
| | | } finally { |
| | | HttpRequestLog httpRequestLog = new HttpRequestLog(); |
| | | httpRequestLog.setName(wmsUrl + wmsSystemInUrl); |
| | |
| | | httpRequestLogService.save(httpRequestLog); |
| | | } |
| | | return response; |
| | | } |
| | | |
| | | /** |
| | | * 异步申请入库任务 - 非阻塞版本 |
| | | * 将请求提交到线程池异步执行,结果存储到Redis中 |
| | | * |
| | | * @param barcode 托盘码 |
| | | * @param sourceStaNo 站点编号 |
| | | * @param locType1 托盘高度 |
| | | */ |
| | | public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer locType1) { |
| | | applyInTaskAsync(barcode, sourceStaNo, null, locType1); |
| | | } |
| | | |
| | | public void applyInTaskAsync(String barcode, Integer sourceStaNo, Integer taskNo, Integer locType1) { |
| | | InTaskApplyRequest request = new InTaskApplyRequest(); |
| | | request.setBarcode(barcode); |
| | | request.setSourceStaNo(sourceStaNo); |
| | | request.setTaskNo(taskNo); |
| | | request.setLocType1(locType1); |
| | | applyInTaskAsync(request); |
| | | } |
| | | |
| | | public void applyInTaskAsync(InTaskApplyRequest request) { |
| | | String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request); |
| | | String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request); |
| | | |
| | | if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) { |
| | | return; |
| | | } |
| | | |
| | | // 检查是否已有请求在进行中 |
| | | Object existingRequest = redisUtil.get(requestKey); |
| | | if (existingRequest != null) { |
| | | asyncInTaskInflight.remove(requestKey); |
| | | return; // 已有请求在进行中,跳过 |
| | | } |
| | | |
| | | // 标记请求进行中,避免请求在线程池排队时被重复提交 |
| | | redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS); |
| | | |
| | | try { |
| | | APPLY_IN_TASK_EXECUTOR.execute(() -> { |
| | | try { |
| | | String response = applyInTask(request); |
| | | AsyncInTaskResult result = buildAsyncInTaskResult(request, response, null); |
| | | redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS); |
| | | News.info("异步WMS入库请求完成,barcode={},stationId={},taskNo={},status={},response={}", |
| | | request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(), |
| | | result.getStatus(), response); |
| | | } catch (Exception e) { |
| | | News.error("异步WMS入库请求异常,barcode={},stationId={},taskNo={},error={}", |
| | | request.getBarcode(), request.getSourceStaNo(), request.getTaskNo(), e.getMessage()); |
| | | AsyncInTaskResult result = buildAsyncInTaskResult(request, null, e); |
| | | redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS); |
| | | } finally { |
| | | asyncInTaskInflight.remove(requestKey); |
| | | redisUtil.del(requestKey); |
| | | } |
| | | }); |
| | | } catch (RejectedExecutionException e) { |
| | | asyncInTaskInflight.remove(requestKey); |
| | | redisUtil.del(requestKey); |
| | | AsyncInTaskResult result = new AsyncInTaskResult(); |
| | | result.setBizKey(buildAsyncInTaskBizKey(request)); |
| | | result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | result.setMessage("ERROR:ASYNC_QUEUE_FULL"); |
| | | redisUtil.set(responseKey, JSON.toJSONString(result), APPLY_IN_TASK_RESPONSE_TTL_SECONDS); |
| | | News.error("异步WMS入库请求被拒绝,线程池已满,barcode={},stationId={},taskNo={}", |
| | | request.getBarcode(), request.getSourceStaNo(), request.getTaskNo()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 查询异步入库任务请求结果 |
| | | * |
| | | * @param barcode 托盘码 |
| | | * @param stationId 站点编号 |
| | | * @return 响应结果,null表示还未完成或未找到 |
| | | */ |
| | | public String queryAsyncInTaskResponse(String barcode, Integer stationId) { |
| | | return queryAsyncInTaskResponse(barcode, stationId, null); |
| | | } |
| | | |
| | | public String queryAsyncInTaskResponse(String barcode, Integer stationId, Integer taskNo) { |
| | | InTaskApplyRequest request = new InTaskApplyRequest(); |
| | | request.setBarcode(barcode); |
| | | request.setSourceStaNo(stationId); |
| | | request.setTaskNo(taskNo); |
| | | AsyncInTaskResult result = queryAsyncInTaskResponse(request); |
| | | if (result == null) { |
| | | return null; |
| | | } |
| | | clearAsyncInTaskResponse(request); |
| | | if (result.isSuccess()) { |
| | | return result.getResponse(); |
| | | } |
| | | if (!Cools.isEmpty(result.getMessage())) { |
| | | return result.getMessage(); |
| | | } |
| | | return result.getResponse(); |
| | | } |
| | | |
| | | public AsyncInTaskResult queryAsyncInTaskResponse(InTaskApplyRequest request) { |
| | | String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request); |
| | | Object response = redisUtil.get(responseKey); |
| | | if (response == null) { |
| | | return null; |
| | | } |
| | | return parseAsyncInTaskResult(buildAsyncInTaskBizKey(request), response.toString()); |
| | | } |
| | | |
| | | public void clearAsyncInTaskResponse(InTaskApplyRequest request) { |
| | | String responseKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key, request); |
| | | redisUtil.del(responseKey); |
| | | } |
| | | |
| | | private AsyncInTaskResult parseAsyncInTaskResult(String bizKey, String responseValue) { |
| | | AsyncInTaskResult result = null; |
| | | try { |
| | | JSONObject jsonObject = JSON.parseObject(responseValue); |
| | | if (jsonObject != null && jsonObject.containsKey("status")) { |
| | | result = jsonObject.toJavaObject(AsyncInTaskResult.class); |
| | | } else if (jsonObject != null && jsonObject.containsKey("code")) { |
| | | result = new AsyncInTaskResult(); |
| | | result.setBizKey(bizKey); |
| | | result.setResponse(responseValue); |
| | | if (Integer.valueOf(200).equals(jsonObject.getInteger("code"))) { |
| | | result.setStatus(AsyncInTaskStatus.SUCCESS); |
| | | } else { |
| | | result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | result.setMessage(responseValue); |
| | | } |
| | | } |
| | | } catch (Exception ignored) { |
| | | } |
| | | |
| | | if (result != null) { |
| | | return result; |
| | | } |
| | | |
| | | result = new AsyncInTaskResult(); |
| | | result.setBizKey(bizKey); |
| | | result.setResponse(responseValue); |
| | | result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | result.setMessage(responseValue); |
| | | return result; |
| | | } |
| | | |
| | | /** |
| | | * 检查是否有异步请求正在进行中 |
| | | * |
| | | * @param barcode 托盘码 |
| | | * @param stationId 站点编号 |
| | | * @return true表示正在请求中 |
| | | */ |
| | | public boolean isAsyncRequestInProgress(String barcode, Integer stationId) { |
| | | return isAsyncRequestInProgress(barcode, stationId, null); |
| | | } |
| | | |
| | | public boolean isAsyncRequestInProgress(String barcode, Integer stationId, Integer taskNo) { |
| | | InTaskApplyRequest request = new InTaskApplyRequest(); |
| | | request.setBarcode(barcode); |
| | | request.setSourceStaNo(stationId); |
| | | request.setTaskNo(taskNo); |
| | | return isAsyncRequestInProgress(request); |
| | | } |
| | | |
| | | public boolean isAsyncRequestInProgress(InTaskApplyRequest request) { |
| | | String requestKey = buildAsyncInTaskKey(RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key, request); |
| | | return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null; |
| | | } |
| | | |
| | | private AsyncInTaskResult buildAsyncInTaskResult(InTaskApplyRequest request, String response, Exception exception) { |
| | | AsyncInTaskResult result = new AsyncInTaskResult(); |
| | | result.setBizKey(buildAsyncInTaskBizKey(request)); |
| | | result.setResponse(response); |
| | | if (exception != null) { |
| | | result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | result.setMessage("ERROR:" + exception.getMessage()); |
| | | return result; |
| | | } |
| | | |
| | | if (response == null) { |
| | | result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | result.setMessage("FAILED"); |
| | | return result; |
| | | } |
| | | |
| | | try { |
| | | JSONObject jsonObject = JSON.parseObject(response); |
| | | if (jsonObject != null && Integer.valueOf(200).equals(jsonObject.getInteger("code"))) { |
| | | result.setStatus(AsyncInTaskStatus.SUCCESS); |
| | | return result; |
| | | } |
| | | } catch (Exception ignored) { |
| | | } |
| | | |
| | | result.setStatus(AsyncInTaskStatus.RETRYABLE_FAIL); |
| | | result.setMessage(response); |
| | | return result; |
| | | } |
| | | |
| | | // 申请任务重新分配库位 |
| | |
| | | HashMap<String, Object> requestParam = new HashMap<>(); |
| | | String response = null; |
| | | int result = 0; |
| | | long startMs = System.currentTimeMillis(); |
| | | try { |
| | | List<Integer> excludeCrnList = new ArrayList<>(); |
| | | List<Integer> excludeDualCrnList = new ArrayList<>(); |
| | |
| | | JSONObject jsonObject = JSON.parseObject(response); |
| | | if (jsonObject.getInteger("code") == 200) { |
| | | result = 1; |
| | | News.info("请求申请任务重新分配入库接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemReassignInTaskUrl, |
| | | JSON.toJSONString(requestParam), response); |
| | | News.info("请求申请任务重新分配入库接口成功!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response, |
| | | elapsedMs(startMs)); |
| | | } else { |
| | | News.info("请求申请任务重新分配入库接口失败,接口返回Code异常!!!url:{};request:{};response:{}", |
| | | wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response); |
| | | News.info("请求申请任务重新分配入库接口失败,接口返回Code异常!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response, |
| | | elapsedMs(startMs)); |
| | | } |
| | | } else { |
| | | News.info("请求申请任务重新分配入库接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemReassignInTaskUrl, |
| | | JSON.toJSONString(requestParam), response); |
| | | News.info("请求申请任务重新分配入库接口失败,接口未响应!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response, |
| | | elapsedMs(startMs)); |
| | | } |
| | | } catch (Exception e) { |
| | | News.error("请求申请任务重新分配入库接口异常!!!url:{};request:{}; response:{}", wmsUrl + wmsSystemReassignInTaskUrl, |
| | | JSON.toJSONString(requestParam), response, e); |
| | | News.error("请求申请任务重新分配入库接口异常!!!url:{};request:{}; response:{};cost={}ms", |
| | | wmsUrl + wmsSystemReassignInTaskUrl, JSON.toJSONString(requestParam), response, |
| | | elapsedMs(startMs), e); |
| | | } finally { |
| | | HttpRequestLog httpRequestLog = new HttpRequestLog(); |
| | | httpRequestLog.setName(wmsUrl + wmsSystemReassignInTaskUrl); |
| | |
| | | HashMap<String, Object> requestParam = new HashMap<>(); |
| | | String response = null; |
| | | int result = 0; |
| | | long startMs = System.currentTimeMillis(); |
| | | try { |
| | | requestParam.put("locNo", locNo); |
| | | requestParam.put("row", crnRows); |
| | |
| | | JSONObject jsonObject = JSON.parseObject(response); |
| | | if (jsonObject.getInteger("code") == 200) { |
| | | result = 1; |
| | | News.info("请求WMS申请更换库位接口成功!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl, |
| | | JSON.toJSONString(requestParam), response); |
| | | News.info("请求WMS申请更换库位接口成功!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response, |
| | | elapsedMs(startMs)); |
| | | } else { |
| | | News.info("请求WMS申请更换库位接口失败,接口返回Code异常!!!url:{};request:{};response:{}", |
| | | wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response); |
| | | News.info("请求WMS申请更换库位接口失败,接口返回Code异常!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response, |
| | | elapsedMs(startMs)); |
| | | } |
| | | } else { |
| | | News.info("请求WMS申请更换库位接口失败,接口未响应!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl, |
| | | JSON.toJSONString(requestParam), response); |
| | | News.info("请求WMS申请更换库位接口失败,接口未响应!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response, |
| | | elapsedMs(startMs)); |
| | | } |
| | | } catch (Exception e) { |
| | | News.error("请求WMS申请更换库位接口异常!!!url:{};request:{};response:{}", wmsUrl + wmsSystemChangeLocNoUrl, |
| | | JSON.toJSONString(requestParam), response, e); |
| | | News.error("请求WMS申请更换库位接口异常!!!url:{};request:{};response:{};cost={}ms", |
| | | wmsUrl + wmsSystemChangeLocNoUrl, JSON.toJSONString(requestParam), response, |
| | | elapsedMs(startMs), e); |
| | | } finally { |
| | | HttpRequestLog httpRequestLog = new HttpRequestLog(); |
| | | httpRequestLog.setName(wmsUrl + wmsSystemChangeLocNoUrl); |
| | |
| | | return response; |
| | | } |
| | | |
| | | private long elapsedMs(long startMs) { |
| | | return System.currentTimeMillis() - startMs; |
| | | } |
| | | |
| | | } |