From 2a34b52125d5fc356d65ee1e8912845dd601d4e3 Mon Sep 17 00:00:00 2001
From: cl <1442464845@qq.com>
Date: 星期五, 01 五月 2026 12:52:02 +0800
Subject: [PATCH] 多加入参数和修改规则

---
 rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java |  272 +++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 246 insertions(+), 26 deletions(-)

diff --git a/rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java b/rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
index e3b7043..c9726d0 100644
--- a/rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
+++ b/rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
@@ -1,34 +1,88 @@
 package com.vincent.rsf.server.manager.service.impl;
 
+import com.vincent.rsf.framework.common.R;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
+import com.vincent.rsf.server.common.service.RedisService;
 import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
 import com.vincent.rsf.server.manager.mapper.CloudWmsNotifyLogMapper;
 import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
 import com.vincent.rsf.server.system.constant.GlobalConfigCode;
 import com.vincent.rsf.server.system.entity.Config;
 import com.vincent.rsf.server.system.service.ConfigService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
+import java.io.IOException;
+import java.util.Date;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
+@Slf4j
 @Service
 public class CloudWmsNotifyLogServiceImpl extends ServiceImpl<CloudWmsNotifyLogMapper, CloudWmsNotifyLog> implements CloudWmsNotifyLogService {
 
+    /** 鍗曟潯寰呭姙銆屾鍦ㄤ笂鎶ャ�峈edis 鍗犱綅绉掓暟锛圫ET NX EX锛� */
+    private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120;
+    /** sending=1 浣� Redis 鏃犲崰浣嶏細update_time 鏃╀簬姝ゆ椂闀匡紙鍒嗛挓锛夊垯琛ュ伩娓呴浂 */
+    private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 6;
+
     @Autowired
     private ConfigService configService;
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired(required = false)
+    private RedisService redisService;
+
+    private static final String CLOUD_WMS_REDIS_FLAG = "cloudwms";
+
+    private boolean useSendingRedis() {
+        return redisService != null
+                && Boolean.TRUE.equals(redisService.initialize)
+                && CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS > 0;
+    }
+
+    private static String sendingRedisSubKey(Long id) {
+        return "sending." + id;
+    }
+
+    /** Redis 鍗犱綅瀛樺湪涓旀湭杩囨湡鏃惰涓烘鍦ㄤ笂鎶� */
+    private boolean isSendingHeldInRedis(Long id) {
+        if (id == null || !useSendingRedis()) {
+            return false;
+        }
+        try {
+            String v = redisService.getValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
+            return StringUtils.isNotBlank(v);
+        } catch (Exception e) {
+            return false;
+        }
+    }
 
     @Override
     public List<CloudWmsNotifyLog> listPending(int limit, int maxRetry) {
         LambdaQueryWrapper<CloudWmsNotifyLog> wrapper = new LambdaQueryWrapper<CloudWmsNotifyLog>()
                 // 浠呮煡璇㈡暟鎹簱閰嶇疆鐘舵�侊細寰呴�氱煡 + 澶辫触锛堝彲閲嶈瘯锛�
                 .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
+                .apply("(send_hold IS NULL OR send_hold = 0)")
                 // 浠呮煡璇㈠彲閲嶈瘯鏁版嵁锛氭棤闄愰噸璇曘�佹湭閰嶇疆涓婇檺銆佹垨鏈揪鍒颁笂闄�
                 .apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)")
                 // 浠呮煡璇㈠凡鍒伴噸璇曟椂闂寸殑鏁版嵁锛岄伩鍏嶅墠 50 鏉℃湭鍒伴棿闅斿鑷村悗缁褰曢暱鏈熼ゥ楗�
                 .apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)")
+                //缂洪噸璇曞弬鏁扮殑涓嶈繘鍏ュ緟鍙戦�佸垪琛�
+                .isNotNull(CloudWmsNotifyLog::getMaxRetryCount)
+                .isNotNull(CloudWmsNotifyLog::getRetryIntervalSeconds)
                 .orderByAsc(CloudWmsNotifyLog::getLastNotifyTime)
                 .orderByAsc(CloudWmsNotifyLog::getId);
         if (maxRetry >= 0) {
@@ -39,6 +93,122 @@
         }
         Page<CloudWmsNotifyLog> page = new Page<>(1, Math.max(1, limit));
         return page(page, wrapper).getRecords();
