From cb2f02d60aac235f2f9e5ef777e0141fb697c264 Mon Sep 17 00:00:00 2001
From: cl <1442464845@qq.com>
Date: 星期五, 01 五月 2026 15:18:11 +0800
Subject: [PATCH] 多加入参数和修改规则
---
rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java | 133 ++++++++++++++++++++++++++++++++++----------
1 files changed, 102 insertions(+), 31 deletions(-)
diff --git a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
index 6777097..6f175d2 100644
--- a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
+++ b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -44,9 +44,7 @@
@Autowired
private ConfigService configService;
- /** sending=1 涓� Redis 鍗犱綅宸插け銆乽pdate_time 瓒呮椂锛氳ˉ鍋挎竻闆� */
- @Scheduled(cron = "0 0/2 * * * ?")
-// @Scheduled(cron = "0/5 * * * * ?")
+ @Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}")
public void recoverStaleSending() {
try {
cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
@@ -55,22 +53,45 @@
}
}
- @Scheduled(cron = "0/60 * * * * ?")
+ @Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}")
public void syncCloudWmsNotify() {
- List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
- if (pending.isEmpty()) {
- log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� 0 鏉�");
- return;
+ long t0 = System.currentTimeMillis();
+ try {
+ List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
+ if (pending.isEmpty()) {
+ log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� 0 鏉�");
+ return;
+ }
+ log.info("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� {} 鏉�", pending.size());
+ log.info("浜戜粨涓婃姤璋冨害锛氬紑濮嬫淳鍙�");
+ dispatchPending(pending);
+ log.info("浜戜粨涓婃姤璋冨害锛氭淳鍙戝畬鎴�");
+ log.info("浜戜粨涓婃姤璋冨害锛氭湰杞皟搴︾粨鏉燂紝鑰楁椂 {} ms", System.currentTimeMillis() - t0);
+ } catch (Exception e) {
+ log.error("浜戜粨涓婃姤璋冨害寮傚父锛屽凡鑰楁椂 {} ms", System.currentTimeMillis() - t0, e);
}
- log.info("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� {} 鏉�", pending.size());
- dispatchPending(pending);
+ }
+
+ private static boolean isSendingBusy(CloudWmsNotifyLog row) {
+ Integer s = row.getSending();
+ return s != null && s != 0;
}
/** 鍚屽崟澶氭潯鍚堝苟涓婃姤 */
private void dispatchPending(List<CloudWmsNotifyLog> pending) {
+ int rowsReported = 0;
+ log.info("浜戜粨涓婃姤娲惧彂锛氬紑濮嬶紝寰呭姙 {} 鏉�", pending.size());
+ long tCfg = System.currentTimeMillis();
+ log.info("浜戜粨涓婃姤娲惧彂锛氳鍙栦笂鎶ョ被鍨嬮厤缃�");
String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
+ log.info("浜戜粨涓婃姤娲惧彂锛氫笂鎶ョ被鍨�={}锛岃閰嶇疆鑰楁椂 {} ms", rtInOut, System.currentTimeMillis() - tCfg);
LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
- for (CloudWmsNotifyLog row : pending) {
+ int n = pending.size();
+ for (int i = 0; i < n; i++) {
+ CloudWmsNotifyLog row = pending.get(i);
+ if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) {
+ log.info("浜戜粨涓婃姤娲惧彂锛氬垎缁勮繘搴� {}/{}锛宨d={}", i + 1, n, row.getId());
+ }
if (!rtInOut.equals(row.getReportType())) {
continue;
}
@@ -80,6 +101,7 @@
}
inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
}
+ log.info("浜戜粨涓婃姤娲惧彂锛氬叆鍑哄簱鍙垎缁勯敭 {} 涓�", inOutGroups.size());
Set<Long> done = new HashSet<>();
for (CloudWmsNotifyLog row : pending) {
Long rid = row.getId();
@@ -87,49 +109,95 @@
continue;
}
if (!rtInOut.equals(row.getReportType())) {
- safeProcessOne(row);
- if (rid != null) {
- done.add(rid);
+ if (isSendingBusy(row)) {
+ continue;
+ }
+ log.info("浜戜粨涓婃姤娲惧彂锛氬叾瀹冪被鍨嬪崟鏉� id={}", rid);
+ if (safeProcessOne(row)) {
+ rowsReported++;
+ if (rid != null) {
+ done.add(rid);
+ }
}
continue;
}
String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
if (key == null) {
- safeProcessOne(row);
- if (rid != null) {
- done.add(rid);
+ if (isSendingBusy(row)) {
+ continue;
+ }
+ log.info("浜戜粨涓婃姤娲惧彂锛氭棤鍚堝苟閿崟鏉� id={}", rid);
+ if (safeProcessOne(row)) {
+ rowsReported++;
+ if (rid != null) {
+ done.add(rid);
+ }
}
continue;
}
List<CloudWmsNotifyLog> g = inOutGroups.get(key);
- if (g != null && g.size() >= 2) {
- safeProcessMergedInOutGroup(g);
+ List<CloudWmsNotifyLog> work = new ArrayList<>();
+ if (g != null) {
for (CloudWmsNotifyLog x : g) {
- if (x.getId() != null) {
- done.add(x.getId());
+ if (x.getId() == null || done.contains(x.getId())) {
+ continue;
}
+ if (isSendingBusy(x)) {
+ continue;
+ }
+ work.add(x);
}
+ }
+ if (work.isEmpty()) {
+ continue;
+ }
+ if (work.size() >= 2) {
+ log.info("浜戜粨涓婃姤娲惧彂锛氬悓鍗曞悎骞� {} 鏉�", work.size());
+ Set<Long> handled = safeProcessMergedInOutGroup(work);
+ done.addAll(handled);
+ rowsReported += handled.size();
} else {
- safeProcessOne(row);
- if (rid != null) {
- done.add(rid);
+ log.info("浜戜粨涓婃姤娲惧彂锛氬叆鍑哄簱鍗曟潯 id={}", work.get(0).getId());
+ if (safeProcessOne(work.get(0))) {
+ rowsReported++;
+ if (work.get(0).getId() != null) {
+ done.add(work.get(0).getId());
+ }
}
}
}
+ log.info("浜戜粨涓婃姤娲惧彂锛氶亶鍘嗙粨鏉�");
+ if (rowsReported == 0) {
+ log.warn("浜戜粨涓婃姤娲惧彂锛氬緟鍔� {} 鏉★紝瀹為檯澶勭悊 0 鏉�", pending.size());
+ } else {
+ log.info("浜戜粨涓婃姤娲惧彂锛氬緟鍔� {} 鏉★紝瀹為檯澶勭悊 {} 鏉�", pending.size(), rowsReported);
+ }
}
- private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
+ private Set<Long> safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
+ Set<Long> handled = new HashSet<>();
List<Long> claimedIds = new ArrayList<>();
+ List<CloudWmsNotifyLog> claimedRows = new ArrayList<>();
+ log.info("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍鎶㈠崰锛岃鏁� {}", group.size());
try {
for (CloudWmsNotifyLog row : group) {
Long id = row.getId();
+ log.debug("浜戜粨涓婃姤娲惧彂锛氬悎骞舵姠鍗犲皾璇� id={}", id);
if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
- log.debug("浜戜粨涓婃姤鍚屾壒鍚堝苟鏈姠鍒板彂閫佹潈 id={}", id);
- return;
+ log.warn("浜戜粨涓婃姤娲惧彂锛氬悎骞舵姠鍗犳湭鎶㈠埌 id={}锛岃烦杩囨湰琛岀户缁叾瀹冭", id);
+ continue;
}
claimedIds.add(id);
+ claimedRows.add(row);
}
- processMergedInOut(group);
+ if (claimedRows.isEmpty()) {
+ log.warn("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍鎶㈠崰 0 琛�");
+ return handled;
+ }
+ log.info("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍宸叉姠鍗� {} 琛岋紝璇锋眰浜戜粨", claimedRows.size());
+ processMergedInOut(claimedRows);
+ log.info("浜戜粨涓婃姤娲惧彂锛氬悎骞剁粍浜戜粨璋冪敤宸茶繑鍥�");
+ handled.addAll(claimedIds);
} catch (Exception e) {
log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟寮傚父锛歿}", e.getMessage());
} finally {
@@ -137,6 +205,7 @@
cloudWmsNotifyLogService.clearSending(id);
}
}
+ return handled;
}
private void processMergedInOut(List<CloudWmsNotifyLog> group) {
@@ -200,11 +269,12 @@
return ids;
}
- private void safeProcessOne(CloudWmsNotifyLog logRecord) {
+ private boolean safeProcessOne(CloudWmsNotifyLog logRecord) {
Long id = logRecord.getId();
+ log.info("浜戜粨涓婃姤娲惧彂锛氬崟鏉℃姠鍗� id={}", id);
if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
- log.debug("浜戜粨涓婃姤鏈姠鍒板彂閫佹潈 id={}", id);
- return;
+ log.warn("浜戜粨涓婃姤娲惧彂锛氬崟鏉℃湭鎶㈠埌 id={}", id);
+ return false;
}
try {
processOne(logRecord);
@@ -213,6 +283,7 @@
} finally {
cloudWmsNotifyLogService.clearSending(id);
}
+ return true;
}
private void processOne(CloudWmsNotifyLog logRecord) {
--
Gitblit v1.9.1