From 2a34b52125d5fc356d65ee1e8912845dd601d4e3 Mon Sep 17 00:00:00 2001
From: cl <1442464845@qq.com>
Date: 星期五, 01 五月 2026 12:52:02 +0800
Subject: [PATCH] 多加入参数和修改规则

---
 rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java |  216 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 178 insertions(+), 38 deletions(-)

diff --git a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
index 203ecb9..6777097 100644
--- a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
+++ b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -1,7 +1,9 @@
 package com.vincent.rsf.server.manager.schedules;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.vincent.rsf.server.api.controller.erp.params.InOutResultBatchPayload;
 import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
 import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
 import com.vincent.rsf.server.api.service.CloudWmsReportService;
@@ -16,10 +18,14 @@
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
 
 /** 浜戜粨涓婃姤瀹氭椂浠诲姟 */
 @Slf4j
@@ -38,51 +44,175 @@
     @Autowired
     private ConfigService configService;
 
-    @Scheduled(cron = "0/30 * * * * ?")
+    /** sending=1 涓� Redis 鍗犱綅宸插け銆乽pdate_time 瓒呮椂锛氳ˉ鍋挎竻闆� */
+    @Scheduled(cron = "0 0/2 * * * ?")
+//    @Scheduled(cron = "0/5 * * * * ?")
+    public void recoverStaleSending() {
+        try {
+            cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
+        } catch (Exception e) {
+            log.warn("浜戜粨 sending 琛ュ伩浠诲姟寮傚父锛歿}", e.getMessage());
+        }
+    }
+
+    @Scheduled(cron = "0/60 * * * * ?")
     public void syncCloudWmsNotify() {
-        // List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, 999);
         List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
         if (pending.isEmpty()) {
+            log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� 0 鏉�");
             return;
         }
-        long nowMs = System.currentTimeMillis();
-        List<CloudWmsNotifyLog> ready = pending.stream()
-                .filter(logRecord -> shouldProcess(logRecord, nowMs))
-                .collect(Collectors.toList());
-        ready.parallelStream().forEach(this::safeProcessOne);
+        log.info("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� {} 鏉�", pending.size());
+        dispatchPending(pending);
+    }
+
+    /** 鍚屽崟澶氭潯鍚堝苟涓婃姤 */
+    private void dispatchPending(List<CloudWmsNotifyLog> pending) {
+        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
+        LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
+        for (CloudWmsNotifyLog row : pending) {
+            if (!rtInOut.equals(row.getReportType())) {
+                continue;
+            }
+            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
+            if (key == null) {
+                continue;
+            }
+            inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
+        }
+        Set<Long> done = new HashSet<>();
+        for (CloudWmsNotifyLog row : pending) {
+            Long rid = row.getId();
+            if (rid != null && done.contains(rid)) {
+                continue;
+            }
+            if (!rtInOut.equals(row.getReportType())) {
+                safeProcessOne(row);
+                if (rid != null) {
+                    done.add(rid);
+                }
+                continue;
+            }
+            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
+            if (key == null) {
+                safeProcessOne(row);
+                if (rid != null) {
+                    done.add(rid);
+                }
+                continue;
+            }
+            List<CloudWmsNotifyLog> g = inOutGroups.get(key);
+            if (g != null && g.size() >= 2) {
+                safeProcessMergedInOutGroup(g);
+                for (CloudWmsNotifyLog x : g) {
+                    if (x.getId() != null) {
+                        done.add(x.getId());
+                    }
+                }
+            } else {
+                safeProcessOne(row);
+                if (rid != null) {
+                    done.add(rid);
+                }
+            }
+        }
+    }
+
+    private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
+        List<Long> claimedIds = new ArrayList<>();
+        try {
+            for (CloudWmsNotifyLog row : group) {
+                Long id = row.getId();
+                if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
+                    log.debug("浜戜粨涓婃姤鍚屾壒鍚堝苟鏈姠鍒板彂閫佹潈 id={}", id);
+                    return;
+                }
+                claimedIds.add(id);
+            }
+            processMergedInOut(group);
+        } catch (Exception e) {
+            log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟寮傚父锛歿}", e.getMessage());
+        } finally {
+            for (Long id : claimedIds) {
+                cloudWmsNotifyLogService.clearSending(id);
+            }
+        }
+    }
+
+    private void processMergedInOut(List<CloudWmsNotifyLog> group) {
+        Date now = new Date();
+        List<InOutResultReportParam> lines = new ArrayList<>();
+        try {
+            for (CloudWmsNotifyLog row : group) {
+                lines.addAll(cloudWmsNotifyLogService.parseInOutLinesFromRequestBody(row.getRequestBody()));
+            }
+        } catch (IOException e) {
+            String msg = "鍙嶅簭鍒楀寲澶辫触: " + e.getMessage();
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                setFailResult(row, row.getRequestBody(), msg, nextRetry, now, row.getMaxRetryCount());
+            }
+            return;
+        }
+        if (lines.isEmpty()) {
+            return;
+        }
+        String mergedBody;
+        try {
+            mergedBody = objectMapper.writeValueAsString(new InOutResultBatchPayload().setLines(lines));
+        } catch (JsonProcessingException e) {
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                setFailResult(row, row.getRequestBody(), "鍚堝苟璇锋眰浣撳簭鍒楀寲澶辫触: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
+            }
+            return;
+        }
+        log.info("浜戜粨涓婃姤寮�濮嬶紙鍚屽崟鍚堝苟锛夛紝ids={}锛宺equestBody={}", idsOf(group), mergedBody);
+        try {
+            Map<String, Object> res = cloudWmsReportService.reportInOutResults(lines);
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                updateAfterNotify(row, mergedBody, res, nextRetry, now, row.getMaxRetryCount());
+            }
+        } catch (FeignException e) {
+            String responseBody = e.contentUTF8();
+            String fullMsg = "status=" + e.status() + "锛宮essage=" + e.getMessage()
+                    + (responseBody == null || responseBody.isEmpty() ? "" : "锛宺esponseBody=" + responseBody);
+            log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟璇锋眰澶辫触锛歿}", fullMsg);
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                setFailResult(row, mergedBody, "璇锋眰寮傚父: " + fullMsg, nextRetry, now, row.getMaxRetryCount());
+            }
+        } catch (Exception e) {
+            log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟璇锋眰澶辫触锛歿}", e.getMessage());
+            for (CloudWmsNotifyLog row : group) {
+                int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+                setFailResult(row, mergedBody, "璇锋眰寮傚父: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
+            }
+        }
+    }
+
+    private static List<Long> idsOf(List<CloudWmsNotifyLog> group) {
+        List<Long> ids = new ArrayList<>(group.size());
+        for (CloudWmsNotifyLog row : group) {
+            ids.add(row.getId());
+        }
+        return ids;
     }
 
     private void safeProcessOne(CloudWmsNotifyLog logRecord) {
+        Long id = logRecord.getId();
+        if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
+            log.debug("浜戜粨涓婃姤鏈姠鍒板彂閫佹潈 id={}", id);
+            return;
+        }
         try {
             processOne(logRecord);
         } catch (Exception e) {
             log.warn("浜戜粨涓婃姤瀹氭椂浠诲姟澶勭悊鍗曟潯寮傚父锛宨d={}锛宐izRef={}锛歿}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
+        } finally {
+            cloudWmsNotifyLogService.clearSending(id);
         }
-    }
-
-    private boolean shouldProcess(CloudWmsNotifyLog logRecord, long nowMs) {
-        Integer maxRetry = logRecord.getMaxRetryCount();
-        Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
-        if (maxRetry == null || intervalSeconds == null) {
-            log.warn("浜戜粨涓婃姤寰呭姙璺宠繃锛氶噸璇曞弬鏁扮己澶憋紝id={}锛宐izRef={}锛宮axRetry={}锛宨ntervalSeconds={}",
-                    logRecord.getId(), logRecord.getBizRef(), maxRetry, intervalSeconds);
-            return false;
-        }
-        if (!isInfiniteRetry(maxRetry)
-                && logRecord.getRetryCount() != null
-                && logRecord.getRetryCount() >= maxRetry) {
-            log.info("浜戜粨涓婃姤寰呭姙璺宠繃锛氶噸璇曟鏁板凡杈句笂闄愶紝id={}锛宐izRef={}锛宺etryCount={}锛宮axRetry={}",
-                    logRecord.getId(), logRecord.getBizRef(), logRecord.getRetryCount(), maxRetry);
-            return false;
-        }
-        int effectiveIntervalSeconds = Math.max(0, intervalSeconds);
-        if (logRecord.getLastNotifyTime() != null) {
-            long elapsed = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
-            if (elapsed < effectiveIntervalSeconds) {
-                return false;
-            }
-        }
-        return true;
     }
 
     private void processOne(CloudWmsNotifyLog logRecord) {
@@ -91,15 +221,24 @@
         Date now = new Date();
         int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
         int effectiveMaxRetry = logRecord.getMaxRetryCount();
+        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
+        String rtAdj = cloudWmsNotifyLogService.getReportTypeInventoryAdjust();
         log.info("浜戜粨涓婃姤寮�濮嬶紝id={}锛宐izRef={}锛宺eportType={}锛宎ttempt={}锛宺equestBody={}",
                 logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
 
         try {
-            if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
-                InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
-                Map<String, Object> res = cloudWmsReportService.reportInOutResult(param);
+            if (rtInOut.equals(reportType)) {
+                JsonNode root = objectMapper.readTree(requestBody);
+                Map<String, Object> res;
+                if (root.has("lines") && root.get("lines").isArray()) {
+                    InOutResultBatchPayload batch = objectMapper.readValue(requestBody, InOutResultBatchPayload.class);
+                    res = cloudWmsReportService.reportInOutResults(batch.getLines());
+                } else {
+                    InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
+                    res = cloudWmsReportService.reportInOutResult(param);
+                }
                 updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
-            } else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
+            } else if (rtAdj.equals(reportType)) {
                 InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
                 Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
                 updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
@@ -138,6 +277,7 @@
         logRecord.setLastNotifyTime(now);
         logRecord.setRetryCount(nextRetry);
         logRecord.setNotifyStatus(status);
+        logRecord.setSending(0);
         logRecord.setUpdateTime(now);
         cloudWmsNotifyLogService.updateById(logRecord);
         log.info("浜戜粨涓婃姤缁撴潫锛宨d={}锛宐izRef={}锛宎ttempt={}锛宯otifyStatus={}锛宺esponseBody={}",
@@ -149,11 +289,11 @@
         logRecord.setLastResponseBody(truncateForStore(errorMsg));
         logRecord.setLastNotifyTime(now);
         logRecord.setRetryCount(nextRetry);
-        // logRecord.setNotifyStatus(nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending());
         int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
                 ? cloudWmsNotifyLogService.getNotifyStatusFail()
                 : cloudWmsNotifyLogService.getNotifyStatusPending();
         logRecord.setNotifyStatus(status);
+        logRecord.setSending(0);
         logRecord.setUpdateTime(now);
         cloudWmsNotifyLogService.updateById(logRecord);
         log.warn("浜戜粨涓婃姤澶辫触锛宨d={}锛宐izRef={}锛宎ttempt={}锛宯otifyStatus={}锛宔rror={}",

--
Gitblit v1.9.1