From 50393719d85fc30438456b0d0f065573a404fba5 Mon Sep 17 00:00:00 2001
From: cl <1442464845@qq.com>
Date: 星期五, 01 五月 2026 17:26:31 +0800
Subject: [PATCH] 增加一个单条上报的模式

---
 rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java |  346 +++++++++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 306 insertions(+), 40 deletions(-)

diff --git a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
index d5b3c13..875b320 100644
--- a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
+++ b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -1,23 +1,32 @@
 package com.vincent.rsf.server.manager.schedules;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.vincent.rsf.server.api.controller.erp.params.InOutResultBatchPayload;
 import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
 import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
 import com.vincent.rsf.server.api.service.CloudWmsReportService;
+import com.vincent.rsf.server.manager.constant.CloudWmsInoutReportMode;
 import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
 import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
 import com.vincent.rsf.server.system.constant.GlobalConfigCode;
 import com.vincent.rsf.server.system.entity.Config;
 import com.vincent.rsf.server.system.service.ConfigService;
+import feign.FeignException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /** 浜戜粨涓婃姤瀹氭椂浠诲姟 */
 @Slf4j
@@ -36,45 +45,288 @@
     @Autowired
     private ConfigService configService;
 
-    @Scheduled(cron = "0/30 * * * * ?")
-    public void syncCloudWmsNotify() {
-        // List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, 999);
-        List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
-        if (pending.isEmpty()) {
-            return;
+    @Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}")
+    public void recoverStaleSending() {
+        try {
+            cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
+        } catch (Exception e) {
+            log.warn("浜戜粨 sending 琛ュ伩浠诲姟寮傚父锛歿}", e.getMessage());
         }
-        long nowMs = System.currentTimeMillis();
-        for (CloudWmsNotifyLog logRecord : pending) {
-            try {
-                Integer maxRetry = logRecord.getMaxRetryCount();
-                Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
-                if (maxRetry == null || intervalSeconds == null) {
-                    log.warn("浜戜粨涓婃姤寰呭姙璺宠繃锛氶噸璇曞弬鏁扮己澶憋紝id={}锛宐izRef={}锛宮axRetry={}锛宨ntervalSeconds={}",
-                            logRecord.getId(), logRecord.getBizRef(), maxRetry, intervalSeconds);
-                    continue;
+    }
+
+    @Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}")
+    public void syncCloudWmsNotify() {
+        long t0 = System.currentTimeMillis();
+        try {
+            List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
+            if (pending.isEmpty()) {
+                log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� 0 鏉�");
+                return;
+            }
+            log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� {} 鏉�", pending.size());
+            log.debug("浜戜粨涓婃姤璋冨害锛氬紑濮嬫淳鍙�");
+            dispatchPending(pending);
+            log.debug("浜戜粨涓婃姤璋冨害锛氭淳鍙戝畬鎴�");
+            log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞皟搴︾粨鏉燂紝鑰楁椂 {} ms", System.currentTimeMillis() - t0);
+        } catch (Exception e) {
+            log.error("浜戜粨涓婃姤璋冨害寮傚父锛屽凡鑰楁椂 {} ms", System.currentTimeMillis() - t0, e);
+        }
+    }
+
+    private static boolean isSendingBusy(CloudWmsNotifyLog row) {
+        Integer s = row.getSending();
+        return s != null && s != 0;
+    }
+
+    /** CLOUD_WMS_INOUT_REPORT_MODE锛岀己鐪� immediate */
+    private String resolveInOutReportMode() {
+        try {
+            Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_INOUT_REPORT_MODE);
+            if (cfg != null && cfg.getVal() != null) {
+                String v = cfg.getVal().trim();
+                if (!v.isEmpty()) {
+                    return v.toLowerCase();
                 }
-                int effectiveIntervalSeconds = Math.max(0, intervalSeconds);
-                // if (logRecord.getRetryCount() != null && logRecord.getRetryCount() >= maxRetry) {
-                if (!isInfiniteRetry(maxRetry)
-                        && logRecord.getRetryCount() != null
-                        && logRecord.getRetryCount() >= maxRetry) {
-                    log.info("浜戜粨涓婃姤寰呭姙璺宠繃锛氶噸璇曟鏁板凡杈句笂闄愶紝id={}锛宐izRef={}锛宺etryCount={}锛宮axRetry={}",
-                            logRecord.getId(), logRecord.getBizRef(), logRecord.getRetryCount(), maxRetry);
-                    continue;
-                }
-                if (logRecord.getLastNotifyTime() != null) {
-                    long elapsed = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
-                    if (elapsed < effectiveIntervalSeconds) {
-//                        log.info("浜戜粨涓婃姤寰呭姙璺宠繃锛氭湭鍒伴噸璇曢棿闅旓紝id={}锛宐izRef={}锛宔lapsed={}s锛宨nterval={}s",
-//                                logRecord.getId(), logRecord.getBizRef(), elapsed, effectiveIntervalSeconds);
-                        continue;
-                    }
-                }
-                processOne(logRecord);
-            } catch (Exception e) {
-                log.warn("浜戜粨涓婃姤瀹氭椂浠诲姟澶勭悊鍗曟潯寮傚父锛宨d={}锛宐izRef={}锛歿}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
+            }
+        } catch (Exception ignored) {
+        }
+        return CloudWmsInoutReportMode.IMMEDIATE;
+    }
+
+    /** 鍚屽崟澶氭潯鍚堝苟涓婃姤 */
+    private void dispatchPending(List<CloudWmsNotifyLog> pending) {
+        int rowsReported = 0;
+        log.debug("浜戜粨涓婃姤娲惧彂锛氬紑濮嬶紝寰呭姙 {} 鏉�", pending.size());
+        long tCfg = System.currentTimeMillis();
+        log.debug("浜戜粨涓婃姤娲惧彂锛氳鍙栦笂鎶ョ被鍨嬮厤缃�");
+        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
+        log.debug("浜戜粨涓婃姤娲惧彂锛氫笂鎶ョ被鍨�={}锛岃閰嶇疆鑰楁椂 {} ms", rtInOut, System.currentTimeMillis() - tCfg);
+        String inoutMode = resolveInOutReportMode();
+        boolean singleRowOnly = CloudWmsInoutReportMode.SINGLE.equals(inoutMode);
+        if (singleRowOnly) {
+            log.debug("浜戜粨涓婃姤娲惧彂锛氬叆鍑哄簱妯″紡 single锛屼笉鍚堝苟鍚屽崟澶氭潯");
+        }
+        LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
+        int n = pending.size();
+        for (int i = 0; i < n; i++) {
+            CloudWmsNotifyLog row = pending.get(i);
+            if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) {
+                log.debug("浜戜粨涓婃姤娲惧彂锛氬垎缁勮繘搴� {}/{}锛宨d={}", i + 1, n, row.getId());
+            }
+            if (!rtInOut.equals(row.getReportType())) {
+                continue;
+            }
+            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
+            if (key == null) {
+                continue;
+            }
+            if (!singleRowOnly) {
+                inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
             }
         }
+        log.debug("浜戜粨涓婃姤娲惧彂锛氬叆鍑哄簱鍙垎缁勯敭 {} 涓�", inOutGroups.size());
+        Set<Long> done = new HashSet<>();
+        for (CloudWmsNotifyLog row : pending) {
+            Long rid = row.getId();
+            if (rid != null && done.contains(rid)) {
+                continue;
+            }
+            if (!rtInOut.equals(row.getReportType())) {
+                if (isSendingBusy(row)) {
+                    continue;
+                }
+                log.debug("浜戜粨涓婃姤娲惧彂锛氬叾瀹冪被鍨嬪崟鏉� id={}", rid);
+                if (safeProcessOne(row)) {
+                    rowsReported++;
+                    if (rid != null) {
+                        done.add(rid);
+                    }
+                }
+                continue;
+            }
+            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
+            if (key == null) {
+                if (isSendingBusy(row)) {
+                    continue;
+                }
+                log.debug("浜戜粨涓婃姤娲惧彂锛氭棤鍚堝苟閿崟鏉� id={}", rid);
+                if (safeProcessOne(row)) {
+                    rowsReported++;
+                    if (rid != null) {
+                        done.add(rid);
+                    }
+                }
+                continue;
+            }
+            if (singleRowOnly) {
+                if (isSendingBusy(row)) {
+                    continue;
+                }
+                log.debug("浜戜粨涓婃姤娲惧彂锛歴ingle 妯″紡鍗曟潯 id={}", rid);
+                if (safeProcessOne(row)) {
+                    rowsReported++;
+                    if (rid != null) {
+                        done.add(rid);
+                    }
+                }
+                continue;
+            }
+            List<CloudWmsNotifyLog> g = inOutGroups.get(key);
+            List<CloudWmsNotifyLog> work = new ArrayList<>();
+            if (g != null) {
+                for (CloudWmsNotifyLog x : g) {
+                    if (x.getId() == null || done.contains(x.getId())) {
+                        continue;
+                    }
+                    if (isSendingBusy(x)) {
+                        continue;
+                    }
+                    work.add(x);
+                }
+            }
+            if (work.isEmpty()) {
+                continue;
+            }
+            if (work.size() >= 2) {
+                log.debug("浜戜粨涓婃姤娲惧彂锛氬悓鍗曞悎骞� {} 鏉�", work.size());
+                Set<Long> handled = safeProcessMergedInOutGroup(work);
+                done.addAll(handled);
+                rowsReported += handled.size();
+            } else {
+                log.debug("浜戜粨涓婃姤娲惧彂锛氬叆鍑哄簱鍗曟潯 id={}", work.get(0).getId());
+                if (safeProcessOne(work.get(0))) {
+                    rowsReported++;
+                    if (work.get(0).getId() != null) {
+                        done.add(work.get(0).getId());
+                    }
+                }
+            }
+        }
+        log.debug("浜戜粨涓婃姤娲惧彂锛氶亶鍘嗙粨鏉�");
+        if (rowsReported == 0) {
+            log.debug("浜戜粨涓婃姤娲惧彂锛氬緟鍔� {} 鏉★紝瀹為檯澶勭悊 0 鏉�", pending.size());
+        } else {
+            log.debug("浜戜粨涓婃姤娲惧彂锛氬緟鍔� {} 鏉★紝瀹為檯澶勭悊 {} 鏉�", pending.size(), rowsReported);
+        }
+    }
+
+    private Set<Long> safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
+        Set<Long> handled = new HashSet<>();
+        List<Long> claimedIds = new ArrayList<>();
+        List<CloudWmsNotifyLog> claimedRows = new ArrayList<>();
+        log.debug("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍鎶㈠崰锛岃鏁� {}", group.size());
+        try {
+            for (CloudWmsNotifyLog row : group) {
+                Long id = row.getId();
+                log.debug("浜戜粨涓婃姤娲惧彂锛氬悎骞舵姠鍗犲皾璇� id={}", id);
+                if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
+                    log.debug("浜戜粨涓婃姤娲惧彂锛氬悎骞舵姠鍗犳湭鎶㈠埌 id={}锛岃烦杩囨湰琛岀户缁叾瀹冭", id);
+                    continue;
+                }
+                claimedIds.add(id);
+                claimedRows.add(row);
+            }
+            if (claimedRows.isEmpty()) {
+                log.debug("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍鎶㈠崰 0 琛�");
+                return handled;
+            }
+            log.debug("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍宸叉姠鍗� {} 琛岋紝璇锋眰浜戜粨", claimedRows.size());
+            processMergedInOut(claimedRows);
+            log.debug("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍浜戜粨璋冪敤宸茶繑鍥�");
+            handled.addAll(claimedIds);
+        } catch (Exception e) {
+            log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟寮傚父锛歿}", e.getMessage());
+        } finally {
+            for (Long id : claimedIds) {
+                cloudWmsNotifyLogService.clearSending(id);
+            }
+        }
+        return handled;
+    }
+
+    private void processMergedInOut(List<CloudWmsNotifyLog> group) {
+        Date now = new Date();
+        List<InOutResultReportParam> lines = new ArrayList<>();
+        try {
+            for (CloudWmsNotifyLog row : group) {
+                lines.addAll(cloudWmsNotifyLogService.parseInOutLinesFromRequestBody(row.getRequestBody()));
+            }
+        } catch (IOException e) {
+            String msg = "鍙嶅簭鍒楀寲澶辫触: " + e.getMessage();
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                setFailResult(row, row.getRequestBody(), msg, nextRetry, now, row.getMaxRetryCount());
+            }
+            return;
+        }
+        if (lines.isEmpty()) {
+            return;
+        }
+        String mergedBody;
+        try {
+            mergedBody = objectMapper.writeValueAsString(new InOutResultBatchPayload().setLines(lines));
+        } catch (JsonProcessingException e) {
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                setFailResult(row, row.getRequestBody(), "鍚堝苟璇锋眰浣撳簭鍒楀寲澶辫触: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
+            }
+            return;
+        }
+        log.info("浜戜粨涓婃姤寮�濮嬶紙鍚屽崟鍚堝苟锛夛紝ids={}锛宺equestBody={}", idsOf(group), mergedBody);
+        try {
+            Map<String, Object> res = cloudWmsReportService.reportInOutResults(lines);
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                updateAfterNotify(row, mergedBody, res, nextRetry, now, row.getMaxRetryCount());
+            }
+        } catch (FeignException e) {
+            String err = feignErrorSummary(e);
+            log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟璇锋眰澶辫触锛歿}", err);
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                setFailResult(row, mergedBody, "璇锋眰寮傚父: " + err, nextRetry, now, row.getMaxRetryCount());
+            }
+        } catch (Exception e) {
+            log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟璇锋眰澶辫触锛歿}", e.getMessage());
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                setFailResult(row, mergedBody, "璇锋眰寮傚父: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
+            }
+        }
+    }
+
+    /** 鎵撳嵃鍏ㄩ儴杩斿洖鏃ュ織 */
+    private static String feignErrorSummary(FeignException e) {
+        String body = e.contentUTF8();
+        if (body != null && !body.trim().isEmpty()) {
+            return "status=" + e.status() + "锛宺esponseBody=" + body.trim();
+        }
+        return "status=" + e.status() + "锛宮essage=" + e.getMessage();
+    }
+
+    private static List<Long> idsOf(List<CloudWmsNotifyLog> group) {
+        List<Long> ids = new ArrayList<>(group.size());
+        for (CloudWmsNotifyLog row : group) {
+            ids.add(row.getId());
+        }
+        return ids;
+    }
+
+    private boolean safeProcessOne(CloudWmsNotifyLog logRecord) {
+        Long id = logRecord.getId();
+        log.debug("浜戜粨涓婃姤娲惧彂锛氬崟鏉℃姠鍗� id={}", id);
+        if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
+            log.debug("浜戜粨涓婃姤娲惧彂锛氬崟鏉℃湭鎶㈠埌 id={}", id);
+            return false;
+        }
+        try {
+            processOne(logRecord);
+        } catch (Exception e) {
+            log.warn("浜戜粨涓婃姤瀹氭椂浠诲姟澶勭悊鍗曟潯寮傚父锛宨d={}锛宐izRef={}锛歿}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
+        } finally {
+            cloudWmsNotifyLogService.clearSending(id);
+        }
+        return true;
     }
 
     private void processOne(CloudWmsNotifyLog logRecord) {
@@ -83,15 +335,24 @@
         Date now = new Date();
         int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
         int effectiveMaxRetry = logRecord.getMaxRetryCount();
+        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
+        String rtAdj = cloudWmsNotifyLogService.getReportTypeInventoryAdjust();
         log.info("浜戜粨涓婃姤寮�濮嬶紝id={}锛宐izRef={}锛宺eportType={}锛宎ttempt={}锛宺equestBody={}",
                 logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
 
         try {
-            if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
-                InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
-                Map<String, Object> res = cloudWmsReportService.reportInOutResult(param);
+            if (rtInOut.equals(reportType)) {
+                JsonNode root = objectMapper.readTree(requestBody);
+                Map<String, Object> res;
+                if (root.has("lines") && root.get("lines").isArray()) {
+                    InOutResultBatchPayload batch = objectMapper.readValue(requestBody, InOutResultBatchPayload.class);
+                    res = cloudWmsReportService.reportInOutResults(batch.getLines());
+                } else {
+                    InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
+                    res = cloudWmsReportService.reportInOutResult(param);
+                }
                 updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
-            } else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
+            } else if (rtAdj.equals(reportType)) {
                 InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
                 Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
                 updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
@@ -102,6 +363,10 @@
         } catch (JsonProcessingException e) {
             log.warn("浜戜粨涓婃姤璇锋眰浣撳弽搴忓垪鍖栧け璐ワ紝id={}锛歿}", logRecord.getId(), e.getMessage());
             setFailResult(logRecord, requestBody, "鍙嶅簭鍒楀寲澶辫触: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
+        } catch (FeignException e) {
+            String err = feignErrorSummary(e);
+            log.warn("浜戜粨涓婃姤璇锋眰澶辫触锛宨d={}锛宐izRef={}锛歿}", logRecord.getId(), logRecord.getBizRef(), err);
+            setFailResult(logRecord, requestBody, "璇锋眰寮傚父: " + err, nextRetry, now, effectiveMaxRetry);
         } catch (Exception e) {
             log.warn("浜戜粨涓婃姤璇锋眰澶辫触锛宨d={}锛宐izRef={}锛歿}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
             setFailResult(logRecord, requestBody, "璇锋眰寮傚父: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
@@ -124,6 +389,7 @@
         logRecord.setLastNotifyTime(now);
         logRecord.setRetryCount(nextRetry);
         logRecord.setNotifyStatus(status);
+        logRecord.setSending(0);
         logRecord.setUpdateTime(now);
         cloudWmsNotifyLogService.updateById(logRecord);
         log.info("浜戜粨涓婃姤缁撴潫锛宨d={}锛宐izRef={}锛宎ttempt={}锛宯otifyStatus={}锛宺esponseBody={}",
@@ -135,11 +401,11 @@
         logRecord.setLastResponseBody(truncateForStore(errorMsg));
         logRecord.setLastNotifyTime(now);
         logRecord.setRetryCount(nextRetry);
-        // logRecord.setNotifyStatus(nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending());
         int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
                 ? cloudWmsNotifyLogService.getNotifyStatusFail()
                 : cloudWmsNotifyLogService.getNotifyStatusPending();
         logRecord.setNotifyStatus(status);
+        logRecord.setSending(0);
         logRecord.setUpdateTime(now);
         cloudWmsNotifyLogService.updateById(logRecord);
         log.warn("浜戜粨涓婃姤澶辫触锛宨d={}锛宐izRef={}锛宎ttempt={}锛宯otifyStatus={}锛宔rror={}",

--
Gitblit v1.9.1