cl
6 天以前 2a34b52125d5fc356d65ee1e8912845dd601d4e3
rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -1,7 +1,9 @@
package com.vincent.rsf.server.manager.schedules;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultBatchPayload;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
import com.vincent.rsf.server.api.service.CloudWmsReportService;
@@ -16,10 +18,14 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;
/** 云仓上报定时任务 */
@Slf4j
@@ -38,51 +44,175 @@
    @Autowired
    private ConfigService configService;
    @Scheduled(cron = "0/30 * * * * ?")
    /** sending=1 且 Redis 占位已失、update_time 超时:补偿清零 */
    @Scheduled(cron = "0 0/2 * * * ?")
//    @Scheduled(cron = "0/5 * * * * ?")
    public void recoverStaleSending() {
        try {
            cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
        } catch (Exception e) {
            log.warn("云仓 sending 补偿任务异常:{}", e.getMessage());
        }
    }
    @Scheduled(cron = "0/60 * * * * ?")
    public void syncCloudWmsNotify() {
        // List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, 999);
        List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
        if (pending.isEmpty()) {
            log.debug("云仓上报调度:本轮待发送 0 条");
            return;
        }
        long nowMs = System.currentTimeMillis();
        List<CloudWmsNotifyLog> ready = pending.stream()
                .filter(logRecord -> shouldProcess(logRecord, nowMs))
                .collect(Collectors.toList());
        ready.parallelStream().forEach(this::safeProcessOne);
        log.info("云仓上报调度:本轮待发送 {} 条", pending.size());
        dispatchPending(pending);
    }
    /** 同单多条合并上报 */
    private void dispatchPending(List<CloudWmsNotifyLog> pending) {
        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
        LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
        for (CloudWmsNotifyLog row : pending) {
            if (!rtInOut.equals(row.getReportType())) {
                continue;
            }
            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
            if (key == null) {
                continue;
            }
            inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        Set<Long> done = new HashSet<>();
        for (CloudWmsNotifyLog row : pending) {
            Long rid = row.getId();
            if (rid != null && done.contains(rid)) {
                continue;
            }
            if (!rtInOut.equals(row.getReportType())) {
                safeProcessOne(row);
                if (rid != null) {
                    done.add(rid);
                }
                continue;
            }
            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
            if (key == null) {
                safeProcessOne(row);
                if (rid != null) {
                    done.add(rid);
                }
                continue;
            }
            List<CloudWmsNotifyLog> g = inOutGroups.get(key);
            if (g != null && g.size() >= 2) {
                safeProcessMergedInOutGroup(g);
                for (CloudWmsNotifyLog x : g) {
                    if (x.getId() != null) {
                        done.add(x.getId());
                    }
                }
            } else {
                safeProcessOne(row);
                if (rid != null) {
                    done.add(rid);
                }
            }
        }
    }
    private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
        List<Long> claimedIds = new ArrayList<>();
        try {
            for (CloudWmsNotifyLog row : group) {
                Long id = row.getId();
                if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
                    log.debug("云仓上报同批合并未抢到发送权 id={}", id);
                    return;
                }
                claimedIds.add(id);
            }
            processMergedInOut(group);
        } catch (Exception e) {
            log.warn("云仓上报同批合并异常:{}", e.getMessage());
        } finally {
            for (Long id : claimedIds) {
                cloudWmsNotifyLogService.clearSending(id);
            }
        }
    }
    private void processMergedInOut(List<CloudWmsNotifyLog> group) {
        Date now = new Date();
        List<InOutResultReportParam> lines = new ArrayList<>();
        try {
            for (CloudWmsNotifyLog row : group) {
                lines.addAll(cloudWmsNotifyLogService.parseInOutLinesFromRequestBody(row.getRequestBody()));
            }
        } catch (IOException e) {
            String msg = "反序列化失败: " + e.getMessage();
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                setFailResult(row, row.getRequestBody(), msg, nextRetry, now, row.getMaxRetryCount());
            }
            return;
        }
        if (lines.isEmpty()) {
            return;
        }
        String mergedBody;
        try {
            mergedBody = objectMapper.writeValueAsString(new InOutResultBatchPayload().setLines(lines));
        } catch (JsonProcessingException e) {
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                setFailResult(row, row.getRequestBody(), "合并请求体序列化失败: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
            }
            return;
        }
        log.info("云仓上报开始(同单合并),ids={},requestBody={}", idsOf(group), mergedBody);
        try {
            Map<String, Object> res = cloudWmsReportService.reportInOutResults(lines);
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                updateAfterNotify(row, mergedBody, res, nextRetry, now, row.getMaxRetryCount());
            }
        } catch (FeignException e) {
            String responseBody = e.contentUTF8();
            String fullMsg = "status=" + e.status() + ",message=" + e.getMessage()
                    + (responseBody == null || responseBody.isEmpty() ? "" : ",responseBody=" + responseBody);
            log.warn("云仓上报同批合并请求失败:{}", fullMsg);
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                setFailResult(row, mergedBody, "请求异常: " + fullMsg, nextRetry, now, row.getMaxRetryCount());
            }
        } catch (Exception e) {
            log.warn("云仓上报同批合并请求失败:{}", e.getMessage());
            for (CloudWmsNotifyLog row : group) {
                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
                setFailResult(row, mergedBody, "请求异常: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
            }
        }
    }
    private static List<Long> idsOf(List<CloudWmsNotifyLog> group) {
        List<Long> ids = new ArrayList<>(group.size());
        for (CloudWmsNotifyLog row : group) {
            ids.add(row.getId());
        }
        return ids;
    }
    private void safeProcessOne(CloudWmsNotifyLog logRecord) {
        Long id = logRecord.getId();
        if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
            log.debug("云仓上报未抢到发送权 id={}", id);
            return;
        }
        try {
            processOne(logRecord);
        } catch (Exception e) {
            log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
        } finally {
            cloudWmsNotifyLogService.clearSending(id);
        }
    }
    private boolean shouldProcess(CloudWmsNotifyLog logRecord, long nowMs) {
        Integer maxRetry = logRecord.getMaxRetryCount();
        Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
        if (maxRetry == null || intervalSeconds == null) {
            log.warn("云仓上报待办跳过:重试参数缺失,id={},bizRef={},maxRetry={},intervalSeconds={}",
                    logRecord.getId(), logRecord.getBizRef(), maxRetry, intervalSeconds);
            return false;
        }
        if (!isInfiniteRetry(maxRetry)
                && logRecord.getRetryCount() != null
                && logRecord.getRetryCount() >= maxRetry) {
            log.info("云仓上报待办跳过:重试次数已达上限,id={},bizRef={},retryCount={},maxRetry={}",
                    logRecord.getId(), logRecord.getBizRef(), logRecord.getRetryCount(), maxRetry);
            return false;
        }
        int effectiveIntervalSeconds = Math.max(0, intervalSeconds);
        if (logRecord.getLastNotifyTime() != null) {
            long elapsed = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
            if (elapsed < effectiveIntervalSeconds) {
                return false;
            }
        }
        return true;
    }
    private void processOne(CloudWmsNotifyLog logRecord) {
@@ -91,15 +221,24 @@
        Date now = new Date();
        int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
        int effectiveMaxRetry = logRecord.getMaxRetryCount();
        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
        String rtAdj = cloudWmsNotifyLogService.getReportTypeInventoryAdjust();
        log.info("云仓上报开始,id={},bizRef={},reportType={},attempt={},requestBody={}",
                logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
        try {
            if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
                InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
                Map<String, Object> res = cloudWmsReportService.reportInOutResult(param);
            if (rtInOut.equals(reportType)) {
                JsonNode root = objectMapper.readTree(requestBody);
                Map<String, Object> res;
                if (root.has("lines") && root.get("lines").isArray()) {
                    InOutResultBatchPayload batch = objectMapper.readValue(requestBody, InOutResultBatchPayload.class);
                    res = cloudWmsReportService.reportInOutResults(batch.getLines());
                } else {
                    InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
                    res = cloudWmsReportService.reportInOutResult(param);
                }
                updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
            } else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
            } else if (rtAdj.equals(reportType)) {
                InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
                Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
                updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
@@ -138,6 +277,7 @@
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(nextRetry);
        logRecord.setNotifyStatus(status);
        logRecord.setSending(0);
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
        log.info("云仓上报结束,id={},bizRef={},attempt={},notifyStatus={},responseBody={}",
@@ -149,11 +289,11 @@
        logRecord.setLastResponseBody(truncateForStore(errorMsg));
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(nextRetry);
        // logRecord.setNotifyStatus(nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending());
        int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
                ? cloudWmsNotifyLogService.getNotifyStatusFail()
                : cloudWmsNotifyLogService.getNotifyStatusPending();
        logRecord.setNotifyStatus(status);
        logRecord.setSending(0);
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
        log.warn("云仓上报失败,id={},bizRef={},attempt={},notifyStatus={},error={}",