cl
6 天以前 cb2f02d60aac235f2f9e5ef777e0141fb697c264
多加入参数和修改规则
7个文件已修改
191 ■■■■ 已修改文件
rsf-server/src/main/java/com/vincent/rsf/server/api/service/impl/CloudWmsReportServiceImpl.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/common/service/RedisService.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java 133 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/system/service/impl/ConfigServiceImpl.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/resources/application-dev.yml 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/resources/application-prod.yml 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
rsf-server/src/main/java/com/vincent/rsf/server/api/service/impl/CloudWmsReportServiceImpl.java
@@ -44,7 +44,7 @@
    public Map<String, Object> syncMatnrsToCloud(Object body) {
        if (!isCloudWmsConfigured()) {
            log.warn("ErpApi(云仓WMS) 未配置 host/base-url,跳过物料基础信息同步");
            return stubSuccess("云仓地址未配置,未实际同步");
            return stubWithoutUpstream("云仓地址未配置,未实际同步");
        }
        return cloudWmsErpFeignClient.syncMatnrs(body != null ? body : new HashMap<>());
    }
@@ -56,7 +56,7 @@
        }
        if (!isCloudWmsConfigured()) {
            log.warn("ErpApi(云仓WMS) 未配置 host/base-url,跳过 9.1 入/出库结果上报,订单:{}", param.getOrderNo());
            return stubSuccess("云仓地址未配置,未实际上报");
            return stubWithoutUpstream("云仓地址未配置,未实际上报");
        }
        String err = validateDapBaseForInOut(param);
        if (err != null) {
@@ -79,7 +79,7 @@
        }
        if (!isCloudWmsConfigured()) {
            log.warn("ErpApi(云仓WMS) 未配置 host/base-url,跳过 9.1 入出库合并上报");
            return stubSuccess("云仓地址未配置,未实际上报");
            return stubWithoutUpstream("云仓地址未配置,未实际上报");
        }
        InOutResultReportParam first = lines.get(0);
        boolean inbound = first.getInbound() == null || Boolean.TRUE.equals(first.getInbound());
@@ -112,7 +112,7 @@
        }
        if (!isCloudWmsConfigured()) {
            log.warn("ErpApi(云仓WMS) 未配置 host/base-url,跳过 9.2 库存调整上报,物料:{}", param.getMatNr());
            return stubSuccess("云仓地址未配置,未实际上报");
            return stubWithoutUpstream("云仓地址未配置,未实际上报");
        }
        Integer changeType = param.getChangeType();
        if (changeType == null) {
@@ -307,10 +307,11 @@
        return baseUrl != null && !baseUrl.trim().isEmpty();
    }
    private Map<String, Object> stubSuccess(String msg) {
    /** 未走云仓 HTTP,code 非 200,避免调度误判成功 */
    private Map<String, Object> stubWithoutUpstream(String msg) {
        Map<String, Object> data = new HashMap<>();
        data.put("result", "SUCCESS");
        return resultMap(200, msg, data);
        data.put("result", "SKIPPED");
        return resultMap(503, msg, data);
    }
    private Map<String, Object> resultMap(int code, String msg, Map<String, Object> data) {
rsf-server/src/main/java/com/vincent/rsf/server/common/service/RedisService.java
@@ -36,6 +36,9 @@
        if (null == this.pool) {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setTestOnBorrow(false);
            config.setMaxWaitMillis(5000L);
            int maxTotal = redisProperties.getMax() > 0 ? redisProperties.getMax() : 32;
            config.setMaxTotal(maxTotal);
            this.index = redisProperties.getIndex();
            // 空白密码传 null,不向未开认证的 Redis 发 AUTH
            String pwd = StringUtils.trimToNull(redisProperties.getPassword());
@@ -256,6 +259,8 @@
            return jedis.get(flag + LINK + key);
        } catch (Exception e) {
            log.error(this.getClass().getSimpleName(), e);
        } finally {
            jedis.close();
        }
        return null;
    }
@@ -277,6 +282,8 @@
            return jedis.del(keys);
        } catch (Exception e) {
            log.error(this.getClass().getSimpleName(), e);
        } finally {
            jedis.close();
        }
        return null;
    }
@@ -595,6 +602,8 @@
        } catch (Exception e) {
            log.error(this.getClass().getSimpleName(), e);
            return true;
        } finally {
            jedis.close();
        }
    }
rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -44,9 +44,7 @@
    @Autowired
    private ConfigService configService;
    /** sending=1 且 Redis 占位已失、update_time 超时:补偿清零 */
    @Scheduled(cron = "0 0/2 * * * ?")
