cl
6 天以前 50393719d85fc30438456b0d0f065573a404fba5
rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -1,27 +1,40 @@
package com.vincent.rsf.server.manager.schedules;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultBatchPayload;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
import com.vincent.rsf.server.api.service.CloudWmsReportService;
import com.vincent.rsf.server.manager.constant.CloudWmsInoutReportMode;
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
import com.vincent.rsf.server.system.entity.Config;
import com.vincent.rsf.server.system.service.ConfigService;
import feign.FeignException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
/** 云仓上报定时任务 */
@Slf4j
@Component
public class CloudWmsNotifySchedule {
    private static final int BATCH_LIMIT = 50;
    private static final int BATCH_LIMIT = -1;
    private static final int STORE_BODY_MAX_CHARS_DEFAULT = 2000;
    @Autowired
    private CloudWmsNotifyLogService cloudWmsNotifyLogService;
@@ -29,39 +42,291 @@
    private CloudWmsReportService cloudWmsReportService;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private ConfigService configService;
    @Scheduled(cron = "0/30 * * * * ?")
    public void syncCloudWmsNotify() {
        // List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, 999);
        List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
        if (pending.isEmpty()) {
            return;
    @Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}")
    public void recoverStaleSending() {
        try {
            cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
        } catch (Exception e) {
            log.warn("云仓 sending 补偿任务异常:{}", e.getMessage());
        }
        long nowMs = System.currentTimeMillis();
        for (CloudWmsNotifyLog logRecord : pending) {
            try {
                Integer maxRetry = logRecord.getMaxRetryCount();
                Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
                if (maxRetry == null || intervalSeconds == null || intervalSeconds <= 0) {
                    continue;
    }
    @Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}")
    public void syncCloudWmsNotify() {
        long t0 = System.currentTimeMillis();
        try {
            List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
            if (pending.isEmpty()) {
                log.debug("云仓上报调度:本轮待发送 0 条");
                return;
            }
            log.debug("云仓上报调度:本轮待发送 {} 条", pending.size());
            log.debug("云仓上报调度:开始派发");
            dispatchPending(pending);
            log.debug("云仓上报调度:派发完成");
            log.debug("云仓上报调度:本轮调度结束,耗时 {} ms", System.currentTimeMillis() - t0);
        } catch (Exception e) {
            log.error("云仓上报调度异常,已耗时 {} ms", System.currentTimeMillis() - t0, e);
        }
    }
    private static boolean isSendingBusy(CloudWmsNotifyLog row) {
        Integer s = row.getSending();
        return s != null && s != 0;
    }
    /** CLOUD_WMS_INOUT_REPORT_MODE,缺省 immediate */
    private String resolveInOutReportMode() {
        try {
            Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_INOUT_REPORT_MODE);
            if (cfg != null && cfg.getVal() != null) {
                String v = cfg.getVal().trim();
                if (!v.isEmpty()) {
                    return v.toLowerCase();
                }
                // if (logRecord.getRetryCount() != null && logRecord.getRetryCount() >= maxRetry) {
                if (!isInfiniteRetry(maxRetry)
                        && logRecord.getRetryCount() != null
                        && logRecord.getRetryCount() >= maxRetry) {
                    continue;
                }
                if (logRecord.getLastNotifyTime() != null) {
                    long elapsed = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
                    if (elapsed < intervalSeconds) {
                        continue;
                    }
                }
                processOne(logRecord);
            } catch (Exception e) {
                log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
            }
        } catch (Exception ignored) {
        }
        return CloudWmsInoutReportMode.IMMEDIATE;
    }
    /** 同单多条合并上报 */
    private void dispatchPending(List<CloudWmsNotifyLog> pending) {
        int rowsReported = 0;
        log.debug("云仓上报派发:开始,待办 {} 条", pending.size());
        long tCfg = System.currentTimeMillis();
        log.debug("云仓上报派发:读取上报类型配置");
        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
        log.debug("云仓上报派发:上报类型={},读配置耗时 {} ms", rtInOut, System.currentTimeMillis() - tCfg);
        String inoutMode = resolveInOutReportMode();
        boolean singleRowOnly = CloudWmsInoutReportMode.SINGLE.equals(inoutMode);
        if (singleRowOnly) {
            log.debug("云仓上报派发:入出库模式 single,不合并同单多条");
        }
        LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            CloudWmsNotifyLog row = pending.get(i);
            if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) {
                log.debug("云仓上报派发:分组进度 {}/{},id={}", i + 1, n, row.getId());
            }
            if (!rtInOut.equals(row.getReportType())) {
                continue;
            }
            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
            if (key == null) {
                continue;
            }
            if (!singleRowOnly) {
                inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
        }
        log.debug("云仓上报派发:入出库可分组键 {} 个", inOutGroups.size());
        Set<Long> done = new HashSet<>();
        for (CloudWmsNotifyLog row : pending) {
            Long rid = row.getId();
            if (rid != null && done.contains(rid)) {
                continue;
            }
            if (!rtInOut.equals(row.getReportType())) {
                if (isSendingBusy(row)) {
                    continue;
                }
                log.debug("云仓上报派发:其它类型单条 id={}", rid);
                if (safeProcessOne(row)) {
                    rowsReported++;
                    if (rid != null) {
                        done.add(rid);
                    }
                }
                continue;
            }
            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
            if (key == null) {
                if (isSendingBusy(row)) {
                    continue;
                }
                log.debug("云仓上报派发:无合并键单条 id={}", rid);
                if (safeProcessOne(row)) {
                    rowsReported++;
                    if (rid != null) {
                        done.add(rid);
                    }
                }
                continue;
            }
            if (singleRowOnly) {
                if (isSendingBusy(row)) {
                    continue;
                }
                log.debug("云仓上报派发:single 模式单条 id={}", rid);
                if (safeProcessOne(row)) {
                    rowsReported++;
                    if (rid != null) {
                        done.add(rid);
                    }
                }
                continue;
            }
            List<CloudWmsNotifyLog> g = inOutGroups.get(key);
            List<CloudWmsNotifyLog> work = new ArrayList<>();
            if (g != null) {
                for (CloudWmsNotifyLog x : g) {
                    if (x.getId() == null || done.contains(x.getId())) {
                        continue;
                    }
                    if (isSendingBusy(x)) {
                        continue;
                    }
                    work.add(x);
                }
            }
            if (work.isEmpty()) {
                continue;
            }
            if (work.size() >= 2) {
                log.debug("云仓上报派发:同单合并 {} 条", work.size());
                Set<Long> handled = safeProcessMergedInOutGroup(work);
                done.addAll(handled);
                rowsReported += handled.size();
            } else {
                log.debug("云仓上报派发:入出库单条 id={}", work.get(0).getId());
                if (safeProcessOne(work.get(0))) {
                    rowsReported++;
                    if (work.get(0).getId() != null) {
                        done.add(work.get(0).getId());
                    }
                }
            }
        }
        log.debug("云仓上报派发:遍历结束");
        if (rowsReported == 0) {
            log.debug("云仓上报派发:待办 {} 条,实际处理 0 条", pending.size());
        } else {
            log.debug("云仓上报派发:待办 {} 条,实际处理 {} 条", pending.size(), rowsReported);
        }
    }
    private Set<Long> safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
        Set<Long> handled = new HashSet<>();
        List<Long> claimedIds = new ArrayList<>();
        List<CloudWmsNotifyLog> claimedRows = new ArrayList<>();
        log.debug("云仓上报派发:合并组抢占,行数 {}", group.size());
        try {
            for (CloudWmsNotifyLog row : group) {
                Long id = row.getId();
                log.debug("云仓上报派发:合并抢占尝试 id={}", id);
                if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
                    log.debug("云仓上报派发:合并抢占未抢到 id={},跳过本行继续其它行", id);
                    continue;
                }
                claimedIds.add(id);
                claimedRows.add(row);
            }
            if (claimedRows.isEmpty()) {
                log.debug("云仓上报派发:合并组抢占 0 行");
                return handled;
            }
            log.debug("云仓上报派发:合并组已抢占 {} 行,请求云仓", claimedRows.size());
            processMergedInOut(claimedRows);
            log.debug("云仓上报派发:合并组云仓调用已返回");
            handled.addAll(claimedIds);
        } catch (Exception e) {
            log.warn("云仓上报同批合并异常:{}", e.getMessage());
        } finally {
            for (Long id : claimedIds) {
                cloudWmsNotifyLogService.clearSending(id);
            }
        }
        return handled;
    }
    private void processMergedInOut(List<CloudWmsNotifyLog> group) {
        Date now = new Date();
        List<InOutResultReportParam> lines = new ArrayList<>();
        try {
            for (CloudWmsNotifyLog row : group) {
                lines.addAll(cloudWmsNotifyLogService.parseInOutLinesFromRequestBody(row.getRequestBody()));
            }
        } catch (IOException e) {
            String msg = "反序列化失败: " + e.getMessage();
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                setFailResult(row, row.getRequestBody(), msg, nextRetry, now, row.getMaxRetryCount());
            }
            return;
        }
        if (lines.isEmpty()) {
            return;
        }
        String mergedBody;
        try {
            mergedBody = objectMapper.writeValueAsString(new InOutResultBatchPayload().setLines(lines));
        } catch (JsonProcessingException e) {
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                setFailResult(row, row.getRequestBody(), "合并请求体序列化失败: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
            }
            return;
        }
        log.info("云仓上报开始(同单合并),ids={},requestBody={}", idsOf(group), mergedBody);
        try {
            Map<String, Object> res = cloudWmsReportService.reportInOutResults(lines);
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                updateAfterNotify(row, mergedBody, res, nextRetry, now, row.getMaxRetryCount());
            }
        } catch (FeignException e) {
            String err = feignErrorSummary(e);
            log.warn("云仓上报同批合并请求失败:{}", err);
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                setFailResult(row, mergedBody, "请求异常: " + err, nextRetry, now, row.getMaxRetryCount());
            }
        } catch (Exception e) {
            log.warn("云仓上报同批合并请求失败:{}", e.getMessage());
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                setFailResult(row, mergedBody, "请求异常: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
            }
        }
    }
    /** 打印全部返回日志 */
    private static String feignErrorSummary(FeignException e) {
        String body = e.contentUTF8();
        if (body != null && !body.trim().isEmpty()) {
            return "status=" + e.status() + ",responseBody=" + body.trim();
        }
        return "status=" + e.status() + ",message=" + e.getMessage();
    }
    private static List<Long> idsOf(List<CloudWmsNotifyLog> group) {
        List<Long> ids = new ArrayList<>(group.size());
        for (CloudWmsNotifyLog row : group) {
            ids.add(row.getId());
        }
        return ids;
    }
    private boolean safeProcessOne(CloudWmsNotifyLog logRecord) {
        Long id = logRecord.getId();
        log.debug("云仓上报派发:单条抢占 id={}", id);
        if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
            log.debug("云仓上报派发:单条未抢到 id={}", id);
            return false;
        }
        try {
            processOne(logRecord);
        } catch (Exception e) {
            log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
        } finally {
            cloudWmsNotifyLogService.clearSending(id);
        }
        return true;
    }
    private void processOne(CloudWmsNotifyLog logRecord) {
@@ -70,13 +335,24 @@
        Date now = new Date();
        int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
        int effectiveMaxRetry = logRecord.getMaxRetryCount();
        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
        String rtAdj = cloudWmsNotifyLogService.getReportTypeInventoryAdjust();
        log.info("云仓上报开始,id={},bizRef={},reportType={},attempt={},requestBody={}",
                logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
        try {
            if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
                InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
                Map<String, Object> res = cloudWmsReportService.reportInOutResult(param);
            if (rtInOut.equals(reportType)) {
                JsonNode root = objectMapper.readTree(requestBody);
                Map<String, Object> res;
                if (root.has("lines") && root.get("lines").isArray()) {
                    InOutResultBatchPayload batch = objectMapper.readValue(requestBody, InOutResultBatchPayload.class);
                    res = cloudWmsReportService.reportInOutResults(batch.getLines());
                } else {
                    InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
                    res = cloudWmsReportService.reportInOutResult(param);
                }
                updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
            } else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
            } else if (rtAdj.equals(reportType)) {
                InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
                Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
                updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
@@ -87,6 +363,10 @@
        } catch (JsonProcessingException e) {
            log.warn("云仓上报请求体反序列化失败,id={}:{}", logRecord.getId(), e.getMessage());
            setFailResult(logRecord, requestBody, "反序列化失败: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
        } catch (FeignException e) {
            String err = feignErrorSummary(e);
            log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), err);
            setFailResult(logRecord, requestBody, "请求异常: " + err, nextRetry, now, effectiveMaxRetry);
        } catch (Exception e) {
            log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
            setFailResult(logRecord, requestBody, "请求异常: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
@@ -101,35 +381,60 @@
            responseJson = String.valueOf(res);
        }
        Object codeObj = res != null ? res.get("code") : null;
        boolean success = Integer.valueOf(200).equals(codeObj);
        int status = success ? cloudWmsNotifyLogService.getNotifyStatusSuccess() : cloudWmsNotifyLogService.getNotifyStatusPending();
        if (!success && !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry) {
            status = cloudWmsNotifyLogService.getNotifyStatusFail();
        }
        logRecord.setLastRequestBody(requestBody);
        logRecord.setLastResponseBody(responseJson);
        Object statusObj = res != null ? res.get("status") : null;
        boolean success = Integer.valueOf(200).equals(codeObj) || Integer.valueOf(200).equals(statusObj);
        int status = success ? cloudWmsNotifyLogService.getNotifyStatusSuccess() : cloudWmsNotifyLogService.getNotifyStatusFail();
        logRecord.setLastRequestBody(truncateForStore(requestBody));
        logRecord.setLastResponseBody(truncateForStore(responseJson));
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(nextRetry);
        logRecord.setNotifyStatus(status);
        logRecord.setSending(0);
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
        log.info("云仓上报结束,id={},bizRef={},attempt={},notifyStatus={},responseBody={}",
                logRecord.getId(), logRecord.getBizRef(), nextRetry, status, responseJson);
    }
    private void setFailResult(CloudWmsNotifyLog logRecord, String requestBody, String errorMsg, int nextRetry, Date now, int effectiveMaxRetry) {
        logRecord.setLastRequestBody(requestBody);
        logRecord.setLastResponseBody(errorMsg);
        logRecord.setLastRequestBody(truncateForStore(requestBody));
        logRecord.setLastResponseBody(truncateForStore(errorMsg));
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(nextRetry);
        // logRecord.setNotifyStatus(nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending());
        logRecord.setNotifyStatus(!isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
        int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
                ? cloudWmsNotifyLogService.getNotifyStatusFail()
                : cloudWmsNotifyLogService.getNotifyStatusPending());
                : cloudWmsNotifyLogService.getNotifyStatusPending();
        logRecord.setNotifyStatus(status);
        logRecord.setSending(0);
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
        log.warn("云仓上报失败,id={},bizRef={},attempt={},notifyStatus={},error={}",
                logRecord.getId(), logRecord.getBizRef(), nextRetry, logRecord.getNotifyStatus(), errorMsg);
    }
    /** maxRetry = -1 表示无限重发 */
    private boolean isInfiniteRetry(Integer maxRetry) {
        return maxRetry != null && maxRetry == -1;
    }
    private String truncateForStore(String body) {
        int maxChars = resolveStoreBodyMaxChars();
        if (body == null || body.length() <= maxChars) {
            return body;
        }
        return body.substring(0, maxChars);
    }
    private int resolveStoreBodyMaxChars() {
        try {
            Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_NOTIFY_STORE_BODY_MAX_CHARS);
            if (cfg == null || cfg.getVal() == null || cfg.getVal().trim().isEmpty()) {
                return STORE_BODY_MAX_CHARS_DEFAULT;
            }
            int parsed = Integer.parseInt(cfg.getVal().trim());
            return parsed > 0 ? parsed : STORE_BODY_MAX_CHARS_DEFAULT;
        } catch (Exception e) {
            return STORE_BODY_MAX_CHARS_DEFAULT;
        }
    }
}