cl
6 天以前 e23efd75ca74df6b35a0c03b8e8fc0712c9d4544
rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -62,11 +62,11 @@
                log.debug("云仓上报调度:本轮待发送 0 条");
                return;
            }
            log.info("云仓上报调度:本轮待发送 {} 条", pending.size());
            log.info("云仓上报调度:开始派发");
            log.debug("云仓上报调度:本轮待发送 {} 条", pending.size());
            log.debug("云仓上报调度:开始派发");
            dispatchPending(pending);
            log.info("云仓上报调度:派发完成");
            log.info("云仓上报调度:本轮调度结束,耗时 {} ms", System.currentTimeMillis() - t0);
            log.debug("云仓上报调度:派发完成");
            log.debug("云仓上报调度:本轮调度结束,耗时 {} ms", System.currentTimeMillis() - t0);
        } catch (Exception e) {
            log.error("云仓上报调度异常,已耗时 {} ms", System.currentTimeMillis() - t0, e);
        }
@@ -80,17 +80,17 @@
    /** 同单多条合并上报 */
    private void dispatchPending(List<CloudWmsNotifyLog> pending) {
        int rowsReported = 0;
        log.info("云仓上报派发:开始,待办 {} 条", pending.size());
        log.debug("云仓上报派发:开始,待办 {} 条", pending.size());
        long tCfg = System.currentTimeMillis();
        log.info("云仓上报派发:读取上报类型配置");
        log.debug("云仓上报派发:读取上报类型配置");
        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
        log.info("云仓上报派发:上报类型={},读配置耗时 {} ms", rtInOut, System.currentTimeMillis() - tCfg);
        log.debug("云仓上报派发:上报类型={},读配置耗时 {} ms", rtInOut, System.currentTimeMillis() - tCfg);
        LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            CloudWmsNotifyLog row = pending.get(i);
            if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) {
                log.info("云仓上报派发:分组进度 {}/{},id={}", i + 1, n, row.getId());
                log.debug("云仓上报派发:分组进度 {}/{},id={}", i + 1, n, row.getId());
            }
            if (!rtInOut.equals(row.getReportType())) {
                continue;
@@ -101,7 +101,7 @@
            }
            inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        log.info("云仓上报派发:入出库可分组键 {} 个", inOutGroups.size());
        log.debug("云仓上报派发:入出库可分组键 {} 个", inOutGroups.size());
        Set<Long> done = new HashSet<>();
        for (CloudWmsNotifyLog row : pending) {
            Long rid = row.getId();
@@ -112,7 +112,7 @@
                if (isSendingBusy(row)) {
                    continue;
                }
                log.info("云仓上报派发:其它类型单条 id={}", rid);
                log.debug("云仓上报派发:其它类型单条 id={}", rid);
                if (safeProcessOne(row)) {
                    rowsReported++;
                    if (rid != null) {
@@ -126,7 +126,7 @@
                if (isSendingBusy(row)) {
                    continue;
                }
                log.info("云仓上报派发:无合并键单条 id={}", rid);
                log.debug("云仓上报派发:无合并键单条 id={}", rid);
                if (safeProcessOne(row)) {
                    rowsReported++;
                    if (rid != null) {
@@ -152,12 +152,12 @@
                continue;
            }
            if (work.size() >= 2) {
                log.info("云仓上报派发:同单合并 {} 条", work.size());
                log.debug("云仓上报派发:同单合并 {} 条", work.size());
                Set<Long> handled = safeProcessMergedInOutGroup(work);
                done.addAll(handled);
                rowsReported += handled.size();
            } else {
                log.info("云仓上报派发:入出库单条 id={}", work.get(0).getId());
                log.debug("云仓上报派发:入出库单条 id={}", work.get(0).getId());
                if (safeProcessOne(work.get(0))) {
                    rowsReported++;
                    if (work.get(0).getId() != null) {
@@ -166,11 +166,11 @@
                }
            }
        }
        log.info("云仓上报派发:遍历结束");
        log.debug("云仓上报派发:遍历结束");
        if (rowsReported == 0) {
            log.warn("云仓上报派发:待办 {} 条,实际处理 0 条", pending.size());
            log.debug("云仓上报派发:待办 {} 条,实际处理 0 条", pending.size());
        } else {
            log.info("云仓上报派发:待办 {} 条,实际处理 {} 条", pending.size(), rowsReported);
            log.debug("云仓上报派发:待办 {} 条,实际处理 {} 条", pending.size(), rowsReported);
        }
    }
@@ -178,25 +178,25 @@
        Set<Long> handled = new HashSet<>();
        List<Long> claimedIds = new ArrayList<>();
        List<CloudWmsNotifyLog> claimedRows = new ArrayList<>();
        log.info("云仓上报派发:合并组抢占,行数 {}", group.size());
        log.debug("云仓上报派发:合并组抢占,行数 {}", group.size());
        try {
            for (CloudWmsNotifyLog row : group) {
                Long id = row.getId();
                log.debug("云仓上报派发:合并抢占尝试 id={}", id);
                if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
                    log.warn("云仓上报派发:合并抢占未抢到 id={},跳过本行继续其它行", id);
                    log.debug("云仓上报派发:合并抢占未抢到 id={},跳过本行继续其它行", id);
                    continue;
                }
                claimedIds.add(id);
                claimedRows.add(row);
            }
            if (claimedRows.isEmpty()) {
                log.warn("云仓上报派发:合并组抢占 0 行");
                log.debug("云仓上报派发:合并组抢占 0 行");
                return handled;
            }
            log.info("云仓上报派发:合并组已抢占 {} 行,请求云仓", claimedRows.size());
            log.debug("云仓上报派发:合并组已抢占 {} 行,请求云仓", claimedRows.size());
            processMergedInOut(claimedRows);
            log.info("云仓上报派发:合并组云仓调用已返回");
            log.debug("云仓上报派发:合并组云仓调用已返回");
            handled.addAll(claimedIds);
        } catch (Exception e) {
            log.warn("云仓上报同批合并异常:{}", e.getMessage());
@@ -271,9 +271,9 @@
    private boolean safeProcessOne(CloudWmsNotifyLog logRecord) {
        Long id = logRecord.getId();
        log.info("云仓上报派发:单条抢占 id={}", id);
        log.debug("云仓上报派发:单条抢占 id={}", id);
        if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
            log.warn("云仓上报派发:单条未抢到 id={}", id);
            log.debug("云仓上报派发:单条未抢到 id={}", id);
            return false;
        }
        try {