| | |
| | | @Autowired |
| | | private ConfigService configService; |
| | | |
| | | /** sending=1 且 Redis 占位已失、update_time 超时:补偿清零 */ |
| | | @Scheduled(cron = "0 0/2 * * * ?") |
| | | // @Scheduled(cron = "0/5 * * * * ?") |
| | | @Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}") |
| | | public void recoverStaleSending() { |
| | | try { |
| | | cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss(); |
| | |
| | | } |
| | | } |
| | | |
| | | @Scheduled(cron = "0/60 * * * * ?") |
| | | @Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}") |
| | | public void syncCloudWmsNotify() { |
| | | List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1); |
| | | if (pending.isEmpty()) { |
| | | log.debug("云仓上报调度:本轮待发送 0 条"); |
| | | return; |
| | | long t0 = System.currentTimeMillis(); |
| | | try { |
| | | List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1); |
| | | if (pending.isEmpty()) { |
| | | log.debug("云仓上报调度:本轮待发送 0 条"); |
| | | return; |
| | | } |
| | | log.info("云仓上报调度:本轮待发送 {} 条", pending.size()); |
| | | log.info("云仓上报调度:开始派发"); |
| | | dispatchPending(pending); |
| | | log.info("云仓上报调度:派发完成"); |
| | | log.info("云仓上报调度:本轮调度结束,耗时 {} ms", System.currentTimeMillis() - t0); |
| | | } catch (Exception e) { |
| | | log.error("云仓上报调度异常,已耗时 {} ms", System.currentTimeMillis() - t0, e); |
| | | } |
| | | log.info("云仓上报调度:本轮待发送 {} 条", pending.size()); |
| | | dispatchPending(pending); |
| | | } |
| | | |
| | | private static boolean isSendingBusy(CloudWmsNotifyLog row) { |
| | | Integer s = row.getSending(); |
| | | return s != null && s != 0; |
| | | } |
| | | |
| | | /** 同单多条合并上报 */ |
| | | private void dispatchPending(List<CloudWmsNotifyLog> pending) { |
| | | int rowsReported = 0; |
| | | log.info("云仓上报派发:开始,待办 {} 条", pending.size()); |
| | | long tCfg = System.currentTimeMillis(); |
| | | log.info("云仓上报派发:读取上报类型配置"); |
| | | String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult(); |
| | | log.info("云仓上报派发:上报类型={},读配置耗时 {} ms", rtInOut, System.currentTimeMillis() - tCfg); |
| | | LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>(); |
| | | for (CloudWmsNotifyLog row : pending) { |
| | | int n = pending.size(); |
| | | for (int i = 0; i < n; i++) { |
| | | CloudWmsNotifyLog row = pending.get(i); |
| | | if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) { |
| | | log.info("云仓上报派发:分组进度 {}/{},id={}", i + 1, n, row.getId()); |
| | | } |
| | | if (!rtInOut.equals(row.getReportType())) { |
| | | continue; |
| | | } |
| | |
| | | } |
| | | inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row); |
| | | } |
| | | log.info("云仓上报派发:入出库可分组键 {} 个", inOutGroups.size()); |
| | | Set<Long> done = new HashSet<>(); |
| | | for (CloudWmsNotifyLog row : pending) { |
| | | Long rid = row.getId(); |
| | |
| | | continue; |
| | | } |
| | | if (!rtInOut.equals(row.getReportType())) { |
| | | safeProcessOne(row); |
| | | if (rid != null) { |
| | | done.add(rid); |
| | | if (isSendingBusy(row)) { |
| | | continue; |
| | | } |
| | | log.info("云仓上报派发:其它类型单条 id={}", rid); |
| | | if (safeProcessOne(row)) { |
| | | rowsReported++; |
| | | if (rid != null) { |
| | | done.add(rid); |
| | | } |
| | | } |
| | | continue; |
| | | } |
| | | String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody()); |
| | | if (key == null) { |
| | | safeProcessOne(row); |
| | | if (rid != null) { |
| | | done.add(rid); |
| | | if (isSendingBusy(row)) { |
| | | continue; |
| | | } |
| | | log.info("云仓上报派发:无合并键单条 id={}", rid); |
| | | if (safeProcessOne(row)) { |
| | | rowsReported++; |
| | | if (rid != null) { |
| | | done.add(rid); |
| | | } |
| | | } |
| | | continue; |
| | | } |
| | | List<CloudWmsNotifyLog> g = inOutGroups.get(key); |
| | | if (g != null && g.size() >= 2) { |
| | | safeProcessMergedInOutGroup(g); |
| | | List<CloudWmsNotifyLog> work = new ArrayList<>(); |
| | | if (g != null) { |
| | | for (CloudWmsNotifyLog x : g) { |
| | | if (x.getId() != null) { |
| | | done.add(x.getId()); |
| | | if (x.getId() == null || done.contains(x.getId())) { |
| | | continue; |
| | | } |
| | | if (isSendingBusy(x)) { |
| | | continue; |
| | | } |
| | | work.add(x); |
| | | } |
| | | } |
| | | if (work.isEmpty()) { |
| | | continue; |
| | | } |
| | | if (work.size() >= 2) { |
| | | log.info("云仓上报派发:同单合并 {} 条", work.size()); |
| | | Set<Long> handled = safeProcessMergedInOutGroup(work); |
| | | done.addAll(handled); |
| | | rowsReported += handled.size(); |
| | | } else { |
| | | safeProcessOne(row); |
| | | if (rid != null) { |
| | | done.add(rid); |
| | | log.info("云仓上报派发:入出库单条 id={}", work.get(0).getId()); |
| | | if (safeProcessOne(work.get(0))) { |
| | | rowsReported++; |
| | | if (work.get(0).getId() != null) { |
| | | done.add(work.get(0).getId()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | log.info("云仓上报派发:遍历结束"); |
| | | if (rowsReported == 0) { |
| | | log.warn("云仓上报派发:待办 {} 条,实际处理 0 条", pending.size()); |
| | | } else { |
| | | log.info("云仓上报派发:待办 {} 条,实际处理 {} 条", pending.size(), rowsReported); |
| | | } |
| | | } |
| | | |
| | | private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) { |
| | | private Set<Long> safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) { |
| | | Set<Long> handled = new HashSet<>(); |
| | | List<Long> claimedIds = new ArrayList<>(); |
| | | List<CloudWmsNotifyLog> claimedRows = new ArrayList<>(); |
| | | log.info("云仓上报派发:合并组抢占,行数 {}", group.size()); |
| | | try { |
| | | for (CloudWmsNotifyLog row : group) { |
| | | Long id = row.getId(); |
| | | log.debug("云仓上报派发:合并抢占尝试 id={}", id); |
| | | if (!cloudWmsNotifyLogService.tryClaimSending(id)) { |
| | | log.debug("云仓上报同批合并未抢到发送权 id={}", id); |
| | | return; |
| | | log.warn("云仓上报派发:合并抢占未抢到 id={},跳过本行继续其它行", id); |
| | | continue; |
| | | } |
| | | claimedIds.add(id); |
| | | claimedRows.add(row); |
| | | } |
| | | processMergedInOut(group); |
| | | if (claimedRows.isEmpty()) { |
| | | log.warn("云仓上报派发:合并组抢占 0 行"); |
| | | return handled; |
| | | } |
| | | log.info("云仓上报派发:合并组已抢占 {} 行,请求云仓", claimedRows.size()); |
| | | processMergedInOut(claimedRows); |
| | | log.info("云仓上报派发:合并组云仓调用已返回"); |
| | | handled.addAll(claimedIds); |
| | | } catch (Exception e) { |
| | | log.warn("云仓上报同批合并异常:{}", e.getMessage()); |
| | | } finally { |
| | |
| | | cloudWmsNotifyLogService.clearSending(id); |
| | | } |
| | | } |
| | | return handled; |
| | | } |
| | | |
| | | private void processMergedInOut(List<CloudWmsNotifyLog> group) { |
| | |
| | | return ids; |
| | | } |
| | | |
| | | private void safeProcessOne(CloudWmsNotifyLog logRecord) { |
| | | private boolean safeProcessOne(CloudWmsNotifyLog logRecord) { |
| | | Long id = logRecord.getId(); |
| | | log.info("云仓上报派发:单条抢占 id={}", id); |
| | | if (!cloudWmsNotifyLogService.tryClaimSending(id)) { |
| | | log.debug("云仓上报未抢到发送权 id={}", id); |
| | | return; |
| | | log.warn("云仓上报派发:单条未抢到 id={}", id); |
| | | return false; |
| | | } |
| | | try { |
| | | processOne(logRecord); |
| | |
| | | } finally { |
| | | cloudWmsNotifyLogService.clearSending(id); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | private void processOne(CloudWmsNotifyLog logRecord) { |