From 2a34b52125d5fc356d65ee1e8912845dd601d4e3 Mon Sep 17 00:00:00 2001
From: cl <1442464845@qq.com>
Date: 星期五, 01 五月 2026 12:52:02 +0800
Subject: [PATCH] 多加入参数和修改规则
---
rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java | 284 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 261 insertions(+), 23 deletions(-)
diff --git a/rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java b/rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
index ab82ef8..c9726d0 100644
--- a/rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
+++ b/rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
@@ -1,33 +1,214 @@
package com.vincent.rsf.server.manager.service.impl;
+import com.vincent.rsf.framework.common.R;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
+import com.vincent.rsf.server.common.service.RedisService;
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
import com.vincent.rsf.server.manager.mapper.CloudWmsNotifyLogMapper;
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
import com.vincent.rsf.server.system.entity.Config;
import com.vincent.rsf.server.system.service.ConfigService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import java.io.IOException;
+import java.util.Date;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+@Slf4j
@Service
public class CloudWmsNotifyLogServiceImpl extends ServiceImpl<CloudWmsNotifyLogMapper, CloudWmsNotifyLog> implements CloudWmsNotifyLogService {
+ /** 鍗曟潯寰呭姙銆屾鍦ㄤ笂鎶ャ�峈edis 鍗犱綅绉掓暟锛圫ET NX EX锛� */
+ private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120;
+ /** sending=1 浣� Redis 鏃犲崰浣嶏細update_time 鏃╀簬姝ゆ椂闀匡紙鍒嗛挓锛夊垯琛ュ伩娓呴浂 */
+ private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 6;
+
@Autowired
private ConfigService configService;
+ @Autowired
+ private ObjectMapper objectMapper;
+ @Autowired(required = false)
+ private RedisService redisService;
+
+ private static final String CLOUD_WMS_REDIS_FLAG = "cloudwms";
+
+ private boolean useSendingRedis() {
+ return redisService != null
+ && Boolean.TRUE.equals(redisService.initialize)
+ && CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS > 0;
+ }
+
+ private static String sendingRedisSubKey(Long id) {
+ return "sending." + id;
+ }
+
+ /** Redis 鍗犱綅瀛樺湪涓旀湭杩囨湡鏃惰涓烘鍦ㄤ笂鎶� */
+ private boolean isSendingHeldInRedis(Long id) {
+ if (id == null || !useSendingRedis()) {
+ return false;
+ }
+ try {
+ String v = redisService.getValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
+ return StringUtils.isNotBlank(v);
+ } catch (Exception e) {
+ return false;
+ }
+ }
@Override
public List<CloudWmsNotifyLog> listPending(int limit, int maxRetry) {
- Page<CloudWmsNotifyLog> page = new Page<>(1, Math.max(1, limit));
LambdaQueryWrapper<CloudWmsNotifyLog> wrapper = new LambdaQueryWrapper<CloudWmsNotifyLog>()
- .eq(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending())
- .lt(CloudWmsNotifyLog::getRetryCount, maxRetry)
+ // 浠呮煡璇㈡暟鎹簱閰嶇疆鐘舵�侊細寰呴�氱煡 + 澶辫触锛堝彲閲嶈瘯锛�
+ .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
+ .apply("(send_hold IS NULL OR send_hold = 0)")
+ // 浠呮煡璇㈠彲閲嶈瘯鏁版嵁锛氭棤闄愰噸璇曘�佹湭閰嶇疆涓婇檺銆佹垨鏈揪鍒颁笂闄�
+ .apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)")
+ // 浠呮煡璇㈠凡鍒伴噸璇曟椂闂寸殑鏁版嵁锛岄伩鍏嶅墠 50 鏉℃湭鍒伴棿闅斿鑷村悗缁褰曢暱鏈熼ゥ楗�
+ .apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)")
+ //缂洪噸璇曞弬鏁扮殑涓嶈繘鍏ュ緟鍙戦�佸垪琛�
+ .isNotNull(CloudWmsNotifyLog::getMaxRetryCount)
+ .isNotNull(CloudWmsNotifyLog::getRetryIntervalSeconds)
+ .orderByAsc(CloudWmsNotifyLog::getLastNotifyTime)
.orderByAsc(CloudWmsNotifyLog::getId);
+ if (maxRetry >= 0) {
+ wrapper.lt(CloudWmsNotifyLog::getRetryCount, maxRetry);
+ }
+ if (limit < 0) {
+ return list(wrapper);
+ }
+ Page<CloudWmsNotifyLog> page = new Page<>(1, Math.max(1, limit));
return page(page, wrapper).getRecords();
+ }
+
+ @Override
+ public boolean tryClaimSending(Long id) {
+ if (id == null) {
+ return false;
+ }
+ boolean useRedis = useSendingRedis();
+ if (useRedis) {
+ boolean got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
+ CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
+ if (!got) {
+ CloudWmsNotifyLog cur = getById(id);
+ if (cur != null && (cur.getSending() == null || cur.getSending() == 0)) {
+ redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
+ got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
+ CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
+ }
+ if (!got) {
+ return false;
+ }
+ }
+ }
+ LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
+ u.eq(CloudWmsNotifyLog::getId, id)
+ .set(CloudWmsNotifyLog::getSending, 1)
+ .set(CloudWmsNotifyLog::getUpdateTime, new Date());
+ if (!useRedis) {
+ u.and(w -> w.isNull(CloudWmsNotifyLog::getSending).or().eq(CloudWmsNotifyLog::getSending, 0));
+ }
+ boolean ok = update(u);
+ if (!ok && useRedis) {
+ try {
+ redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
+ } catch (Exception ignored) {
+ }
+ }
+ return ok;
+ }
+
+ @Override
+ public void clearSending(Long id) {
+ if (id == null) {
+ return;
+ }
+ if (useSendingRedis()) {
+ try {
+ redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
+ } catch (Exception e) {
+ log.debug("浜戜粨涓婃姤 clearSending Redis id={}", id, e);
+ }
+ }
+ LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
+ u.eq(CloudWmsNotifyLog::getId, id)
+ .set(CloudWmsNotifyLog::getSending, 0)
+ .set(CloudWmsNotifyLog::getUpdateTime, new Date());
+ update(u);
+ }
+
+ @Override
+ public void recoverStaleSendingWhenRedisMiss() {
+ Date threshold = new Date(System.currentTimeMillis() - STALE_SENDING_RECOVER_AFTER_MINUTES * 60_000L);
+ List<CloudWmsNotifyLog> rows = list(new LambdaQueryWrapper<CloudWmsNotifyLog>()
+ .eq(CloudWmsNotifyLog::getSending, 1)
+ .isNotNull(CloudWmsNotifyLog::getUpdateTime)
+ .lt(CloudWmsNotifyLog::getUpdateTime, threshold)
+ .last("LIMIT 500"));
+ if (rows.isEmpty()) {
+ return;
+ }
+ Date now = new Date();
+ int cleared = 0;
+ for (CloudWmsNotifyLog row : rows) {
+ Long id = row.getId();
+ if (id == null) {
+ continue;
+ }
+ if (useSendingRedis() && isSendingHeldInRedis(id)) {
+ continue;
+ }
+ LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
+ u.eq(CloudWmsNotifyLog::getId, id)
+ .eq(CloudWmsNotifyLog::getSending, 1)
+ .lt(CloudWmsNotifyLog::getUpdateTime, threshold)
+ .set(CloudWmsNotifyLog::getSending, 0)
+ .set(CloudWmsNotifyLog::getUpdateTime, now);
+ if (update(u)) {
+ cleared++;
+ }
+ }
+ if (cleared > 0) {
+ log.info("浜戜粨寰呭姙 sending 琛ュ伩娓呴浂 {} 鏉�", cleared);
+ }
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public R manualFlushToNotifyByOrderCode(String orderCode, boolean inbound) {
+ if (StringUtils.isBlank(orderCode)) {
+ return R.error("鍗曞彿涓嶈兘涓虹┖");
+ }
+ int flag = inbound ? 1 : 0;
+ Date now = new Date();
+ LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
+ u.eq(CloudWmsNotifyLog::getReportType, getReportTypeInOutResult())
+ .eq(CloudWmsNotifyLog::getSourceOrderNo, orderCode.trim())
+ .eq(CloudWmsNotifyLog::getInboundFlag, flag)
+ .eq(CloudWmsNotifyLog::getSendHold, 1)
+ .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
+ .set(CloudWmsNotifyLog::getSendHold, 0)
+ .set(CloudWmsNotifyLog::getUpdateTime, now);
+ int n = getBaseMapper().update(null, u);
+ if (n <= 0) {
+ return R.error("褰撳墠鏃犲緟鏀捐鐨勫叆鍑哄簱寰呭姙锛堣纭 manual 妯″紡銆佸崟鍙蜂笌鍏�/鍑哄簱绫诲瀷涓�鑷达級");
+ }
+ return R.ok("宸叉斁琛屼簯浠撲笂鎶ュ緟鍔� " + n + " 鏉�").add(n);
}
@Override
@@ -55,36 +236,93 @@
return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_FAIL, CloudWmsNotifyLog.NOTIFY_STATUS_FAIL);
}
- private String getConfigString(String flag, String defaultVal) {
- Config c = configService.getOne(new LambdaQueryWrapper<Config>().eq(Config::getFlag, flag).last("LIMIT 1"));
- if (c != null && c.getVal() != null && !c.getVal().isEmpty()) {
- return c.getVal().trim();
+ private String getConfigValTrimmed(String flag) {
+ Config c = configService.getCachedOrLoad(flag);
+ if (c == null || c.getVal() == null) {
+ return null;
}
- return defaultVal;
+ String v = c.getVal().trim();
+ return v.isEmpty() ? null : v;
+ }
+
+ /** 涓庡疄浣撳父閲忔惌閰嶏細浠呭綋搴�/缂撳瓨鏃犳湁鏁� val 鏃剁敤甯搁噺 */
+ private String getConfigString(String flag, String defaultVal) {
+ String v = getConfigValTrimmed(flag);
+ return v != null ? v : defaultVal;
}
private int getConfigInt(String flag, int defaultVal) {
- Integer v = getConfigInt(flag);
- return v != null ? v : defaultVal;
+ Integer n = getConfigIntOrNull(flag);
+ return n != null ? n : defaultVal;
+ }
+
+ private Integer getConfigIntOrNull(String flag) {
+ String v = getConfigValTrimmed(flag);
+ if (v == null) {
+ return null;
+ }
+ try {
+ return Integer.parseInt(v);
+ } catch (NumberFormatException e) {
+ return null;
+ }
}
@Override
public void fillFromConfig(CloudWmsNotifyLog log) {
- Integer maxRetry = getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY);
- Integer interval = getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS);
- log.setMaxRetryCount(maxRetry);
- log.setRetryIntervalSeconds(interval);
+ log.setNotifyStatus(getNotifyStatusPending());
+ log.setMaxRetryCount(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY));
+ log.setRetryIntervalSeconds(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS));
}
- /** 杩斿洖 null 琛ㄧず鏈厤缃垨瑙f瀽澶辫触 */
- private Integer getConfigInt(String flag) {
- try {
- Config c = configService.getOne(new LambdaQueryWrapper<Config>().eq(Config::getFlag, flag).last("LIMIT 1"));
- if (c != null && c.getVal() != null && !c.getVal().isEmpty()) {
- return Integer.parseInt(c.getVal().trim());
- }
- } catch (Exception ignored) {
+ @Override
+ public String inOutMergeKeyFromRequestBody(String requestBody) {
+ return mergeKeyFromBody(requestBody);
+ }
+
+ @Override
+ public List<InOutResultReportParam> parseInOutLinesFromRequestBody(String requestBody) throws IOException {
+ return extractInOutLines(requestBody);
+ }
+
+ private String mergeKeyFromBody(String body) {
+ if (StringUtils.isBlank(body)) {
+ return null;
}
- return null;
+ try {
+ JsonNode root = objectMapper.readTree(body);
+ if (root.has("lines") && root.get("lines").isArray() && root.get("lines").size() > 0) {
+ return null;
+ }
+ JsonNode first = root;
+ String orderNo = textNode(first, "orderNo");
+ if (StringUtils.isBlank(orderNo)) {
+ return null;
+ }
+ String wh = textNode(first, "wareHouseId");
+ boolean inbound = !first.has("inbound") || first.get("inbound").isNull() || first.get("inbound").asBoolean();
+ return orderNo + "\t" + inbound + "\t" + StringUtils.defaultString(wh);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private static String textNode(JsonNode n, String field) {
+ if (n == null || !n.has(field) || n.get(field).isNull()) {
+ return null;
+ }
+ return n.get(field).asText();
+ }
+
+ private List<InOutResultReportParam> extractInOutLines(String body) throws java.io.IOException {
+ JsonNode root = objectMapper.readTree(body);
+ if (root.has("lines") && root.get("lines").isArray()) {
+ List<InOutResultReportParam> list = new ArrayList<>();
+ for (JsonNode n : root.get("lines")) {
+ list.add(objectMapper.treeToValue(n, InOutResultReportParam.class));
+ }
+ return list;
+ }
+ return Collections.singletonList(objectMapper.readValue(body, InOutResultReportParam.class));
}
}
--
Gitblit v1.9.1