From 2a34b52125d5fc356d65ee1e8912845dd601d4e3 Mon Sep 17 00:00:00 2001
From: cl <1442464845@qq.com>
Date: 星期五, 01 五月 2026 12:52:02 +0800
Subject: [PATCH] 多加入参数和修改规则
---
rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java | 216 ++++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 178 insertions(+), 38 deletions(-)
diff --git a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
index 203ecb9..6777097 100644
--- a/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
+++ b/rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -1,7 +1,9 @@
package com.vincent.rsf.server.manager.schedules;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.vincent.rsf.server.api.controller.erp.params.InOutResultBatchPayload;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
import com.vincent.rsf.server.api.service.CloudWmsReportService;
@@ -16,10 +18,14 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
/** 浜戜粨涓婃姤瀹氭椂浠诲姟 */
@Slf4j
@@ -38,51 +44,175 @@
@Autowired
private ConfigService configService;
- @Scheduled(cron = "0/30 * * * * ?")
+ /** sending=1 涓� Redis 鍗犱綅宸插け銆乽pdate_time 瓒呮椂锛氳ˉ鍋挎竻闆� */
+ @Scheduled(cron = "0 0/2 * * * ?")
+// @Scheduled(cron = "0/5 * * * * ?")
+ public void recoverStaleSending() {
+ try {
+ cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
+ } catch (Exception e) {
+ log.warn("浜戜粨 sending 琛ュ伩浠诲姟寮傚父锛歿}", e.getMessage());
+ }
+ }
+
+ @Scheduled(cron = "0/60 * * * * ?")
public void syncCloudWmsNotify() {
- // List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, 999);
List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
if (pending.isEmpty()) {
+ log.debug("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� 0 鏉�");
return;
}
- long nowMs = System.currentTimeMillis();
- List<CloudWmsNotifyLog> ready = pending.stream()
- .filter(logRecord -> shouldProcess(logRecord, nowMs))
- .collect(Collectors.toList());
- ready.parallelStream().forEach(this::safeProcessOne);
+ log.info("浜戜粨涓婃姤璋冨害锛氭湰杞緟鍙戦�� {} 鏉�", pending.size());
+ dispatchPending(pending);
+ }
+
+ /** 鍚屽崟澶氭潯鍚堝苟涓婃姤 */
+ private void dispatchPending(List<CloudWmsNotifyLog> pending) {
+ String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
+ LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
+ for (CloudWmsNotifyLog row : pending) {
+ if (!rtInOut.equals(row.getReportType())) {
+ continue;
+ }
+ String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
+ if (key == null) {
+ continue;
+ }
+ inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
+ }
+ Set<Long> done = new HashSet<>();
+ for (CloudWmsNotifyLog row : pending) {
+ Long rid = row.getId();
+ if (rid != null && done.contains(rid)) {
+ continue;
+ }
+ if (!rtInOut.equals(row.getReportType())) {
+ safeProcessOne(row);
+ if (rid != null) {
+ done.add(rid);
+ }
+ continue;
+ }
+ String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
+ if (key == null) {
+ safeProcessOne(row);
+ if (rid != null) {
+ done.add(rid);
+ }
+ continue;
+ }
+ List<CloudWmsNotifyLog> g = inOutGroups.get(key);
+ if (g != null && g.size() >= 2) {
+ safeProcessMergedInOutGroup(g);
+ for (CloudWmsNotifyLog x : g) {
+ if (x.getId() != null) {
+ done.add(x.getId());
+ }
+ }
+ } else {
+ safeProcessOne(row);
+ if (rid != null) {
+ done.add(rid);
+ }
+ }
+ }
+ }
+
+ private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
+ List<Long> claimedIds = new ArrayList<>();
+ try {
+ for (CloudWmsNotifyLog row : group) {
+ Long id = row.getId();
+ if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
+ log.debug("浜戜粨涓婃姤鍚屾壒鍚堝苟鏈姠鍒板彂閫佹潈 id={}", id);
+ return;
+ }
+ claimedIds.add(id);
+ }
+ processMergedInOut(group);
+ } catch (Exception e) {
+ log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟寮傚父锛歿}", e.getMessage());
+ } finally {
+ for (Long id : claimedIds) {
+ cloudWmsNotifyLogService.clearSending(id);
+ }
+ }
+ }
+
+ private void processMergedInOut(List<CloudWmsNotifyLog> group) {
+ Date now = new Date();
+ List<InOutResultReportParam> lines = new ArrayList<>();
+ try {
+ for (CloudWmsNotifyLog row : group) {
+ lines.addAll(cloudWmsNotifyLogService.parseInOutLinesFromRequestBody(row.getRequestBody()));
+ }
+ } catch (IOException e) {
+ String msg = "鍙嶅簭鍒楀寲澶辫触: " + e.getMessage();
+ for (CloudWmsNotifyLog row : group) {
+ int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+ setFailResult(row, row.getRequestBody(), msg, nextRetry, now, row.getMaxRetryCount());
+ }
+ return;
+ }
+ if (lines.isEmpty()) {
+ return;
+ }
+ String mergedBody;
+ try {
+ mergedBody = objectMapper.writeValueAsString(new InOutResultBatchPayload().setLines(lines));
+ } catch (JsonProcessingException e) {
+ for (CloudWmsNotifyLog row : group) {
+ int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+ setFailResult(row, row.getRequestBody(), "鍚堝苟璇锋眰浣撳簭鍒楀寲澶辫触: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
+ }
+ return;
+ }
+ log.info("浜戜粨涓婃姤寮�濮嬶紙鍚屽崟鍚堝苟锛夛紝ids={}锛宺equestBody={}", idsOf(group), mergedBody);
+ try {
+ Map<String, Object> res = cloudWmsReportService.reportInOutResults(lines);
+ for (CloudWmsNotifyLog row : group) {
+ int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+ updateAfterNotify(row, mergedBody, res, nextRetry, now, row.getMaxRetryCount());
+ }
+ } catch (FeignException e) {
+ String responseBody = e.contentUTF8();
+ String fullMsg = "status=" + e.status() + "锛宮essage=" + e.getMessage()
+ + (responseBody == null || responseBody.isEmpty() ? "" : "锛宺esponseBody=" + responseBody);
+ log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟璇锋眰澶辫触锛歿}", fullMsg);
+ for (CloudWmsNotifyLog row : group) {
+ int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+ setFailResult(row, mergedBody, "璇锋眰寮傚父: " + fullMsg, nextRetry, now, row.getMaxRetryCount());
+ }
+ } catch (Exception e) {
+ log.warn("浜戜粨涓婃姤鍚屾壒鍚堝苟璇锋眰澶辫触锛歿}", e.getMessage());
+ for (CloudWmsNotifyLog row : group) {
+ int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
+ setFailResult(row, mergedBody, "璇锋眰寮傚父: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
+ }
+ }
+ }
+
+ private static List<Long> idsOf(List<CloudWmsNotifyLog> group) {
+ List<Long> ids = new ArrayList<>(group.size());
+ for (CloudWmsNotifyLog row : group) {
+ ids.add(row.getId());
+ }
+ return ids;
}
private void safeProcessOne(CloudWmsNotifyLog logRecord) {
+ Long id = logRecord.getId();
+ if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
+ log.debug("浜戜粨涓婃姤鏈姠鍒板彂閫佹潈 id={}", id);
+ return;
+ }
try {
processOne(logRecord);
} catch (Exception e) {
log.warn("浜戜粨涓婃姤瀹氭椂浠诲姟澶勭悊鍗曟潯寮傚父锛宨d={}锛宐izRef={}锛歿}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
+ } finally {
+ cloudWmsNotifyLogService.clearSending(id);
}
- }
-
- private boolean shouldProcess(CloudWmsNotifyLog logRecord, long nowMs) {
- Integer maxRetry = logRecord.getMaxRetryCount();
- Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
- if (maxRetry == null || intervalSeconds == null) {
- log.warn("浜戜粨涓婃姤寰呭姙璺宠繃锛氶噸璇曞弬鏁扮己澶憋紝id={}锛宐izRef={}锛宮axRetry={}锛宨ntervalSeconds={}",
- logRecord.getId(), logRecord.getBizRef(), maxRetry, intervalSeconds);
- return false;
- }
- if (!isInfiniteRetry(maxRetry)
- && logRecord.getRetryCount() != null
- && logRecord.getRetryCount() >= maxRetry) {
- log.info("浜戜粨涓婃姤寰呭姙璺宠繃锛氶噸璇曟鏁板凡杈句笂闄愶紝id={}锛宐izRef={}锛宺etryCount={}锛宮axRetry={}",
- logRecord.getId(), logRecord.getBizRef(), logRecord.getRetryCount(), maxRetry);
- return false;
- }
- int effectiveIntervalSeconds = Math.max(0, intervalSeconds);
- if (logRecord.getLastNotifyTime() != null) {
- long elapsed = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
- if (elapsed < effectiveIntervalSeconds) {
- return false;
- }
- }
- return true;
}
private void processOne(CloudWmsNotifyLog logRecord) {
@@ -91,15 +221,24 @@
Date now = new Date();
int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
int effectiveMaxRetry = logRecord.getMaxRetryCount();
+ String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
+ String rtAdj = cloudWmsNotifyLogService.getReportTypeInventoryAdjust();
log.info("浜戜粨涓婃姤寮�濮嬶紝id={}锛宐izRef={}锛宺eportType={}锛宎ttempt={}锛宺equestBody={}",
logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
try {
- if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
- InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
- Map<String, Object> res = cloudWmsReportService.reportInOutResult(param);
+ if (rtInOut.equals(reportType)) {
+ JsonNode root = objectMapper.readTree(requestBody);
+ Map<String, Object> res;
+ if (root.has("lines") && root.get("lines").isArray()) {
+ InOutResultBatchPayload batch = objectMapper.readValue(requestBody, InOutResultBatchPayload.class);
+ res = cloudWmsReportService.reportInOutResults(batch.getLines());
+ } else {
+ InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
+ res = cloudWmsReportService.reportInOutResult(param);
+ }
updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
- } else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
+ } else if (rtAdj.equals(reportType)) {
InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
@@ -138,6 +277,7 @@
logRecord.setLastNotifyTime(now);
logRecord.setRetryCount(nextRetry);
logRecord.setNotifyStatus(status);
+ logRecord.setSending(0);
logRecord.setUpdateTime(now);
cloudWmsNotifyLogService.updateById(logRecord);
log.info("浜戜粨涓婃姤缁撴潫锛宨d={}锛宐izRef={}锛宎ttempt={}锛宯otifyStatus={}锛宺esponseBody={}",
@@ -149,11 +289,11 @@
logRecord.setLastResponseBody(truncateForStore(errorMsg));
logRecord.setLastNotifyTime(now);
logRecord.setRetryCount(nextRetry);
- // logRecord.setNotifyStatus(nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending());
int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
? cloudWmsNotifyLogService.getNotifyStatusFail()
: cloudWmsNotifyLogService.getNotifyStatusPending();
logRecord.setNotifyStatus(status);
+ logRecord.setSending(0);
logRecord.setUpdateTime(now);
cloudWmsNotifyLogService.updateById(logRecord);
log.warn("浜戜粨涓婃姤澶辫触锛宨d={}锛宐izRef={}锛宎ttempt={}锛宯otifyStatus={}锛宔rror={}",
--
Gitblit v1.9.1