rsf-server/src/main/java/com/vincent/rsf/server/api/service/impl/CloudWmsReportServiceImpl.java
@@ -44,7 +44,7 @@ public Map<String, Object> syncMatnrsToCloud(Object body) { if (!isCloudWmsConfigured()) { log.warn("ErpApi(云仓WMS) 未配置 host/base-url,跳过物料基础信息同步"); return stubSuccess("云仓地址未配置,未实际同步"); return stubWithoutUpstream("云仓地址未配置,未实际同步"); } return cloudWmsErpFeignClient.syncMatnrs(body != null ? body : new HashMap<>()); } @@ -56,7 +56,7 @@ } if (!isCloudWmsConfigured()) { log.warn("ErpApi(云仓WMS) 未配置 host/base-url,跳过 9.1 入/出库结果上报,订单:{}", param.getOrderNo()); return stubSuccess("云仓地址未配置,未实际上报"); return stubWithoutUpstream("云仓地址未配置,未实际上报"); } String err = validateDapBaseForInOut(param); if (err != null) { @@ -79,7 +79,7 @@ } if (!isCloudWmsConfigured()) { log.warn("ErpApi(云仓WMS) 未配置 host/base-url,跳过 9.1 入出库合并上报"); return stubSuccess("云仓地址未配置,未实际上报"); return stubWithoutUpstream("云仓地址未配置,未实际上报"); } InOutResultReportParam first = lines.get(0); boolean inbound = first.getInbound() == null || Boolean.TRUE.equals(first.getInbound()); @@ -112,7 +112,7 @@ } if (!isCloudWmsConfigured()) { log.warn("ErpApi(云仓WMS) 未配置 host/base-url,跳过 9.2 库存调整上报,物料:{}", param.getMatNr()); return stubSuccess("云仓地址未配置,未实际上报"); return stubWithoutUpstream("云仓地址未配置,未实际上报"); } Integer changeType = param.getChangeType(); if (changeType == null) { @@ -307,10 +307,11 @@ return baseUrl != null && !baseUrl.trim().isEmpty(); } private Map<String, Object> stubSuccess(String msg) { /** 未走云仓 HTTP,code 非 200,避免调度误判成功 */ private Map<String, Object> stubWithoutUpstream(String msg) { Map<String, Object> data = new HashMap<>(); data.put("result", "SUCCESS"); return resultMap(200, msg, data); data.put("result", "SKIPPED"); return resultMap(503, msg, data); } private Map<String, Object> resultMap(int code, String msg, Map<String, Object> data) { rsf-server/src/main/java/com/vincent/rsf/server/common/service/RedisService.java
@@ -36,6 +36,9 @@ if (null == this.pool) { JedisPoolConfig config = new JedisPoolConfig(); config.setTestOnBorrow(false); config.setMaxWaitMillis(5000L); int maxTotal = redisProperties.getMax() > 0 ? redisProperties.getMax() : 32; config.setMaxTotal(maxTotal); this.index = redisProperties.getIndex(); // 空白密码传 null,不向未开认证的 Redis 发 AUTH String pwd = StringUtils.trimToNull(redisProperties.getPassword()); @@ -256,6 +259,8 @@ return jedis.get(flag + LINK + key); } catch (Exception e) { log.error(this.getClass().getSimpleName(), e); } finally { jedis.close(); } return null; } @@ -277,6 +282,8 @@ return jedis.del(keys); } catch (Exception e) { log.error(this.getClass().getSimpleName(), e); } finally { jedis.close(); } return null; } @@ -595,6 +602,8 @@ } catch (Exception e) { log.error(this.getClass().getSimpleName(), e); return true; } finally { jedis.close(); } } rsf-server/src/main/java/com/vincent/rsf/server/manager/schedules/CloudWmsNotifySchedule.java
@@ -44,9 +44,7 @@ @Autowired private ConfigService configService; /** sending=1 且 Redis 占位已失、update_time 超时:补偿清零 */ @Scheduled(cron = "0 0/2 * * * ?") // @Scheduled(cron = "0/5 * * * * ?") @Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}") public void recoverStaleSending() { try { cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss(); @@ -55,22 +53,45 @@ } } @Scheduled(cron = "0/60 * * * * ?") @Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}") public void syncCloudWmsNotify() { List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1); if (pending.isEmpty()) { log.debug("云仓上报调度:本轮待发送 0 条"); return; long t0 = System.currentTimeMillis(); try { List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1); if (pending.isEmpty()) { log.debug("云仓上报调度:本轮待发送 0 条"); return; } log.info("云仓上报调度:本轮待发送 {} 条", pending.size()); log.info("云仓上报调度:开始派发"); dispatchPending(pending); log.info("云仓上报调度:派发完成"); log.info("云仓上报调度:本轮调度结束,耗时 {} ms", System.currentTimeMillis() - t0); } catch (Exception e) { log.error("云仓上报调度异常,已耗时 {} ms", System.currentTimeMillis() - t0, e); } log.info("云仓上报调度:本轮待发送 {} 条", pending.size()); dispatchPending(pending); } private static boolean isSendingBusy(CloudWmsNotifyLog row) { Integer s = row.getSending(); return s != null && s != 0; } /** 同单多条合并上报 */ private void dispatchPending(List<CloudWmsNotifyLog> pending) { int rowsReported = 0; log.info("云仓上报派发:开始,待办 {} 条", pending.size()); long tCfg = System.currentTimeMillis(); log.info("云仓上报派发:读取上报类型配置"); String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult(); log.info("云仓上报派发:上报类型={},读配置耗时 {} ms", rtInOut, System.currentTimeMillis() - tCfg); LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>(); for (CloudWmsNotifyLog row : pending) { int n = pending.size(); for (int i = 0; i < n; i++) { CloudWmsNotifyLog row = pending.get(i); if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) { log.info("云仓上报派发:分组进度 {}/{},id={}", i + 1, n, row.getId()); } if (!rtInOut.equals(row.getReportType())) { continue; } @@ -80,6 +101,7 @@ } inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row); } log.info("云仓上报派发:入出库可分组键 {} 个", inOutGroups.size()); Set<Long> done = new HashSet<>(); for (CloudWmsNotifyLog row : pending) { Long rid = row.getId(); @@ -87,49 +109,95 @@ continue; } if (!rtInOut.equals(row.getReportType())) { safeProcessOne(row); if (rid != null) { done.add(rid); if (isSendingBusy(row)) { continue; } log.info("云仓上报派发:其它类型单条 id={}", rid); if (safeProcessOne(row)) { rowsReported++; if (rid != null) { done.add(rid); } } continue; } String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody()); if (key == null) { safeProcessOne(row); if (rid != null) { done.add(rid); if (isSendingBusy(row)) { continue; } log.info("云仓上报派发:无合并键单条 id={}", rid); if (safeProcessOne(row)) { rowsReported++; if (rid != null) { done.add(rid); } } continue; } List<CloudWmsNotifyLog> g = inOutGroups.get(key); if (g != null && g.size() >= 2) { safeProcessMergedInOutGroup(g); List<CloudWmsNotifyLog> work = new ArrayList<>(); if (g != null) { for (CloudWmsNotifyLog x : g) { if (x.getId() != null) { done.add(x.getId()); if (x.getId() == null || done.contains(x.getId())) { continue; } if (isSendingBusy(x)) { continue; } work.add(x); } } if (work.isEmpty()) { continue; } if (work.size() >= 2) { log.info("云仓上报派发:同单合并 {} 条", work.size()); Set<Long> handled = safeProcessMergedInOutGroup(work); done.addAll(handled); rowsReported += handled.size(); } else { safeProcessOne(row); if (rid != null) { done.add(rid); log.info("云仓上报派发:入出库单条 id={}", work.get(0).getId()); if (safeProcessOne(work.get(0))) { rowsReported++; if (work.get(0).getId() != null) { done.add(work.get(0).getId()); } } } } log.info("云仓上报派发:遍历结束"); if (rowsReported == 0) { log.warn("云仓上报派发:待办 {} 条,实际处理 0 条", pending.size()); } else { log.info("云仓上报派发:待办 {} 条,实际处理 {} 条", pending.size(), rowsReported); } } private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) { private Set<Long> safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) { Set<Long> handled = new HashSet<>(); List<Long> claimedIds = new ArrayList<>(); List<CloudWmsNotifyLog> claimedRows = new ArrayList<>(); log.info("云仓上报派发:合并组抢占,行数 {}", group.size()); try { for (CloudWmsNotifyLog row : group) { Long id = row.getId(); log.debug("云仓上报派发:合并抢占尝试 id={}", id); if (!cloudWmsNotifyLogService.tryClaimSending(id)) { log.debug("云仓上报同批合并未抢到发送权 id={}", id); return; log.warn("云仓上报派发:合并抢占未抢到 id={},跳过本行继续其它行", id); continue; } claimedIds.add(id); claimedRows.add(row); } processMergedInOut(group); if (claimedRows.isEmpty()) { log.warn("云仓上报派发:合并组抢占 0 行"); return handled; } log.info("云仓上报派发:合并组已抢占 {} 行,请求云仓", claimedRows.size()); processMergedInOut(claimedRows); log.info("云仓上报派发:合并组云仓调用已返回"); handled.addAll(claimedIds); } catch (Exception e) { log.warn("云仓上报同批合并异常:{}", e.getMessage()); } finally { @@ -137,6 +205,7 @@ cloudWmsNotifyLogService.clearSending(id); } } return handled; } private void processMergedInOut(List<CloudWmsNotifyLog> group) { @@ -200,11 +269,12 @@ return ids; } private void safeProcessOne(CloudWmsNotifyLog logRecord) { private boolean safeProcessOne(CloudWmsNotifyLog logRecord) { Long id = logRecord.getId(); log.info("云仓上报派发:单条抢占 id={}", id); if (!cloudWmsNotifyLogService.tryClaimSending(id)) { log.debug("云仓上报未抢到发送权 id={}", id); return; log.warn("云仓上报派发:单条未抢到 id={}", id); return false; } try { processOne(logRecord); @@ -213,6 +283,7 @@ } finally { cloudWmsNotifyLogService.clearSending(id); } return true; } private void processOne(CloudWmsNotifyLog logRecord) { rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
@@ -36,7 +36,7 @@ /** 单条待办「正在上报」Redis 占位秒数(SET NX EX) */ private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120; /** sending=1 但 Redis 无占位:update_time 早于此时长(分钟)则补偿清零 */ private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 6; private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 2; @Autowired private ConfigService configService; @@ -73,12 +73,12 @@ @Override public List<CloudWmsNotifyLog> listPending(int limit, int maxRetry) { LambdaQueryWrapper<CloudWmsNotifyLog> wrapper = new LambdaQueryWrapper<CloudWmsNotifyLog>() // 仅查询数据库配置状态:待通知 + 失败(可重试) // 仅查询数据库配置状态 .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail()) .apply("(send_hold IS NULL OR send_hold = 0)") // 仅查询可重试数据:无限重试、未配置上限、或未达到上限 // 仅查询可重试数据 .apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)") // 仅查询已到重试时间的数据,避免前 50 条未到间隔导致后续记录长期饥饿 // 仅查询已到重试时间的数据 .apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)") //缺重试参数的不进入待发送列表 .isNotNull(CloudWmsNotifyLog::getMaxRetryCount) rsf-server/src/main/java/com/vincent/rsf/server/system/service/impl/ConfigServiceImpl.java
@@ -111,6 +111,13 @@ if (StringUtils.isBlank(flag)) { return null; } Config mem = CONFIG_CACHE.get(flag); if (isEffectiveConfig(mem)) { return mem; } if (mem != null) { CONFIG_CACHE.remove(flag); } if (redisReady()) { Config fromRedis = tryRedisGetConfig(flag); if (isEffectiveConfig(fromRedis)) { @@ -123,13 +130,6 @@ tryRedisSetexConfig(flag, loaded); } return loaded; } Config cached = CONFIG_CACHE.get(flag); if (isEffectiveConfig(cached)) { return cached; } if (cached != null) { CONFIG_CACHE.remove(flag); } Config loaded = loadConfigFromDb(flag); if (loaded != null) { rsf-server/src/main/resources/application-dev.yml
@@ -132,6 +132,12 @@ #判断是否校验合格后,才允许收货 flagReceiving: false rsf: cloudwms: notify: sync-cron: "0/3 * * * * ?" recover-cron: "0/5 * * * * ?" # HTTP 接口审计(rsf-http-audit,引入依赖即生效,可 enabled=false 关闭) # whitelist-only=true:仅 sys_http_audit_rule 命中规则才写审计;无规则时不落库。false:排除路径外全量记录。 # rule-cache-refresh-ms:规则表缓存刷新间隔(毫秒) rsf-server/src/main/resources/application-prod.yml
@@ -133,6 +133,12 @@ #判断是否校验合格后,才允许收货 flagReceiving: false rsf: cloudwms: notify: sync-cron: "0 0/1 * * * ?" recover-cron: "0 0/2 * * * ?" # HTTP 接口审计(rsf-http-audit,引入依赖即生效,可 enabled=false 关闭) # whitelist-only=true:仅 sys_http_audit_rule 命中规则才写审计;无规则时不落库。false:排除路径外全量记录。 # rule-cache-refresh-ms:规则表缓存刷新间隔(毫秒)