package com.vincent.rsf.server.manager.schedules; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.vincent.rsf.server.api.controller.erp.params.InOutResultBatchPayload; import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam; import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam; import com.vincent.rsf.server.api.service.CloudWmsReportService; import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog; import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService; import com.vincent.rsf.server.system.constant.GlobalConfigCode; import com.vincent.rsf.server.system.entity.Config; import com.vincent.rsf.server.system.service.ConfigService; import feign.FeignException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; /** 云仓上报定时任务 */ @Slf4j @Component public class CloudWmsNotifySchedule { private static final int BATCH_LIMIT = -1; private static final int STORE_BODY_MAX_CHARS_DEFAULT = 2000; @Autowired private CloudWmsNotifyLogService cloudWmsNotifyLogService; @Autowired private CloudWmsReportService cloudWmsReportService; @Autowired private ObjectMapper objectMapper; @Autowired private ConfigService configService; @Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}") public void recoverStaleSending() { try { cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss(); } catch (Exception e) { log.warn("云仓 sending 补偿任务异常:{}", e.getMessage()); } } @Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}") public void syncCloudWmsNotify() { long t0 = System.currentTimeMillis(); try { List pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1); if (pending.isEmpty()) { log.debug("云仓上报调度:本轮待发送 0 条"); return; } log.info("云仓上报调度:本轮待发送 {} 条", pending.size()); log.info("云仓上报调度:开始派发"); dispatchPending(pending); log.info("云仓上报调度:派发完成"); log.info("云仓上报调度:本轮调度结束,耗时 {} ms", System.currentTimeMillis() - t0); } catch (Exception e) { log.error("云仓上报调度异常,已耗时 {} ms", System.currentTimeMillis() - t0, e); } } private static boolean isSendingBusy(CloudWmsNotifyLog row) { Integer s = row.getSending(); return s != null && s != 0; } /** 同单多条合并上报 */ private void dispatchPending(List pending) { int rowsReported = 0; log.info("云仓上报派发:开始,待办 {} 条", pending.size()); long tCfg = System.currentTimeMillis(); log.info("云仓上报派发:读取上报类型配置"); String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult(); log.info("云仓上报派发:上报类型={},读配置耗时 {} ms", rtInOut, System.currentTimeMillis() - tCfg); LinkedHashMap> inOutGroups = new LinkedHashMap<>(); int n = pending.size(); for (int i = 0; i < n; i++) { CloudWmsNotifyLog row = pending.get(i); if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) { log.info("云仓上报派发:分组进度 {}/{},id={}", i + 1, n, row.getId()); } if (!rtInOut.equals(row.getReportType())) { continue; } String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody()); if (key == null) { continue; } inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row); } log.info("云仓上报派发:入出库可分组键 {} 个", inOutGroups.size()); Set done = new HashSet<>(); for (CloudWmsNotifyLog row : pending) { Long rid = row.getId(); if (rid != null && done.contains(rid)) { continue; } if (!rtInOut.equals(row.getReportType())) { if (isSendingBusy(row)) { continue; } log.info("云仓上报派发:其它类型单条 id={}", rid); if (safeProcessOne(row)) { rowsReported++; if (rid != null) { done.add(rid); } } continue; } String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody()); if (key == null) { if (isSendingBusy(row)) { continue; } log.info("云仓上报派发:无合并键单条 id={}", rid); if (safeProcessOne(row)) { rowsReported++; if (rid != null) { done.add(rid); } } continue; } List g = inOutGroups.get(key); List work = new ArrayList<>(); if (g != null) { for (CloudWmsNotifyLog x : g) { if (x.getId() == null || done.contains(x.getId())) { continue; } if (isSendingBusy(x)) { continue; } work.add(x); } } if (work.isEmpty()) { continue; } if (work.size() >= 2) { log.info("云仓上报派发:同单合并 {} 条", work.size()); Set handled = safeProcessMergedInOutGroup(work); done.addAll(handled); rowsReported += handled.size(); } else { log.info("云仓上报派发:入出库单条 id={}", work.get(0).getId()); if (safeProcessOne(work.get(0))) { rowsReported++; if (work.get(0).getId() != null) { done.add(work.get(0).getId()); } } } } log.info("云仓上报派发:遍历结束"); if (rowsReported == 0) { log.warn("云仓上报派发:待办 {} 条,实际处理 0 条", pending.size()); } else { log.info("云仓上报派发:待办 {} 条,实际处理 {} 条", pending.size(), rowsReported); } } private Set safeProcessMergedInOutGroup(List group) { Set handled = new HashSet<>(); List claimedIds = new ArrayList<>(); List claimedRows = new ArrayList<>(); log.info("云仓上报派发:合并组抢占,行数 {}", group.size()); try { for (CloudWmsNotifyLog row : group) { Long id = row.getId(); log.debug("云仓上报派发:合并抢占尝试 id={}", id); if (!cloudWmsNotifyLogService.tryClaimSending(id)) { log.warn("云仓上报派发:合并抢占未抢到 id={},跳过本行继续其它行", id); continue; } claimedIds.add(id); claimedRows.add(row); } if (claimedRows.isEmpty()) { log.warn("云仓上报派发:合并组抢占 0 行"); return handled; } log.info("云仓上报派发:合并组已抢占 {} 行,请求云仓", claimedRows.size()); processMergedInOut(claimedRows); log.info("云仓上报派发:合并组云仓调用已返回"); handled.addAll(claimedIds); } catch (Exception e) { log.warn("云仓上报同批合并异常:{}", e.getMessage()); } finally { for (Long id : claimedIds) { cloudWmsNotifyLogService.clearSending(id); } } return handled; } private void processMergedInOut(List group) { Date now = new Date(); List lines = new ArrayList<>(); try { for (CloudWmsNotifyLog row : group) { lines.addAll(cloudWmsNotifyLogService.parseInOutLinesFromRequestBody(row.getRequestBody())); } } catch (IOException e) { String msg = "反序列化失败: " + e.getMessage(); for (CloudWmsNotifyLog row : group) { int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1; setFailResult(row, row.getRequestBody(), msg, nextRetry, now, row.getMaxRetryCount()); } return; } if (lines.isEmpty()) { return; } String mergedBody; try { mergedBody = objectMapper.writeValueAsString(new InOutResultBatchPayload().setLines(lines)); } catch (JsonProcessingException e) { for (CloudWmsNotifyLog row : group) { int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1; setFailResult(row, row.getRequestBody(), "合并请求体序列化失败: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount()); } return; } log.info("云仓上报开始(同单合并),ids={},requestBody={}", idsOf(group), mergedBody); try { Map res = cloudWmsReportService.reportInOutResults(lines); for (CloudWmsNotifyLog row : group) { int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1; updateAfterNotify(row, mergedBody, res, nextRetry, now, row.getMaxRetryCount()); } } catch (FeignException e) { String responseBody = e.contentUTF8(); String fullMsg = "status=" + e.status() + ",message=" + e.getMessage() + (responseBody == null || responseBody.isEmpty() ? "" : ",responseBody=" + responseBody); log.warn("云仓上报同批合并请求失败:{}", fullMsg); for (CloudWmsNotifyLog row : group) { int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1; setFailResult(row, mergedBody, "请求异常: " + fullMsg, nextRetry, now, row.getMaxRetryCount()); } } catch (Exception e) { log.warn("云仓上报同批合并请求失败:{}", e.getMessage()); for (CloudWmsNotifyLog row : group) { int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1; setFailResult(row, mergedBody, "请求异常: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount()); } } } private static List idsOf(List group) { List ids = new ArrayList<>(group.size()); for (CloudWmsNotifyLog row : group) { ids.add(row.getId()); } return ids; } private boolean safeProcessOne(CloudWmsNotifyLog logRecord) { Long id = logRecord.getId(); log.info("云仓上报派发:单条抢占 id={}", id); if (!cloudWmsNotifyLogService.tryClaimSending(id)) { log.warn("云仓上报派发:单条未抢到 id={}", id); return false; } try { processOne(logRecord); } catch (Exception e) { log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage()); } finally { cloudWmsNotifyLogService.clearSending(id); } return true; } private void processOne(CloudWmsNotifyLog logRecord) { String reportType = logRecord.getReportType(); String requestBody = logRecord.getRequestBody(); Date now = new Date(); int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1; int effectiveMaxRetry = logRecord.getMaxRetryCount(); String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult(); String rtAdj = cloudWmsNotifyLogService.getReportTypeInventoryAdjust(); log.info("云仓上报开始,id={},bizRef={},reportType={},attempt={},requestBody={}", logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody); try { if (rtInOut.equals(reportType)) { JsonNode root = objectMapper.readTree(requestBody); Map res; if (root.has("lines") && root.get("lines").isArray()) { InOutResultBatchPayload batch = objectMapper.readValue(requestBody, InOutResultBatchPayload.class); res = cloudWmsReportService.reportInOutResults(batch.getLines()); } else { InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class); res = cloudWmsReportService.reportInOutResult(param); } updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry); } else if (rtAdj.equals(reportType)) { InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class); Map res = cloudWmsReportService.reportInventoryAdjust(param); updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry); } else { log.warn("未知上报类型,id={},reportType={}", logRecord.getId(), reportType); return; } } catch (JsonProcessingException e) { log.warn("云仓上报请求体反序列化失败,id={}:{}", logRecord.getId(), e.getMessage()); setFailResult(logRecord, requestBody, "反序列化失败: " + e.getMessage(), nextRetry, now, effectiveMaxRetry); } catch (FeignException e) { String responseBody = e.contentUTF8(); String fullMsg = "status=" + e.status() + ",message=" + e.getMessage() + (responseBody == null || responseBody.isEmpty() ? "" : ",responseBody=" + responseBody); log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), fullMsg); setFailResult(logRecord, requestBody, "请求异常: " + fullMsg, nextRetry, now, effectiveMaxRetry); } catch (Exception e) { log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage()); setFailResult(logRecord, requestBody, "请求异常: " + e.getMessage(), nextRetry, now, effectiveMaxRetry); } } private void updateAfterNotify(CloudWmsNotifyLog logRecord, String requestBody, Map res, int nextRetry, Date now, int effectiveMaxRetry) { String responseJson; try { responseJson = res != null ? objectMapper.writeValueAsString(res) : "null"; } catch (JsonProcessingException e) { responseJson = String.valueOf(res); } Object codeObj = res != null ? res.get("code") : null; Object statusObj = res != null ? res.get("status") : null; boolean success = Integer.valueOf(200).equals(codeObj) || Integer.valueOf(200).equals(statusObj); int status = success ? cloudWmsNotifyLogService.getNotifyStatusSuccess() : cloudWmsNotifyLogService.getNotifyStatusFail(); logRecord.setLastRequestBody(truncateForStore(requestBody)); logRecord.setLastResponseBody(truncateForStore(responseJson)); logRecord.setLastNotifyTime(now); logRecord.setRetryCount(nextRetry); logRecord.setNotifyStatus(status); logRecord.setSending(0); logRecord.setUpdateTime(now); cloudWmsNotifyLogService.updateById(logRecord); log.info("云仓上报结束,id={},bizRef={},attempt={},notifyStatus={},responseBody={}", logRecord.getId(), logRecord.getBizRef(), nextRetry, status, responseJson); } private void setFailResult(CloudWmsNotifyLog logRecord, String requestBody, String errorMsg, int nextRetry, Date now, int effectiveMaxRetry) { logRecord.setLastRequestBody(truncateForStore(requestBody)); logRecord.setLastResponseBody(truncateForStore(errorMsg)); logRecord.setLastNotifyTime(now); logRecord.setRetryCount(nextRetry); int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending(); logRecord.setNotifyStatus(status); logRecord.setSending(0); logRecord.setUpdateTime(now); cloudWmsNotifyLogService.updateById(logRecord); log.warn("云仓上报失败,id={},bizRef={},attempt={},notifyStatus={},error={}", logRecord.getId(), logRecord.getBizRef(), nextRetry, logRecord.getNotifyStatus(), errorMsg); } /** maxRetry = -1 表示无限重发 */ private boolean isInfiniteRetry(Integer maxRetry) { return maxRetry != null && maxRetry == -1; } private String truncateForStore(String body) { int maxChars = resolveStoreBodyMaxChars(); if (body == null || body.length() <= maxChars) { return body; } return body.substring(0, maxChars); } private int resolveStoreBodyMaxChars() { try { Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_NOTIFY_STORE_BODY_MAX_CHARS); if (cfg == null || cfg.getVal() == null || cfg.getVal().trim().isEmpty()) { return STORE_BODY_MAX_CHARS_DEFAULT; } int parsed = Integer.parseInt(cfg.getVal().trim()); return parsed > 0 ? parsed : STORE_BODY_MAX_CHARS_DEFAULT; } catch (Exception e) { return STORE_BODY_MAX_CHARS_DEFAULT; } } }