+    }
+
+    @Override
+    public boolean tryClaimSending(Long id) {
+        if (id == null) {
+            return false;
+        }
+        boolean useRedis = useSendingRedis();
+        if (useRedis) {
+            boolean got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
+                    CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
+            if (!got) {
+                CloudWmsNotifyLog cur = getById(id);
+                if (cur != null && (cur.getSending() == null || cur.getSending() == 0)) {
+                    redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
+                    got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
+                            CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
+                }
+                if (!got) {
+                    return false;
+                }
+            }
+        }
+        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
+        u.eq(CloudWmsNotifyLog::getId, id)
+                .set(CloudWmsNotifyLog::getSending, 1)
+                .set(CloudWmsNotifyLog::getUpdateTime, new Date());
+        if (!useRedis) {
+            u.and(w -> w.isNull(CloudWmsNotifyLog::getSending).or().eq(CloudWmsNotifyLog::getSending, 0));
+        }
+        boolean ok = update(u);
+        if (!ok && useRedis) {
+            try {
+                redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
+            } catch (Exception ignored) {
+            }
+        }
+        return ok;
+    }
+
+    @Override
+    public void clearSending(Long id) {
+        if (id == null) {
+            return;
+        }
+        if (useSendingRedis()) {
+            try {
+                redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
+            } catch (Exception e) {
+                log.debug("浜戜粨涓婃姤 clearSending Redis id={}", id, e);
+            }
+        }
+        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
+        u.eq(CloudWmsNotifyLog::getId, id)
+                .set(CloudWmsNotifyLog::getSending, 0)
+                .set(CloudWmsNotifyLog::getUpdateTime, new Date());
+        update(u);
+    }
+
+    @Override
+    public void recoverStaleSendingWhenRedisMiss() {
+        Date threshold = new Date(System.currentTimeMillis() - STALE_SENDING_RECOVER_AFTER_MINUTES * 60_000L);
+        List<CloudWmsNotifyLog> rows = list(new LambdaQueryWrapper<CloudWmsNotifyLog>()
+                .eq(CloudWmsNotifyLog::getSending, 1)
+                .isNotNull(CloudWmsNotifyLog::getUpdateTime)
+                .lt(CloudWmsNotifyLog::getUpdateTime, threshold)
+                .last("LIMIT 500"));
+        if (rows.isEmpty()) {
+            return;
+        }
+        Date now = new Date();
+        int cleared = 0;
+        for (CloudWmsNotifyLog row : rows) {
+            Long id = row.getId();
+            if (id == null) {
+                continue;
+            }
+            if (useSendingRedis() && isSendingHeldInRedis(id)) {
+                continue;
+            }
+            LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
+            u.eq(CloudWmsNotifyLog::getId, id)
+                    .eq(CloudWmsNotifyLog::getSending, 1)
+                    .lt(CloudWmsNotifyLog::getUpdateTime, threshold)
+                    .set(CloudWmsNotifyLog::getSending, 0)
+                    .set(CloudWmsNotifyLog::getUpdateTime, now);
+            if (update(u)) {
+                cleared++;
+            }
+        }
+        if (cleared > 0) {
+            log.info("浜戜粨寰呭姙 sending 琛ュ伩娓呴浂 {} 鏉�", cleared);
+        }
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public R manualFlushToNotifyByOrderCode(String orderCode, boolean inbound) {
+        if (StringUtils.isBlank(orderCode)) {
+            return R.error("鍗曞彿涓嶈兘涓虹┖");
+        }
+        int flag = inbound ? 1 : 0;
+        Date now = new Date();
+        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
+        u.eq(CloudWmsNotifyLog::getReportType, getReportTypeInOutResult())
+                .eq(CloudWmsNotifyLog::getSourceOrderNo, orderCode.trim())
+                .eq(CloudWmsNotifyLog::getInboundFlag, flag)
+                .eq(CloudWmsNotifyLog::getSendHold, 1)
+                .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
+                .set(CloudWmsNotifyLog::getSendHold, 0)
+                .set(CloudWmsNotifyLog::getUpdateTime, now);
+        int n = getBaseMapper().update(null, u);
+        if (n <= 0) {
+            return R.error("褰撳墠鏃犲緟鏀捐鐨勫叆鍑哄簱寰呭姙锛堣纭 manual 妯″紡銆佸崟鍙蜂笌鍏�/鍑哄簱绫诲瀷涓�鑷达級");
+        }
+        return R.ok("宸叉斁琛屼簯浠撲笂鎶ュ緟鍔� " + n + " 鏉�").add(n);
     }
 
     @Override
@@ -66,43 +236,93 @@
         return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_FAIL, CloudWmsNotifyLog.NOTIFY_STATUS_FAIL);
     }
 
