package com.vincent.rsf.server.manager.service.impl; import com.vincent.rsf.framework.common.R; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam; import com.vincent.rsf.server.common.service.RedisService; import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog; import com.vincent.rsf.server.manager.mapper.CloudWmsNotifyLogMapper; import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService; import com.vincent.rsf.server.system.constant.GlobalConfigCode; import com.vincent.rsf.server.system.entity.Config; import com.vincent.rsf.server.system.service.ConfigService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.io.IOException; import java.util.Date; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @Slf4j @Service public class CloudWmsNotifyLogServiceImpl extends ServiceImpl implements CloudWmsNotifyLogService { /** 单条待办「正在上报」Redis 占位秒数(SET NX EX) */ private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120; /** sending=1 但 Redis 无占位:update_time 早于此时长(分钟)则补偿清零 */ private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 2; @Autowired private ConfigService configService; @Autowired private ObjectMapper objectMapper; @Autowired(required = false) private RedisService redisService; private static final String CLOUD_WMS_REDIS_FLAG = "cloudwms"; private boolean useSendingRedis() { return redisService != null && Boolean.TRUE.equals(redisService.initialize) && CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS > 0; } private static String sendingRedisSubKey(Long id) { return "sending." + id; } /** Redis 占位存在且未过期时视为正在上报 */ private boolean isSendingHeldInRedis(Long id) { if (id == null || !useSendingRedis()) { return false; } try { String v = redisService.getValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id)); return StringUtils.isNotBlank(v); } catch (Exception e) { return false; } } @Override public List listPending(int limit, int maxRetry) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper() // 仅查询数据库配置状态 .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail()) .apply("(send_hold IS NULL OR send_hold = 0)") // 仅查询可重试数据 .apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)") // 仅查询已到重试时间的数据 .apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)") //缺重试参数的不进入待发送列表 .isNotNull(CloudWmsNotifyLog::getMaxRetryCount) .isNotNull(CloudWmsNotifyLog::getRetryIntervalSeconds) .orderByAsc(CloudWmsNotifyLog::getLastNotifyTime) .orderByAsc(CloudWmsNotifyLog::getId); if (maxRetry >= 0) { wrapper.lt(CloudWmsNotifyLog::getRetryCount, maxRetry); } if (limit < 0) { return list(wrapper); } Page page = new Page<>(1, Math.max(1, limit)); return page(page, wrapper).getRecords(); } @Override public boolean tryClaimSending(Long id) { if (id == null) { return false; } boolean useRedis = useSendingRedis(); if (useRedis) { boolean got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1", CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS); if (!got) { CloudWmsNotifyLog cur = getById(id); if (cur != null && (cur.getSending() == null || cur.getSending() == 0)) { redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id)); got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1", CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS); } if (!got) { return false; } } } LambdaUpdateWrapper u = new LambdaUpdateWrapper<>(); u.eq(CloudWmsNotifyLog::getId, id) .set(CloudWmsNotifyLog::getSending, 1) .set(CloudWmsNotifyLog::getUpdateTime, new Date()); if (!useRedis) { u.and(w -> w.isNull(CloudWmsNotifyLog::getSending).or().eq(CloudWmsNotifyLog::getSending, 0)); } boolean ok = update(u); if (!ok && useRedis) { try { redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id)); } catch (Exception ignored) { } } return ok; } @Override public void clearSending(Long id) { if (id == null) { return; } if (useSendingRedis()) { try { redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id)); } catch (Exception e) { log.debug("云仓上报 clearSending Redis id={}", id, e); } } LambdaUpdateWrapper u = new LambdaUpdateWrapper<>(); u.eq(CloudWmsNotifyLog::getId, id) .set(CloudWmsNotifyLog::getSending, 0) .set(CloudWmsNotifyLog::getUpdateTime, new Date()); update(u); } @Override public void recoverStaleSendingWhenRedisMiss() { Date threshold = new Date(System.currentTimeMillis() - STALE_SENDING_RECOVER_AFTER_MINUTES * 60_000L); List rows = list(new LambdaQueryWrapper() .eq(CloudWmsNotifyLog::getSending, 1) .isNotNull(CloudWmsNotifyLog::getUpdateTime) .lt(CloudWmsNotifyLog::getUpdateTime, threshold) .last("LIMIT 500")); if (rows.isEmpty()) { return; } Date now = new Date(); int cleared = 0; for (CloudWmsNotifyLog row : rows) { Long id = row.getId(); if (id == null) { continue; } if (useSendingRedis() && isSendingHeldInRedis(id)) { continue; } LambdaUpdateWrapper u = new LambdaUpdateWrapper<>(); u.eq(CloudWmsNotifyLog::getId, id) .eq(CloudWmsNotifyLog::getSending, 1) .lt(CloudWmsNotifyLog::getUpdateTime, threshold) .set(CloudWmsNotifyLog::getSending, 0) .set(CloudWmsNotifyLog::getUpdateTime, now); if (update(u)) { cleared++; } } if (cleared > 0) { log.info("云仓待办 sending 补偿清零 {} 条", cleared); } } @Override @Transactional(rollbackFor = Exception.class) public R manualFlushToNotifyByOrderCode(String orderCode, boolean inbound) { if (StringUtils.isBlank(orderCode)) { return R.error("单号不能为空"); } int flag = inbound ? 1 : 0; Date now = new Date(); LambdaUpdateWrapper u = new LambdaUpdateWrapper<>(); u.eq(CloudWmsNotifyLog::getReportType, getReportTypeInOutResult()) .eq(CloudWmsNotifyLog::getSourceOrderNo, orderCode.trim()) .eq(CloudWmsNotifyLog::getInboundFlag, flag) .eq(CloudWmsNotifyLog::getSendHold, 1) .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail()) .set(CloudWmsNotifyLog::getSendHold, 0) .set(CloudWmsNotifyLog::getUpdateTime, now); int n = getBaseMapper().update(null, u); if (n <= 0) { return R.error("当前无待放行的入出库待办(请确认 manual 模式、单号与入/出库类型一致)"); } return R.ok("已放行云仓上报待办 " + n + " 条").add(n); } @Override public String getReportTypeInOutResult() { return getConfigString(GlobalConfigCode.CLOUD_WMS_REPORT_TYPE_IN_OUT_RESULT, CloudWmsNotifyLog.REPORT_TYPE_IN_OUT_RESULT); } @Override public String getReportTypeInventoryAdjust() { return getConfigString(GlobalConfigCode.CLOUD_WMS_REPORT_TYPE_INVENTORY_ADJUST, CloudWmsNotifyLog.REPORT_TYPE_INVENTORY_ADJUST); } @Override public int getNotifyStatusPending() { return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_PENDING, CloudWmsNotifyLog.NOTIFY_STATUS_PENDING); } @Override public int getNotifyStatusSuccess() { return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_SUCCESS, CloudWmsNotifyLog.NOTIFY_STATUS_SUCCESS); } @Override public int getNotifyStatusFail() { return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_FAIL, CloudWmsNotifyLog.NOTIFY_STATUS_FAIL); } private String getConfigValTrimmed(String flag) { Config c = configService.getCachedOrLoad(flag); if (c == null || c.getVal() == null) { return null; } String v = c.getVal().trim(); return v.isEmpty() ? null : v; } /** 与实体常量搭配:仅当库/缓存无有效 val 时用常量 */ private String getConfigString(String flag, String defaultVal) { String v = getConfigValTrimmed(flag); return v != null ? v : defaultVal; } private int getConfigInt(String flag, int defaultVal) { Integer n = getConfigIntOrNull(flag); return n != null ? n : defaultVal; } private Integer getConfigIntOrNull(String flag) { String v = getConfigValTrimmed(flag); if (v == null) { return null; } try { return Integer.parseInt(v); } catch (NumberFormatException e) { return null; } } @Override public void fillFromConfig(CloudWmsNotifyLog log) { log.setNotifyStatus(getNotifyStatusPending()); log.setMaxRetryCount(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY)); log.setRetryIntervalSeconds(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS)); } @Override public String inOutMergeKeyFromRequestBody(String requestBody) { return mergeKeyFromBody(requestBody); } @Override public List parseInOutLinesFromRequestBody(String requestBody) throws IOException { return extractInOutLines(requestBody); } private String mergeKeyFromBody(String body) { if (StringUtils.isBlank(body)) { return null; } try { JsonNode root = objectMapper.readTree(body); if (root.has("lines") && root.get("lines").isArray() && root.get("lines").size() > 0) { return null; } JsonNode first = root; String orderNo = textNode(first, "orderNo"); if (StringUtils.isBlank(orderNo)) { return null; } String wh = textNode(first, "wareHouseId"); boolean inbound = !first.has("inbound") || first.get("inbound").isNull() || first.get("inbound").asBoolean(); return orderNo + "\t" + inbound + "\t" + StringUtils.defaultString(wh); } catch (Exception e) { return null; } } private static String textNode(JsonNode n, String field) { if (n == null || !n.has(field) || n.get(field).isNull()) { return null; } return n.get(field).asText(); } private List extractInOutLines(String body) throws java.io.IOException { JsonNode root = objectMapper.readTree(body); if (root.has("lines") && root.get("lines").isArray()) { List list = new ArrayList<>(); for (JsonNode n : root.get("lines")) { list.add(objectMapper.treeToValue(n, InOutResultReportParam.class)); } return list; } return Collections.singletonList(objectMapper.readValue(body, InOutResultReportParam.class)); } }