cl
6 天以前 2a34b52125d5fc356d65ee1e8912845dd601d4e3
rsf-server/src/main/java/com/vincent/rsf/server/manager/service/impl/CloudWmsNotifyLogServiceImpl.java
@@ -1,34 +1,88 @@
package com.vincent.rsf.server.manager.service.impl;
import com.vincent.rsf.framework.common.R;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
import com.vincent.rsf.server.common.service.RedisService;
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
import com.vincent.rsf.server.manager.mapper.CloudWmsNotifyLogMapper;
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
import com.vincent.rsf.server.system.entity.Config;
import com.vincent.rsf.server.system.service.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.util.Date;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class CloudWmsNotifyLogServiceImpl extends ServiceImpl<CloudWmsNotifyLogMapper, CloudWmsNotifyLog> implements CloudWmsNotifyLogService {
    /** 单条待办「正在上报」Redis 占位秒数(SET NX EX) */
    private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120;
    /** sending=1 但 Redis 无占位:update_time 早于此时长(分钟)则补偿清零 */
    private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 6;
    @Autowired
    private ConfigService configService;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired(required = false)
    private RedisService redisService;
    private static final String CLOUD_WMS_REDIS_FLAG = "cloudwms";
    private boolean useSendingRedis() {
        return redisService != null
                && Boolean.TRUE.equals(redisService.initialize)
                && CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS > 0;
    }
    private static String sendingRedisSubKey(Long id) {
        return "sending." + id;
    }
    /** Redis 占位存在且未过期时视为正在上报 */
    private boolean isSendingHeldInRedis(Long id) {
        if (id == null || !useSendingRedis()) {
            return false;
        }
        try {
            String v = redisService.getValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
            return StringUtils.isNotBlank(v);
        } catch (Exception e) {
            return false;
        }
    }
    @Override
    public List<CloudWmsNotifyLog> listPending(int limit, int maxRetry) {
        LambdaQueryWrapper<CloudWmsNotifyLog> wrapper = new LambdaQueryWrapper<CloudWmsNotifyLog>()
                // 仅查询数据库配置状态:待通知 + 失败(可重试)
                .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
                .apply("(send_hold IS NULL OR send_hold = 0)")
                // 仅查询可重试数据:无限重试、未配置上限、或未达到上限
                .apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)")
                // 仅查询已到重试时间的数据,避免前 50 条未到间隔导致后续记录长期饥饿
                .apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)")
                //缺重试参数的不进入待发送列表
                .isNotNull(CloudWmsNotifyLog::getMaxRetryCount)
                .isNotNull(CloudWmsNotifyLog::getRetryIntervalSeconds)
                .orderByAsc(CloudWmsNotifyLog::getLastNotifyTime)
                .orderByAsc(CloudWmsNotifyLog::getId);
        if (maxRetry >= 0) {
@@ -39,6 +93,122 @@
        }
        Page<CloudWmsNotifyLog> page = new Page<>(1, Math.max(1, limit));
        return page(page, wrapper).getRecords();
    }
    @Override
    public boolean tryClaimSending(Long id) {
        if (id == null) {
            return false;
        }
        boolean useRedis = useSendingRedis();
        if (useRedis) {
            boolean got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
                    CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
            if (!got) {
                CloudWmsNotifyLog cur = getById(id);
                if (cur != null && (cur.getSending() == null || cur.getSending() == 0)) {
                    redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
                    got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
                            CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
                }
                if (!got) {
                    return false;
                }
            }
        }
        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
        u.eq(CloudWmsNotifyLog::getId, id)
                .set(CloudWmsNotifyLog::getSending, 1)
                .set(CloudWmsNotifyLog::getUpdateTime, new Date());
        if (!useRedis) {
            u.and(w -> w.isNull(CloudWmsNotifyLog::getSending).or().eq(CloudWmsNotifyLog::getSending, 0));
        }
        boolean ok = update(u);
        if (!ok && useRedis) {
            try {
                redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
            } catch (Exception ignored) {
            }
        }
        return ok;
    }
    @Override
    public void clearSending(Long id) {
        if (id == null) {
            return;
        }
        if (useSendingRedis()) {
            try {
                redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
            } catch (Exception e) {
                log.debug("云仓上报 clearSending Redis id={}", id, e);
            }
        }
        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
        u.eq(CloudWmsNotifyLog::getId, id)
                .set(CloudWmsNotifyLog::getSending, 0)
                .set(CloudWmsNotifyLog::getUpdateTime, new Date());
        update(u);
    }
    @Override
    public void recoverStaleSendingWhenRedisMiss() {
        Date threshold = new Date(System.currentTimeMillis() - STALE_SENDING_RECOVER_AFTER_MINUTES * 60_000L);
        List<CloudWmsNotifyLog> rows = list(new LambdaQueryWrapper<CloudWmsNotifyLog>()
                .eq(CloudWmsNotifyLog::getSending, 1)
                .isNotNull(CloudWmsNotifyLog::getUpdateTime)
                .lt(CloudWmsNotifyLog::getUpdateTime, threshold)
                .last("LIMIT 500"));
        if (rows.isEmpty()) {
            return;
        }
        Date now = new Date();
        int cleared = 0;
        for (CloudWmsNotifyLog row : rows) {
            Long id = row.getId();
            if (id == null) {
                continue;
            }
            if (useSendingRedis() && isSendingHeldInRedis(id)) {
                continue;
            }
            LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
            u.eq(CloudWmsNotifyLog::getId, id)
                    .eq(CloudWmsNotifyLog::getSending, 1)
                    .lt(CloudWmsNotifyLog::getUpdateTime, threshold)
                    .set(CloudWmsNotifyLog::getSending, 0)
                    .set(CloudWmsNotifyLog::getUpdateTime, now);
            if (update(u)) {
                cleared++;
            }
        }
        if (cleared > 0) {
            log.info("云仓待办 sending 补偿清零 {} 条", cleared);
        }
    }
    @Override
    @Transactional(rollbackFor = Exception.class)
    public R manualFlushToNotifyByOrderCode(String orderCode, boolean inbound) {
        if (StringUtils.isBlank(orderCode)) {
            return R.error("单号不能为空");
        }
        int flag = inbound ? 1 : 0;
        Date now = new Date();
        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
        u.eq(CloudWmsNotifyLog::getReportType, getReportTypeInOutResult())
                .eq(CloudWmsNotifyLog::getSourceOrderNo, orderCode.trim())
                .eq(CloudWmsNotifyLog::getInboundFlag, flag)
                .eq(CloudWmsNotifyLog::getSendHold, 1)
                .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
                .set(CloudWmsNotifyLog::getSendHold, 0)
                .set(CloudWmsNotifyLog::getUpdateTime, now);
        int n = getBaseMapper().update(null, u);
        if (n <= 0) {
            return R.error("当前无待放行的入出库待办(请确认 manual 模式、单号与入/出库类型一致)");
        }
        return R.ok("已放行云仓上报待办 " + n + " 条").add(n);
    }
    @Override