-    private String getConfigString(String flag, String defaultVal) {
-        Config c = configService.getOne(new LambdaQueryWrapper<Config>()
-                .eq(Config::getFlag, flag)
-                .orderByDesc(Config::getId)
-                .last("LIMIT 1"));
-        if (c != null && c.getVal() != null && !c.getVal().isEmpty()) {
-            return c.getVal().trim();
+    private String getConfigValTrimmed(String flag) {
+        Config c = configService.getCachedOrLoad(flag);
+        if (c == null || c.getVal() == null) {
+            return null;
         }
-        return defaultVal;
+        String v = c.getVal().trim();
+        return v.isEmpty() ? null : v;
+    }
+
+    /** 涓庡疄浣撳父閲忔惌閰嶏細浠呭綋搴�/缂撳瓨鏃犳湁鏁� val 鏃剁敤甯搁噺 */
+    private String getConfigString(String flag, String defaultVal) {
+        String v = getConfigValTrimmed(flag);
+        return v != null ? v : defaultVal;
     }
 
     private int getConfigInt(String flag, int defaultVal) {
-        Integer v = getConfigInt(flag);
-        return v != null ? v : defaultVal;
+        Integer n = getConfigIntOrNull(flag);
+        return n != null ? n : defaultVal;
+    }
+
+    private Integer getConfigIntOrNull(String flag) {
+        String v = getConfigValTrimmed(flag);
+        if (v == null) {
+            return null;
+        }
+        try {
+            return Integer.parseInt(v);
+        } catch (NumberFormatException e) {
+            return null;
+        }
     }
 
     @Override
     public void fillFromConfig(CloudWmsNotifyLog log) {
-        Integer maxRetry = getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY);
-        Integer interval = getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS);
         log.setNotifyStatus(getNotifyStatusPending());
-        log.setMaxRetryCount(maxRetry);
-        log.setRetryIntervalSeconds(interval);
+        log.setMaxRetryCount(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY));
+        log.setRetryIntervalSeconds(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS));
     }
 
-    /** 杩斿洖 null 琛ㄧず鏈厤缃垨瑙f瀽澶辫触 */
-    private Integer getConfigInt(String flag) {
-        try {
-            Config c = configService.getOne(new LambdaQueryWrapper<Config>()
-                    .eq(Config::getFlag, flag)
-                    .orderByDesc(Config::getId)
-                    .last("LIMIT 1"));
-            if (c != null && c.getVal() != null && !c.getVal().isEmpty()) {
-                return Integer.parseInt(c.getVal().trim());
-            }
-        } catch (Exception ignored) {
+    @Override
+    public String inOutMergeKeyFromRequestBody(String requestBody) {
+        return mergeKeyFromBody(requestBody);
+    }
+
+    @Override
+    public List<InOutResultReportParam> parseInOutLinesFromRequestBody(String requestBody) throws IOException {
+        return extractInOutLines(requestBody);
+    }
+
+    private String mergeKeyFromBody(String body) {
+        if (StringUtils.isBlank(body)) {
+            return null;
         }
-        return null;
+        try {
+            JsonNode root = objectMapper.readTree(body);
+            if (root.has("lines") && root.get("lines").isArray() && root.get("lines").size() > 0) {
+                return null;
+            }
+            JsonNode first = root;
+            String orderNo = textNode(first, "orderNo");
+            if (StringUtils.isBlank(orderNo)) {
+                return null;
+            }
+            String wh = textNode(first, "wareHouseId");
+            boolean inbound = !first.has("inbound") || first.get("inbound").isNull() || first.get("inbound").asBoolean();
+            return orderNo + "\t" + inbound + "\t" + StringUtils.defaultString(wh);
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    private static String textNode(JsonNode n, String field) {
+        if (n == null || !n.has(field) || n.get(field).isNull()) {
+            return null;
+        }
+        return n.get(field).asText();
+    }
+
+    private List<InOutResultReportParam> extractInOutLines(String body) throws java.io.IOException {
+        JsonNode root = objectMapper.readTree(body);
+        if (root.has("lines") && root.get("lines").isArray()) {
+            List<InOutResultReportParam> list = new ArrayList<>();
+            for (JsonNode n : root.get("lines")) {
+                list.add(objectMapper.treeToValue(n, InOutResultReportParam.class));
+            }
+            return list;
+        }
+        return Collections.singletonList(objectMapper.readValue(body, InOutResultReportParam.class));
     }
 }

--
Gitblit v1.9.1