//    @Scheduled(cron = "0/5 * * * * ?")
    @Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}")
    public void recoverStaleSending() {
        try {
            cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
@@ -55,22 +53,45 @@
        }
    }
    @Scheduled(cron = "0/60 * * * * ?")
    @Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}")
    public void syncCloudWmsNotify() {
        List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
        if (pending.isEmpty()) {
            log.debug("云仓上报调度:本轮待发送 0 条");
            return;
        long t0 = System.currentTimeMillis();
        try {
            List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
            if (pending.isEmpty()) {
                log.debug("云仓上报调度:本轮待发送 0 条");
                return;
            }
            log.info("云仓上报调度:本轮待发送 {} 条", pending.size());
            log.info("云仓上报调度:开始派发");
            dispatchPending(pending);
            log.info("云仓上报调度:派发完成");
            log.info("云仓上报调度:本轮调度结束,耗时 {} ms", System.currentTimeMillis() - t0);
        } catch (Exception e) {
            log.error("云仓上报调度异常,已耗时 {} ms", System.currentTimeMillis() - t0, e);
        }
        log.info("云仓上报调度:本轮待发送 {} 条", pending.size());
        dispatchPending(pending);
    }
    private static boolean isSendingBusy(CloudWmsNotifyLog row) {
        Integer s = row.getSending();
        return s != null && s != 0;
    }
    /** 同单多条合并上报 */
    private void dispatchPending(List<CloudWmsNotifyLog> pending) {
        int rowsReported = 0;
        log.info("云仓上报派发:开始,待办 {} 条", pending.size());
        long tCfg = System.currentTimeMillis();
        log.info("云仓上报派发:读取上报类型配置");
        String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
        log.info("云仓上报派发:上报类型={},读配置耗时 {} ms", rtInOut, System.currentTimeMillis() - tCfg);
        LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
        for (CloudWmsNotifyLog row : pending) {
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            CloudWmsNotifyLog row = pending.get(i);
            if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) {
                log.info("云仓上报派发:分组进度 {}/{},id={}", i + 1, n, row.getId());
            }
            if (!rtInOut.equals(row.getReportType())) {
                continue;
            }
@@ -80,6 +101,7 @@
            }
            inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        log.info("云仓上报派发:入出库可分组键 {} 个", inOutGroups.size());
        Set<Long> done = new HashSet<>();
        for (CloudWmsNotifyLog row : pending) {
            Long rid = row.getId();
@@ -87,49 +109,95 @@
                continue;
            }
            if (!rtInOut.equals(row.getReportType())) {
                safeProcessOne(row);
                if (rid != null) {
                    done.add(rid);
                if (isSendingBusy(row)) {
                    continue;
                }
                log.info("云仓上报派发:其它类型单条 id={}", rid);
                if (safeProcessOne(row)) {
                    rowsReported++;
                    if (rid != null) {
                        done.add(rid);
                    }
                }
                continue;
            }
            String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
            if (key == null) {
                safeProcessOne(row);
                if (rid != null) {
                    done.add(rid);
                if (isSendingBusy(row)) {
                    continue;
                }
                log.info("云仓上报派发:无合并键单条 id={}", rid);
                if (safeProcessOne(row)) {
                    rowsReported++;
                    if (rid != null) {
                        done.add(rid);
                    }
                }
                continue;
            }
            List<CloudWmsNotifyLog> g = inOutGroups.get(key);
            if (g != null && g.size() >= 2) {
                safeProcessMergedInOutGroup(g);
            List<CloudWmsNotifyLog> work = new ArrayList<>();
            if (g != null) {
                for (CloudWmsNotifyLog x : g) {
                    if (x.getId() != null) {
                        done.add(x.getId());
                    if (x.getId() == null || done.contains(x.getId())) {
                        continue;
                    }
                    if (isSendingBusy(x)) {
                        continue;
                    }
                    work.add(x);
                }
            }
            if (work.isEmpty()) {
                continue;
            }
            if (work.size() >= 2) {
                log.info("云仓上报派发:同单合并 {} 条", work.size());
                Set<Long> handled = safeProcessMergedInOutGroup(work);
                done.addAll(handled);
                rowsReported += handled.size();
            } else {
                safeProcessOne(row);
                if (rid != null) {
                    done.add(rid);
                log.info("云仓上报派发:入出库单条 id={}", work.get(0).getId());
                if (safeProcessOne(work.get(0))) {
                    rowsReported++;
                    if (work.get(0).getId() != null) {
                        done.add(work.get(0).getId());
                    }
                }
            }
        }
        log.info("云仓上报派发:遍历结束");
        if (rowsReported == 0) {
            log.warn("云仓上报派发:待办 {} 条,实际处理 0 条", pending.size());
        } else {
            log.info("云仓上报派发:待办 {} 条,实际处理 {} 条", pending.size(), rowsReported);
        }
    }
    private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
    private Set<Long> safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
        Set<Long> handled = new HashSet<>();
        List<Long> claimedIds = new ArrayList<>();
        List<CloudWmsNotifyLog> claimedRows = new ArrayList<>();
        log.info("云仓上报派发:合并组抢占,行数 {}", group.size());
        try {
            for (CloudWmsNotifyLog row : group) {
                Long id = row.getId();
                log.debug("云仓上报派发:合并抢占尝试 id={}", id);
                if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
                    log.debug("云仓上报同批合并未抢到发送权 id={}", id);
                    return;
                    log.warn("云仓上报派发:合并抢占未抢到 id={},跳过本行继续其它行", id);
                    continue;
                }
                claimedIds.add(id);
                claimedRows.add(row);
            }
            processMergedInOut(group);
            if (claimedRows.isEmpty()) {
                log.warn("云仓上报派发:合并组抢占 0 行");
                return handled;
            }
            log.info("云仓上报派发:合并组已抢占 {} 行,请求云仓", claimedRows.size());
            processMergedInOut(claimedRows);
            log.info("云仓上报派发:合并组云仓调用已返回");
            handled.addAll(claimedIds);
        } catch (Exception e) {
            log.warn("云仓上报同批合并异常:{}", e.getMessage());
        } finally {
@@ -137,6 +205,7 @@
                cloudWmsNotifyLogService.clearSending(id);
            }
        }
        return handled;
    }
    private void processMergedInOut(List<CloudWmsNotifyLog> group) {
@@ -200,11 +269,12 @@
        return ids;
    }
    private void safeProcessOne(CloudWmsNotifyLog logRecord) {
    private boolean safeProcessOne(CloudWmsNotifyLog logRecord) {
        Long id = logRecord.getId();
        log.info("云仓上报派发:单条抢占 id={}", id);
        if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
            log.debug("云仓上报未抢到发送权 id={}", id);
            return;
            log.warn("云仓上报派发:单条未抢到 id={}", id);
            return false;
        }
        try {
            processOne(logRecord);
@@ -213,6 +283,7 @@
        } finally {
            cloudWmsNotifyLogService.clearSending(id);
        }
        return true;
    }
    private void processOne(CloudWmsNotifyLog logRecord) {
rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
@@ -36,7 +36,7 @@
    /** 单条待办「正在上报」Redis 占位秒数(SET NX EX) */
    private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120;
    /** sending=1 但 Redis 无占位:update_time 早于此时长(分钟)则补偿清零 */
    private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 6;
    private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 2;
    @Autowired
    private ConfigService configService;
@@ -73,12 +73,12 @@
    @Override
    public List<CloudWmsNotifyLog> listPending(int limit, int maxRetry) {
        LambdaQueryWrapper<CloudWmsNotifyLog> wrapper = new LambdaQueryWrapper<CloudWmsNotifyLog>()
                // 仅查询数据库配置状态:待通知 + 失败(可重试)
                // 仅查询数据库配置状态
                .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
                .apply("(send_hold IS NULL OR send_hold = 0)")
                // 仅查询可重试数据:无限重试、未配置上限、或未达到上限
                // 仅查询可重试数据
                .apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)")
                // 仅查询已到重试时间的数据,避免前 50 条未到间隔导致后续记录长期饥饿
                // 仅查询已到重试时间的数据
                .apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)")
                //缺重试参数的不进入待发送列表
                .isNotNull(CloudWmsNotifyLog::getMaxRetryCount)
rsf-server/src/main/java/com/vincent/rsf/server/system/service/impl/ConfigServiceImpl.java
@@ -111,6 +111,13 @@
        if (StringUtils.isBlank(flag)) {
            return null;
        }
        Config mem = CONFIG_CACHE.get(flag);
        if (isEffectiveConfig(mem)) {
            return mem;
        }
        if (mem != null) {
            CONFIG_CACHE.remove(flag);
        }
        if (redisReady()) {
            Config fromRedis = tryRedisGetConfig(flag);
            if (isEffectiveConfig(fromRedis)) {
@@ -123,13 +130,6 @@
                tryRedisSetexConfig(flag, loaded);
            }
            return loaded;
        }
        Config cached = CONFIG_CACHE.get(flag);
        if (isEffectiveConfig(cached)) {
            return cached;
        }
        if (cached != null) {
            CONFIG_CACHE.remove(flag);
        }
        Config loaded = loadConfigFromDb(flag);
        if (loaded != null) {
rsf-server/src/main/resources/application-dev.yml
@@ -132,6 +132,12 @@
    #判断是否校验合格后,才允许收货
    flagReceiving: false
rsf:
  cloudwms:
    notify:
      sync-cron: "0/3 * * * * ?"
      recover-cron: "0/5 * * * * ?"
# HTTP 接口审计(rsf-http-audit,引入依赖即生效,可 enabled=false 关闭)
# whitelist-only=true:仅 sys_http_audit_rule 命中规则才写审计;无规则时不落库。false:排除路径外全量记录。
# rule-cache-refresh-ms:规则表缓存刷新间隔(毫秒)
rsf-server/src/main/resources/application-prod.yml
@@ -133,6 +133,12 @@
    #判断是否校验合格后,才允许收货
    flagReceiving: false
rsf:
  cloudwms:
    notify:
      sync-cron: "0 0/1 * * * ?"
      recover-cron: "0 0/2 * * * ?"
# HTTP 接口审计(rsf-http-audit,引入依赖即生效,可 enabled=false 关闭)
# whitelist-only=true:仅 sys_http_audit_rule 命中规则才写审计;无规则时不落库。false:排除路径外全量记录。
# rule-cache-refresh-ms:规则表缓存刷新间隔(毫秒)