package com.vincent.rsf.server.manager.service.impl;
|
|
import com.vincent.rsf.framework.common.R;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
|
import com.vincent.rsf.server.common.service.RedisService;
|
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
|
import com.vincent.rsf.server.manager.mapper.CloudWmsNotifyLogMapper;
|
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
|
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
|
import com.vincent.rsf.server.system.entity.Config;
|
import com.vincent.rsf.server.system.service.ConfigService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
import org.springframework.transaction.annotation.Transactional;
|
|
import java.io.IOException;
|
import java.util.Date;
|
import java.util.ArrayList;
|
import java.util.Collections;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
@Slf4j
|
@Service
|
public class CloudWmsNotifyLogServiceImpl extends ServiceImpl<CloudWmsNotifyLogMapper, CloudWmsNotifyLog> implements CloudWmsNotifyLogService {
|
|
/** 单条待办「正在上报」Redis 占位秒数(SET NX EX) */
|
private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120;
|
/** sending=1 但 Redis 无占位:update_time 早于此时长(分钟)则补偿清零 */
|
private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 2;
|
|
@Autowired
|
private ConfigService configService;
|
@Autowired
|
private ObjectMapper objectMapper;
|
@Autowired(required = false)
|
private RedisService redisService;
|
|
private static final String CLOUD_WMS_REDIS_FLAG = "cloudwms";
|
|
private boolean useSendingRedis() {
|
return redisService != null
|
&& Boolean.TRUE.equals(redisService.initialize)
|
&& CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS > 0;
|
}
|
|
private static String sendingRedisSubKey(Long id) {
|
return "sending." + id;
|
}
|
|
/** Redis 占位存在且未过期时视为正在上报 */
|
private boolean isSendingHeldInRedis(Long id) {
|
if (id == null || !useSendingRedis()) {
|
return false;
|
}
|
try {
|
String v = redisService.getValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
|
return StringUtils.isNotBlank(v);
|
} catch (Exception e) {
|
return false;
|
}
|
}
|
|
@Override
|
public List<CloudWmsNotifyLog> listPending(int limit, int maxRetry) {
|
LambdaQueryWrapper<CloudWmsNotifyLog> wrapper = new LambdaQueryWrapper<CloudWmsNotifyLog>()
|
// 仅查询数据库配置状态
|
.in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
|
.apply("(send_hold IS NULL OR send_hold = 0)")
|
// 仅查询可重试数据
|
.apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)")
|
// 仅查询已到重试时间的数据
|
.apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)")
|
//缺重试参数的不进入待发送列表
|
.isNotNull(CloudWmsNotifyLog::getMaxRetryCount)
|
.isNotNull(CloudWmsNotifyLog::getRetryIntervalSeconds)
|
.orderByAsc(CloudWmsNotifyLog::getLastNotifyTime)
|
.orderByAsc(CloudWmsNotifyLog::getId);
|
if (maxRetry >= 0) {
|
wrapper.lt(CloudWmsNotifyLog::getRetryCount, maxRetry);
|
}
|
if (limit < 0) {
|
return list(wrapper);
|
}
|
Page<CloudWmsNotifyLog> page = new Page<>(1, Math.max(1, limit));
|
return page(page, wrapper).getRecords();
|
}
|
|
@Override
|
public boolean tryClaimSending(Long id) {
|
if (id == null) {
|
return false;
|
}
|
boolean useRedis = useSendingRedis();
|
if (useRedis) {
|
boolean got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
|
CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
|
if (!got) {
|
CloudWmsNotifyLog cur = getById(id);
|
if (cur != null && (cur.getSending() == null || cur.getSending() == 0)) {
|
redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
|
got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
|
CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
|
}
|
if (!got) {
|
return false;
|
}
|
}
|
}
|
LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
|
u.eq(CloudWmsNotifyLog::getId, id)
|
.set(CloudWmsNotifyLog::getSending, 1)
|
.set(CloudWmsNotifyLog::getUpdateTime, new Date());
|
if (!useRedis) {
|
u.and(w -> w.isNull(CloudWmsNotifyLog::getSending).or().eq(CloudWmsNotifyLog::getSending, 0));
|
}
|
boolean ok = update(u);
|
if (!ok && useRedis) {
|
try {
|
redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
|
} catch (Exception ignored) {
|
}
|
}
|
return ok;
|
}
|
|
@Override
|
public void clearSending(Long id) {
|
if (id == null) {
|
return;
|
}
|
if (useSendingRedis()) {
|
try {
|
redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
|
} catch (Exception e) {
|
log.debug("云仓上报 clearSending Redis id={}", id, e);
|
}
|
}
|
LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
|
u.eq(CloudWmsNotifyLog::getId, id)
|
.set(CloudWmsNotifyLog::getSending, 0)
|
.set(CloudWmsNotifyLog::getUpdateTime, new Date());
|
update(u);
|
}
|
|
@Override
|
public void recoverStaleSendingWhenRedisMiss() {
|
Date threshold = new Date(System.currentTimeMillis() - STALE_SENDING_RECOVER_AFTER_MINUTES * 60_000L);
|
List<CloudWmsNotifyLog> rows = list(new LambdaQueryWrapper<CloudWmsNotifyLog>()
|
.eq(CloudWmsNotifyLog::getSending, 1)
|
.isNotNull(CloudWmsNotifyLog::getUpdateTime)
|
.lt(CloudWmsNotifyLog::getUpdateTime, threshold)
|
.last("LIMIT 500"));
|
if (rows.isEmpty()) {
|
return;
|
}
|
Date now = new Date();
|
int cleared = 0;
|
for (CloudWmsNotifyLog row : rows) {
|
Long id = row.getId();
|
if (id == null) {
|
continue;
|
}
|
if (useSendingRedis() && isSendingHeldInRedis(id)) {
|
continue;
|
}
|
LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
|
u.eq(CloudWmsNotifyLog::getId, id)
|
.eq(CloudWmsNotifyLog::getSending, 1)
|
.lt(CloudWmsNotifyLog::getUpdateTime, threshold)
|
.set(CloudWmsNotifyLog::getSending, 0)
|
.set(CloudWmsNotifyLog::getUpdateTime, now);
|
if (update(u)) {
|
cleared++;
|
}
|
}
|
if (cleared > 0) {
|
log.info("云仓待办 sending 补偿清零 {} 条", cleared);
|
}
|
}
|
|
@Override
|
@Transactional(rollbackFor = Exception.class)
|
public R manualFlushToNotifyByOrderCode(String orderCode, boolean inbound) {
|
if (StringUtils.isBlank(orderCode)) {
|
return R.error("单号不能为空");
|
}
|
int flag = inbound ? 1 : 0;
|
Date now = new Date();
|
LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
|
u.eq(CloudWmsNotifyLog::getReportType, getReportTypeInOutResult())
|
.eq(CloudWmsNotifyLog::getSourceOrderNo, orderCode.trim())
|
.eq(CloudWmsNotifyLog::getInboundFlag, flag)
|
.eq(CloudWmsNotifyLog::getSendHold, 1)
|
.in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
|
.set(CloudWmsNotifyLog::getSendHold, 0)
|
.set(CloudWmsNotifyLog::getUpdateTime, now);
|
int n = getBaseMapper().update(null, u);
|
if (n <= 0) {
|
return R.error("当前无待放行的入出库待办(请确认 manual 模式、单号与入/出库类型一致)");
|
}
|
return R.ok("已放行云仓上报待办 " + n + " 条").add(n);
|
}
|
|
@Override
|
public String getReportTypeInOutResult() {
|
return getConfigString(GlobalConfigCode.CLOUD_WMS_REPORT_TYPE_IN_OUT_RESULT, CloudWmsNotifyLog.REPORT_TYPE_IN_OUT_RESULT);
|
}
|
|
@Override
|
public String getReportTypeInventoryAdjust() {
|
return getConfigString(GlobalConfigCode.CLOUD_WMS_REPORT_TYPE_INVENTORY_ADJUST, CloudWmsNotifyLog.REPORT_TYPE_INVENTORY_ADJUST);
|
}
|
|
@Override
|
public int getNotifyStatusPending() {
|
return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_PENDING, CloudWmsNotifyLog.NOTIFY_STATUS_PENDING);
|
}
|
|
@Override
|
public int getNotifyStatusSuccess() {
|
return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_SUCCESS, CloudWmsNotifyLog.NOTIFY_STATUS_SUCCESS);
|
}
|
|
@Override
|
public int getNotifyStatusFail() {
|
return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_FAIL, CloudWmsNotifyLog.NOTIFY_STATUS_FAIL);
|
}
|
|
private String getConfigValTrimmed(String flag) {
|
Config c = configService.getCachedOrLoad(flag);
|
if (c == null || c.getVal() == null) {
|
return null;
|
}
|
String v = c.getVal().trim();
|
return v.isEmpty() ? null : v;
|
}
|
|
/** 与实体常量搭配:仅当库/缓存无有效 val 时用常量 */
|
private String getConfigString(String flag, String defaultVal) {
|
String v = getConfigValTrimmed(flag);
|
return v != null ? v : defaultVal;
|
}
|
|
private int getConfigInt(String flag, int defaultVal) {
|
Integer n = getConfigIntOrNull(flag);
|
return n != null ? n : defaultVal;
|
}
|
|
private Integer getConfigIntOrNull(String flag) {
|
String v = getConfigValTrimmed(flag);
|
if (v == null) {
|
return null;
|
}
|
try {
|
return Integer.parseInt(v);
|
} catch (NumberFormatException e) {
|
return null;
|
}
|
}
|
|
@Override
|
public void fillFromConfig(CloudWmsNotifyLog log) {
|
log.setNotifyStatus(getNotifyStatusPending());
|
log.setMaxRetryCount(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY));
|
log.setRetryIntervalSeconds(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS));
|
}
|
|
@Override
|
public String inOutMergeKeyFromRequestBody(String requestBody) {
|
return mergeKeyFromBody(requestBody);
|
}
|
|
@Override
|
public List<InOutResultReportParam> parseInOutLinesFromRequestBody(String requestBody) throws IOException {
|
return extractInOutLines(requestBody);
|
}
|
|
private String mergeKeyFromBody(String body) {
|
if (StringUtils.isBlank(body)) {
|
return null;
|
}
|
try {
|
JsonNode root = objectMapper.readTree(body);
|
if (root.has("lines") && root.get("lines").isArray() && root.get("lines").size() > 0) {
|
return null;
|
}
|
JsonNode first = root;
|
String orderNo = textNode(first, "orderNo");
|
if (StringUtils.isBlank(orderNo)) {
|
return null;
|
}
|
String wh = textNode(first, "wareHouseId");
|
boolean inbound = !first.has("inbound") || first.get("inbound").isNull() || first.get("inbound").asBoolean();
|
return orderNo + "\t" + inbound + "\t" + StringUtils.defaultString(wh);
|
} catch (Exception e) {
|
return null;
|
}
|
}
|
|
private static String textNode(JsonNode n, String field) {
|
if (n == null || !n.has(field) || n.get(field).isNull()) {
|
return null;
|
}
|
return n.get(field).asText();
|
}
|
|
private List<InOutResultReportParam> extractInOutLines(String body) throws java.io.IOException {
|
JsonNode root = objectMapper.readTree(body);
|
if (root.has("lines") && root.get("lines").isArray()) {
|
List<InOutResultReportParam> list = new ArrayList<>();
|
for (JsonNode n : root.get("lines")) {
|
list.add(objectMapper.treeToValue(n, InOutResultReportParam.class));
|
}
|
return list;
|
}
|
return Collections.singletonList(objectMapper.readValue(body, InOutResultReportParam.class));
|
}
|
}
|