| | |
| | | package com.vincent.rsf.server.manager.service.impl; |
| | | |
| | | import com.vincent.rsf.framework.common.R; |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
| | | import com.baomidou.mybatisplus.extension.plugins.pagination.Page; |
| | | import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
| | | import com.fasterxml.jackson.databind.JsonNode; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam; |
| | | import com.vincent.rsf.server.common.service.RedisService; |
| | | import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog; |
| | | import com.vincent.rsf.server.manager.mapper.CloudWmsNotifyLogMapper; |
| | | import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService; |
| | | import com.vincent.rsf.server.system.constant.GlobalConfigCode; |
| | | import com.vincent.rsf.server.system.entity.Config; |
| | | import com.vincent.rsf.server.system.service.ConfigService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.transaction.annotation.Transactional; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.Date; |
| | | import java.util.ArrayList; |
| | | import java.util.Collections; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | @Slf4j |
| | | @Service |
| | | public class CloudWmsNotifyLogServiceImpl extends ServiceImpl<CloudWmsNotifyLogMapper, CloudWmsNotifyLog> implements CloudWmsNotifyLogService { |
| | | |
| | | /** 单条待办「正在上报」Redis 占位秒数(SET NX EX) */ |
| | | private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120; |
| | | /** sending=1 但 Redis 无占位:update_time 早于此时长(分钟)则补偿清零 */ |
| | | private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 2; |
| | | |
| | | @Autowired |
| | | private ConfigService configService; |
| | | @Autowired |
| | | private ObjectMapper objectMapper; |
| | | @Autowired(required = false) |
| | | private RedisService redisService; |
| | | |
| | | private static final String CLOUD_WMS_REDIS_FLAG = "cloudwms"; |
| | | |
| | | private boolean useSendingRedis() { |
| | | return redisService != null |
| | | && Boolean.TRUE.equals(redisService.initialize) |
| | | && CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS > 0; |
| | | } |
| | | |
| | | private static String sendingRedisSubKey(Long id) { |
| | | return "sending." + id; |
| | | } |
| | | |
| | | /** Redis 占位存在且未过期时视为正在上报 */ |
| | | private boolean isSendingHeldInRedis(Long id) { |
| | | if (id == null || !useSendingRedis()) { |
| | | return false; |
| | | } |
| | | try { |
| | | String v = redisService.getValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id)); |
| | | return StringUtils.isNotBlank(v); |
| | | } catch (Exception e) { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public List<CloudWmsNotifyLog> listPending(int limit, int maxRetry) { |
| | | Page<CloudWmsNotifyLog> page = new Page<>(1, Math.max(1, limit)); |
| | | LambdaQueryWrapper<CloudWmsNotifyLog> wrapper = new LambdaQueryWrapper<CloudWmsNotifyLog>() |
| | | .eq(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending()) |
| | | .lt(CloudWmsNotifyLog::getRetryCount, maxRetry) |
| | | // 仅查询数据库配置状态 |
| | | .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail()) |
| | | .apply("(send_hold IS NULL OR send_hold = 0)") |
| | | // 仅查询可重试数据 |
| | | .apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)") |
| | | // 仅查询已到重试时间的数据 |
| | | .apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)") |
| | | //缺重试参数的不进入待发送列表 |
| | | .isNotNull(CloudWmsNotifyLog::getMaxRetryCount) |
| | | .isNotNull(CloudWmsNotifyLog::getRetryIntervalSeconds) |
| | | .orderByAsc(CloudWmsNotifyLog::getLastNotifyTime) |
| | | .orderByAsc(CloudWmsNotifyLog::getId); |
| | | if (maxRetry >= 0) { |
| | | wrapper.lt(CloudWmsNotifyLog::getRetryCount, maxRetry); |
| | | } |
| | | if (limit < 0) { |
| | | return list(wrapper); |
| | | } |
| | | Page<CloudWmsNotifyLog> page = new Page<>(1, Math.max(1, limit)); |
| | | return page(page, wrapper).getRecords(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean tryClaimSending(Long id) { |
| | | if (id == null) { |
| | | return false; |
| | | } |
| | | boolean useRedis = useSendingRedis(); |
| | | if (useRedis) { |
| | | boolean got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1", |
| | | CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS); |
| | | if (!got) { |
| | | CloudWmsNotifyLog cur = getById(id); |
| | | if (cur != null && (cur.getSending() == null || cur.getSending() == 0)) { |
| | | redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id)); |
| | | got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1", |
| | | CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS); |
| | | } |
| | | if (!got) { |
| | | return false; |
| | | } |
| | | } |
| | | } |
| | | LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>(); |
| | | u.eq(CloudWmsNotifyLog::getId, id) |
| | | .set(CloudWmsNotifyLog::getSending, 1) |
| | | .set(CloudWmsNotifyLog::getUpdateTime, new Date()); |
| | | if (!useRedis) { |
| | | u.and(w -> w.isNull(CloudWmsNotifyLog::getSending).or().eq(CloudWmsNotifyLog::getSending, 0)); |
| | | } |
| | | boolean ok = update(u); |
| | | if (!ok && useRedis) { |
| | | try { |
| | | redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id)); |
| | | } catch (Exception ignored) { |
| | | } |
| | | } |
| | | return ok; |
| | | } |
| | | |
| | | @Override |
| | | public void clearSending(Long id) { |
| | | if (id == null) { |
| | | return; |
| | | } |
| | | if (useSendingRedis()) { |
| | | try { |
| | | redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id)); |
| | | } catch (Exception e) { |
| | | log.debug("云仓上报 clearSending Redis id={}", id, e); |
| | | } |
| | | } |
| | | LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>(); |
| | | u.eq(CloudWmsNotifyLog::getId, id) |
| | | .set(CloudWmsNotifyLog::getSending, 0) |
| | | .set(CloudWmsNotifyLog::getUpdateTime, new Date()); |
| | | update(u); |
| | | } |
| | | |
| | | @Override |
| | | public void recoverStaleSendingWhenRedisMiss() { |
| | | Date threshold = new Date(System.currentTimeMillis() - STALE_SENDING_RECOVER_AFTER_MINUTES * 60_000L); |
| | | List<CloudWmsNotifyLog> rows = list(new LambdaQueryWrapper<CloudWmsNotifyLog>() |
| | | .eq(CloudWmsNotifyLog::getSending, 1) |
| | | .isNotNull(CloudWmsNotifyLog::getUpdateTime) |
| | | .lt(CloudWmsNotifyLog::getUpdateTime, threshold) |
| | | .last("LIMIT 500")); |
| | | if (rows.isEmpty()) { |
| | | return; |
| | | } |
| | | Date now = new Date(); |
| | | int cleared = 0; |
| | | for (CloudWmsNotifyLog row : rows) { |
| | | Long id = row.getId(); |
| | | if (id == null) { |
| | | continue; |
| | | } |
| | | if (useSendingRedis() && isSendingHeldInRedis(id)) { |
| | | continue; |
| | | } |
| | | LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>(); |
| | | u.eq(CloudWmsNotifyLog::getId, id) |
| | | .eq(CloudWmsNotifyLog::getSending, 1) |
| | | .lt(CloudWmsNotifyLog::getUpdateTime, threshold) |
| | | .set(CloudWmsNotifyLog::getSending, 0) |
| | | .set(CloudWmsNotifyLog::getUpdateTime, now); |
| | | if (update(u)) { |
| | | cleared++; |
| | | } |
| | | } |
| | | if (cleared > 0) { |
| | | log.info("云仓待办 sending 补偿清零 {} 条", cleared); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | @Transactional(rollbackFor = Exception.class) |
| | | public R manualFlushToNotifyByOrderCode(String orderCode, boolean inbound) { |
| | | if (StringUtils.isBlank(orderCode)) { |
| | | return R.error("单号不能为空"); |
| | | } |
| | | int flag = inbound ? 1 : 0; |
| | | Date now = new Date(); |
| | | LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>(); |
| | | u.eq(CloudWmsNotifyLog::getReportType, getReportTypeInOutResult()) |
| | | .eq(CloudWmsNotifyLog::getSourceOrderNo, orderCode.trim()) |
| | | .eq(CloudWmsNotifyLog::getInboundFlag, flag) |
| | | .eq(CloudWmsNotifyLog::getSendHold, 1) |
| | | .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail()) |
| | | .set(CloudWmsNotifyLog::getSendHold, 0) |
| | | .set(CloudWmsNotifyLog::getUpdateTime, now); |
| | | int n = getBaseMapper().update(null, u); |
| | | if (n <= 0) { |
| | | return R.error("当前无待放行的入出库待办(请确认 manual 模式、单号与入/出库类型一致)"); |
| | | } |
| | | return R.ok("已放行云仓上报待办 " + n + " 条").add(n); |
| | | } |
| | | |
| | | @Override |
| | |
| | | return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_FAIL, CloudWmsNotifyLog.NOTIFY_STATUS_FAIL); |
| | | } |
| | | |
| | | private String getConfigString(String flag, String defaultVal) { |
| | | Config c = configService.getOne(new LambdaQueryWrapper<Config>().eq(Config::getFlag, flag).last("LIMIT 1")); |
| | | if (c != null && c.getVal() != null && !c.getVal().isEmpty()) { |
| | | return c.getVal().trim(); |
| | | private String getConfigValTrimmed(String flag) { |
| | | Config c = configService.getCachedOrLoad(flag); |
| | | if (c == null || c.getVal() == null) { |
| | | return null; |
| | | } |
| | | return defaultVal; |
| | | String v = c.getVal().trim(); |
| | | return v.isEmpty() ? null : v; |
| | | } |
| | | |
| | | /** 与实体常量搭配:仅当库/缓存无有效 val 时用常量 */ |
| | | private String getConfigString(String flag, String defaultVal) { |
| | | String v = getConfigValTrimmed(flag); |
| | | return v != null ? v : defaultVal; |
| | | } |
| | | |
| | | private int getConfigInt(String flag, int defaultVal) { |
| | | Integer v = getConfigInt(flag); |
| | | return v != null ? v : defaultVal; |
| | | Integer n = getConfigIntOrNull(flag); |
| | | return n != null ? n : defaultVal; |
| | | } |
| | | |
| | | private Integer getConfigIntOrNull(String flag) { |
| | | String v = getConfigValTrimmed(flag); |
| | | if (v == null) { |
| | | return null; |
| | | } |
| | | try { |
| | | return Integer.parseInt(v); |
| | | } catch (NumberFormatException e) { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void fillFromConfig(CloudWmsNotifyLog log) { |
| | | Integer maxRetry = getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY); |
| | | Integer interval = getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS); |
| | | log.setMaxRetryCount(maxRetry); |
| | | log.setRetryIntervalSeconds(interval); |
| | | log.setNotifyStatus(getNotifyStatusPending()); |
| | | log.setMaxRetryCount(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY)); |
| | | log.setRetryIntervalSeconds(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS)); |
| | | } |
| | | |
| | | /** 返回 null 表示未配置或解析失败 */ |
| | | private Integer getConfigInt(String flag) { |
| | | try { |
| | | Config c = configService.getOne(new LambdaQueryWrapper<Config>().eq(Config::getFlag, flag).last("LIMIT 1")); |
| | | if (c != null && c.getVal() != null && !c.getVal().isEmpty()) { |
| | | return Integer.parseInt(c.getVal().trim()); |
| | | } |
| | | } catch (Exception ignored) { |
| | | @Override |
| | | public String inOutMergeKeyFromRequestBody(String requestBody) { |
| | | return mergeKeyFromBody(requestBody); |
| | | } |
| | | |
| | | @Override |
| | | public List<InOutResultReportParam> parseInOutLinesFromRequestBody(String requestBody) throws IOException { |
| | | return extractInOutLines(requestBody); |
| | | } |
| | | |
| | | private String mergeKeyFromBody(String body) { |
| | | if (StringUtils.isBlank(body)) { |
| | | return null; |
| | | } |
| | | return null; |
| | | try { |
| | | JsonNode root = objectMapper.readTree(body); |
| | | if (root.has("lines") && root.get("lines").isArray() && root.get("lines").size() > 0) { |
| | | return null; |
| | | } |
| | | JsonNode first = root; |
| | | String orderNo = textNode(first, "orderNo"); |
| | | if (StringUtils.isBlank(orderNo)) { |
| | | return null; |
| | | } |
| | | String wh = textNode(first, "wareHouseId"); |
| | | boolean inbound = !first.has("inbound") || first.get("inbound").isNull() || first.get("inbound").asBoolean(); |
| | | return orderNo + "\t" + inbound + "\t" + StringUtils.defaultString(wh); |
| | | } catch (Exception e) { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | private static String textNode(JsonNode n, String field) { |
| | | if (n == null || !n.has(field) || n.get(field).isNull()) { |
| | | return null; |
| | | } |
| | | return n.get(field).asText(); |
| | | } |
| | | |
| | | private List<InOutResultReportParam> extractInOutLines(String body) throws java.io.IOException { |
| | | JsonNode root = objectMapper.readTree(body); |
| | | if (root.has("lines") && root.get("lines").isArray()) { |
| | | List<InOutResultReportParam> list = new ArrayList<>(); |
| | | for (JsonNode n : root.get("lines")) { |
| | | list.add(objectMapper.treeToValue(n, InOutResultReportParam.class)); |
| | | } |
| | | return list; |
| | | } |
| | | return Collections.singletonList(objectMapper.readValue(body, InOutResultReportParam.class)); |
| | | } |
| | | } |