@@ -66,43 +236,93 @@
        return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_FAIL, CloudWmsNotifyLog.NOTIFY_STATUS_FAIL);
    }
    private String getConfigString(String flag, String defaultVal) {
        Config c = configService.getOne(new LambdaQueryWrapper<Config>()
                .eq(Config::getFlag, flag)
                .orderByDesc(Config::getId)
                .last("LIMIT 1"));
        if (c != null && c.getVal() != null && !c.getVal().isEmpty()) {
            return c.getVal().trim();
    private String getConfigValTrimmed(String flag) {
        Config c = configService.getCachedOrLoad(flag);
        if (c == null || c.getVal() == null) {
            return null;
        }
        return defaultVal;
        String v = c.getVal().trim();
        return v.isEmpty() ? null : v;
    }
    /** 与实体常量搭配:仅当库/缓存无有效 val 时用常量 */
    private String getConfigString(String flag, String defaultVal) {
        String v = getConfigValTrimmed(flag);
        return v != null ? v : defaultVal;
    }
    private int getConfigInt(String flag, int defaultVal) {
        Integer v = getConfigInt(flag);
        return v != null ? v : defaultVal;
        Integer n = getConfigIntOrNull(flag);
        return n != null ? n : defaultVal;
    }
    private Integer getConfigIntOrNull(String flag) {
        String v = getConfigValTrimmed(flag);
        if (v == null) {
            return null;
        }
        try {
            return Integer.parseInt(v);
        } catch (NumberFormatException e) {
            return null;
        }
    }
    @Override
    public void fillFromConfig(CloudWmsNotifyLog log) {
        Integer maxRetry = getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY);
        Integer interval = getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS);
        log.setNotifyStatus(getNotifyStatusPending());
        log.setMaxRetryCount(maxRetry);
        log.setRetryIntervalSeconds(interval);
        log.setMaxRetryCount(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY));
        log.setRetryIntervalSeconds(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS));
    }
    /** 返回 null 表示未配置或解析失败 */
    private Integer getConfigInt(String flag) {
        try {
            Config c = configService.getOne(new LambdaQueryWrapper<Config>()
                    .eq(Config::getFlag, flag)
                    .orderByDesc(Config::getId)
                    .last("LIMIT 1"));
            if (c != null && c.getVal() != null && !c.getVal().isEmpty()) {
                return Integer.parseInt(c.getVal().trim());
    @Override
    public String inOutMergeKeyFromRequestBody(String requestBody) {
        return mergeKeyFromBody(requestBody);
            }
        } catch (Exception ignored) {
    @Override
    public List<InOutResultReportParam> parseInOutLinesFromRequestBody(String requestBody) throws IOException {
        return extractInOutLines(requestBody);
        }
    private String mergeKeyFromBody(String body) {
        if (StringUtils.isBlank(body)) {
        return null;
    }
        try {
            JsonNode root = objectMapper.readTree(body);
            if (root.has("lines") && root.get("lines").isArray() && root.get("lines").size() > 0) {
                return null;
            }
            JsonNode first = root;
            String orderNo = textNode(first, "orderNo");
            if (StringUtils.isBlank(orderNo)) {
                return null;
            }
            String wh = textNode(first, "wareHouseId");
            boolean inbound = !first.has("inbound") || first.get("inbound").isNull() || first.get("inbound").asBoolean();
            return orderNo + "\t" + inbound + "\t" + StringUtils.defaultString(wh);
        } catch (Exception e) {
            return null;
        }
    }
    private static String textNode(JsonNode n, String field) {
        if (n == null || !n.has(field) || n.get(field).isNull()) {
            return null;
        }
        return n.get(field).asText();
    }
    private List<InOutResultReportParam> extractInOutLines(String body) throws java.io.IOException {
        JsonNode root = objectMapper.readTree(body);
        if (root.has("lines") && root.get("lines").isArray()) {
            List<InOutResultReportParam> list = new ArrayList<>();
            for (JsonNode n : root.get("lines")) {
                list.add(objectMapper.treeToValue(n, InOutResultReportParam.class));
            }
            return list;
        }
        return Collections.singletonList(objectMapper.readValue(body, InOutResultReportParam.class));
    }
}