From cb2f02d60aac235f2f9e5ef777e0141fb697c264 Mon Sep 17 00:00:00 2001
From: cl <1442464845@qq.com>
Date: 星期五, 01 五月 2026 15:18:11 +0800
Subject: [PATCH] 多加入参数和修改规则

---
 rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java |  133 ++++++++++++++++++++++++++++++++++----------
 1 files changed, 102 insertions(+), 31 deletions(-)

diff --git a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
index 6777097..6f175d2 100644
--- a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
+++ b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -44,9 +44,7 @@
     @Autowired
     private ConfigService configService;
 
-    /** sending=1 涓� Redis 鍗犱綅宸插け銆乽pdate_time 瓒呮椂锛氳ˉ鍋挎竻闆� */
-    @Scheduled(cron = "0 0/2 * * * ?")
-//    @Scheduled(cron = "0/5 * * * * ?")
+    @Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}")
     public void recoverStaleSending() {
         try {
             cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
@@ -55,22 +53,45 @@
         }
     }
 
-    @Scheduled(cron = "0/60 * * * * ?")
+    @Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}")
     public void syncCloudWmsNotify() {
-        List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
-        if (pending.isEmpty()) {
-            log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� 0 鏉�");
-            return;
+        long t0 = System.currentTimeMillis();
+        try {
+            List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
+            if (pending.isEmpty()) {
+                log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� 0 鏉�");
+                return;
+            }
+            log.info("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� {} 鏉�", pending.size());
+            log.info("浜戜粨涓婃姤璋冨害锛氬紑濮嬫淳鍙�");
+            dispatchPending(pending);
+            log.info("浜戜粨涓婃姤璋冨害锛氭淳鍙戝畬鎴�");
+            log.info("浜戜粨涓婃姤璋冨害锛氭湰杞皟搴︾粨鏉燂紝鑰楁椂 {} ms", System.currentTimeMillis() - t0);
+        } catch (Exception e) {
+            log.error("浜戜粨涓婃姤璋冨害寮傚父锛屽凡鑰楁椂 {} ms", System.currentTimeMillis() - t0, e);
         }
-        log.info("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� {} 鏉�", pending.size());
-        dispatchPending(pending);
+    }
+
+    private static boolean isSendingBusy(CloudWmsNotifyLog row) {
+        Integer s = row.getSending();
+        return s != null && s != 0;
     }
 
     /** 鍚屽崟澶氭潯鍚堝苟涓婃姤 */
     private void dispatchPending(List<CloudWmsNotifyLog> pending) {
+        int rowsReported = 0;
+        log.info("浜戜粨涓婃姤娲惧彂锛氬紑濮嬶紝寰呭姙 {} 鏉�", pending.size());
+        long tCfg = System.currentTimeMillis();
+        log.info("浜戜粨涓婃姤娲惧彂锛氳鍙栦笂鎶ョ被鍨嬮厤缃�");
         String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
+        log.info("浜戜粨涓婃姤娲惧彂锛氫笂鎶ョ被鍨�={}锛岃閰嶇疆鑰楁椂 {} ms", rtInOut, System.currentTimeMillis() - tCfg);
         LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
-        for (CloudWmsNotifyLog row : pending) {
+        int n = pending.size();
+        for (int i = 0; i < n; i++) {
+            CloudWmsNotifyLog row = pending.get(i);
+            if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) {
+                log.info("浜戜粨涓婃姤娲惧彂锛氬垎缁勮繘搴� {}/{}锛宨d={}", i + 1, n, row.getId());
+            }
             if (!rtInOut.equals(row.getReportType())) {
                 continue;
             }
@@ -80,6 +101,7 @@
             }
             inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
         }
+        log.info("浜戜粨涓婃姤娲惧彂锛氬叆鍑哄簱鍙垎缁勯敭 {} 涓�", inOutGroups.size());
         Set<Long> done = new HashSet<>();
         for (CloudWmsNotifyLog row : pending) {
             Long rid = row.getId();
@@ -87,49 +109,95 @@
                 continue;
             }
             if (!rtInOut.equals(row.getReportType())) {
-                safeProcessOne(row);
-                if (rid != null) {
-                    done.add(rid);
+                if (isSendingBusy(row)) {
+                    continue;
+                }
+                log.info("浜戜粨涓婃姤娲惧彂锛氬叾瀹冪被鍨嬪崟鏉� id={}", rid);
+                if (safeProcessOne(row)) {
+                    rowsReported++;
+                    if (rid != null) {
+                        done.add(rid);
+                    }
                 }
                 continue;
             }
             String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
             if (key == null) {
-                safeProcessOne(row);
-                if (rid != null) {
-                    done.add(rid);
+                if (isSendingBusy(row)) {
+                    continue;
+                }
+                log.info("浜戜粨涓婃姤娲惧彂锛氭棤鍚堝苟閿崟鏉� id={}", rid);
+                if (safeProcessOne(row)) {
+                    rowsReported++;
+                    if (rid != null) {
+                        done.add(rid);
+                    }
                 }
                 continue;
             }
             List<CloudWmsNotifyLog> g = inOutGroups.get(key);
-            if (g != null && g.size() >= 2) {
-                safeProcessMergedInOutGroup(g);
+            List<CloudWmsNotifyLog> work = new ArrayList<>();
+            if (g != null) {
                 for (CloudWmsNotifyLog x : g) {
-                    if (x.getId() != null) {
-                        done.add(x.getId());
+                    if (x.getId() == null || done.contains(x.getId())) {
+                        continue;
                     }
+                    if (isSendingBusy(x)) {
+                        continue;
+                    }
+                    work.add(x);
                 }
+            }
+            if (work.isEmpty()) {
+                continue;
+            }
+            if (work.size() >= 2) {
+                log.info("浜戜粨涓婃姤娲惧彂锛氬悓鍗曞悎骞� {} 鏉�", work.size());
+                Set<Long> handled = safeProcessMergedInOutGroup(work);
+                done.addAll(handled);
+                rowsReported += handled.size();
             } else {
-                safeProcessOne(row);
-                if (rid != null) {
-                    done.add(rid);
+                log.info("浜戜粨涓婃姤娲惧彂锛氬叆鍑哄簱鍗曟潯 id={}", work.get(0).getId());
+                if (safeProcessOne(work.get(0))) {
+                    rowsReported++;
+                    if (work.get(0).getId() != null) {
+                        done.add(work.get(0).getId());
+                    }
                 }
             }
         }
+        log.info("浜戜粨涓婃姤娲惧彂锛氶亶鍘嗙粨鏉�");
+        if (rowsReported == 0) {
+            log.warn("浜戜粨涓婃姤娲惧彂锛氬緟鍔� {} 鏉★紝瀹為檯澶勭悊 0 鏉�", pending.size());
+        } else {
+            log.info("浜戜粨涓婃姤娲惧彂锛氬緟鍔� {} 鏉★紝瀹為檯澶勭悊 {} 鏉�", pending.size(), rowsReported);
+        }
     }
 
-    private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
+    private Set<Long> safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
+        Set<Long> handled = new HashSet<>();
         List<Long> claimedIds = new ArrayList<>();
+        List<CloudWmsNotifyLog> claimedRows = new ArrayList<>();
+        log.info("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍鎶㈠崰锛岃鏁� {}", group.size());
         try {
             for (CloudWmsNotifyLog row : group) {
                 Long id = row.getId();
+                log.debug("浜戜粨涓婃姤娲惧彂锛氬悎骞舵姠鍗犲皾璇� id={}", id);
                 if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
-                    log.debug("浜戜粨涓婃姤鍚屾壒鍚堝苟鏈姠鍒板彂閫佹潈 id={}", id);
-                    return;
+                    log.warn("浜戜粨涓婃姤娲惧彂锛氬悎骞舵姠鍗犳湭鎶㈠埌 id={}锛岃烦杩囨湰琛岀户缁叾瀹冭", id);
+                    continue;
                 }
                 claimedIds.add(id);
+                claimedRows.add(row);
             }
-            processMergedInOut(group);
+            if (claimedRows.isEmpty()) {
+                log.warn("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍鎶㈠崰 0 琛�");
+                return handled;
+            }
+            log.info("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍宸叉姠鍗� {} 琛岋紝璇锋眰浜戜粨", claimedRows.size());
+            processMergedInOut(claimedRows);
+            log.info("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍浜戜粨璋冪敤宸茶繑鍥�");
+            handled.addAll(claimedIds);
         } catch (Exception e) {
             log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟寮傚父锛歿}", e.getMessage());
         } finally {
@@ -137,6 +205,7 @@
                 cloudWmsNotifyLogService.clearSending(id);
             }
         }
+        return handled;
     }
 
     private void processMergedInOut(List<CloudWmsNotifyLog> group) {
@@ -200,11 +269,12 @@
         return ids;
     }
 
-    private void safeProcessOne(CloudWmsNotifyLog logRecord) {
+    private boolean safeProcessOne(CloudWmsNotifyLog logRecord) {
         Long id = logRecord.getId();
+        log.info("浜戜粨涓婃姤娲惧彂锛氬崟鏉℃姠鍗� id={}", id);
         if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
-            log.debug("浜戜粨涓婃姤鏈姠鍒板彂閫佹潈 id={}", id);
-            return;
+            log.warn("浜戜粨涓婃姤娲惧彂锛氬崟鏉℃湭鎶㈠埌 id={}", id);
+            return false;
         }
         try {
             processOne(logRecord);
@@ -213,6 +283,7 @@
         } finally {
             cloudWmsNotifyLogService.clearSending(id);
         }
+        return true;
     }
 
     private void processOne(CloudWmsNotifyLog logRecord) {

--
Gitblit v1.9.1