package com.vincent.rsf.server.manager.schedules;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.vincent.rsf.server.api.controller.erp.params.InOutResultBatchPayload;
|
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
|
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
|
import com.vincent.rsf.server.api.service.CloudWmsReportService;
|
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
|
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
|
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
|
import com.vincent.rsf.server.system.entity.Config;
|
import com.vincent.rsf.server.system.service.ConfigService;
|
import feign.FeignException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.io.IOException;
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.HashSet;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Set;
|
|
/** 云仓上报定时任务 */
|
@Slf4j
|
@Component
|
public class CloudWmsNotifySchedule {
|
|
private static final int BATCH_LIMIT = -1;
|
private static final int STORE_BODY_MAX_CHARS_DEFAULT = 2000;
|
|
@Autowired
|
private CloudWmsNotifyLogService cloudWmsNotifyLogService;
|
@Autowired
|
private CloudWmsReportService cloudWmsReportService;
|
@Autowired
|
private ObjectMapper objectMapper;
|
@Autowired
|
private ConfigService configService;
|
|
/** sending=1 且 Redis 占位已失、update_time 超时:补偿清零 */
|
@Scheduled(cron = "0 0/2 * * * ?")
|
// @Scheduled(cron = "0/5 * * * * ?")
|
public void recoverStaleSending() {
|
try {
|
cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
|
} catch (Exception e) {
|
log.warn("云仓 sending 补偿任务异常:{}", e.getMessage());
|
}
|
}
|
|
@Scheduled(cron = "0/60 * * * * ?")
|
public void syncCloudWmsNotify() {
|
List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
|
if (pending.isEmpty()) {
|
log.debug("云仓上报调度:本轮待发送 0 条");
|
return;
|
}
|
log.info("云仓上报调度:本轮待发送 {} 条", pending.size());
|
dispatchPending(pending);
|
}
|
|
/** 同单多条合并上报 */
|
private void dispatchPending(List<CloudWmsNotifyLog> pending) {
|
String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
|
LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
|
for (CloudWmsNotifyLog row : pending) {
|
if (!rtInOut.equals(row.getReportType())) {
|
continue;
|
}
|
String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
|
if (key == null) {
|
continue;
|
}
|
inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
|
}
|
Set<Long> done = new HashSet<>();
|
for (CloudWmsNotifyLog row : pending) {
|
Long rid = row.getId();
|
if (rid != null && done.contains(rid)) {
|
continue;
|
}
|
if (!rtInOut.equals(row.getReportType())) {
|
safeProcessOne(row);
|
if (rid != null) {
|
done.add(rid);
|
}
|
continue;
|
}
|
String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
|
if (key == null) {
|
safeProcessOne(row);
|
if (rid != null) {
|
done.add(rid);
|
}
|
continue;
|
}
|
List<CloudWmsNotifyLog> g = inOutGroups.get(key);
|
if (g != null && g.size() >= 2) {
|
safeProcessMergedInOutGroup(g);
|
for (CloudWmsNotifyLog x : g) {
|
if (x.getId() != null) {
|
done.add(x.getId());
|
}
|
}
|
} else {
|
safeProcessOne(row);
|
if (rid != null) {
|
done.add(rid);
|
}
|
}
|
}
|
}
|
|
private void safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
|
List<Long> claimedIds = new ArrayList<>();
|
try {
|
for (CloudWmsNotifyLog row : group) {
|
Long id = row.getId();
|
if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
|
log.debug("云仓上报同批合并未抢到发送权 id={}", id);
|
return;
|
}
|
claimedIds.add(id);
|
}
|
processMergedInOut(group);
|
} catch (Exception e) {
|
log.warn("云仓上报同批合并异常:{}", e.getMessage());
|
} finally {
|
for (Long id : claimedIds) {
|
cloudWmsNotifyLogService.clearSending(id);
|
}
|
}
|
}
|
|
private void processMergedInOut(List<CloudWmsNotifyLog> group) {
|
Date now = new Date();
|
List<InOutResultReportParam> lines = new ArrayList<>();
|
try {
|
for (CloudWmsNotifyLog row : group) {
|
lines.addAll(cloudWmsNotifyLogService.parseInOutLinesFromRequestBody(row.getRequestBody()));
|
}
|
} catch (IOException e) {
|
String msg = "反序列化失败: " + e.getMessage();
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
setFailResult(row, row.getRequestBody(), msg, nextRetry, now, row.getMaxRetryCount());
|
}
|
return;
|
}
|
if (lines.isEmpty()) {
|
return;
|
}
|
String mergedBody;
|
try {
|
mergedBody = objectMapper.writeValueAsString(new InOutResultBatchPayload().setLines(lines));
|
} catch (JsonProcessingException e) {
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
setFailResult(row, row.getRequestBody(), "合并请求体序列化失败: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
|
}
|
return;
|
}
|
log.info("云仓上报开始(同单合并),ids={},requestBody={}", idsOf(group), mergedBody);
|
try {
|
Map<String, Object> res = cloudWmsReportService.reportInOutResults(lines);
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
updateAfterNotify(row, mergedBody, res, nextRetry, now, row.getMaxRetryCount());
|
}
|
} catch (FeignException e) {
|
String responseBody = e.contentUTF8();
|
String fullMsg = "status=" + e.status() + ",message=" + e.getMessage()
|
+ (responseBody == null || responseBody.isEmpty() ? "" : ",responseBody=" + responseBody);
|
log.warn("云仓上报同批合并请求失败:{}", fullMsg);
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
setFailResult(row, mergedBody, "请求异常: " + fullMsg, nextRetry, now, row.getMaxRetryCount());
|
}
|
} catch (Exception e) {
|
log.warn("云仓上报同批合并请求失败:{}", e.getMessage());
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
setFailResult(row, mergedBody, "请求异常: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
|
}
|
}
|
}
|
|
private static List<Long> idsOf(List<CloudWmsNotifyLog> group) {
|
List<Long> ids = new ArrayList<>(group.size());
|
for (CloudWmsNotifyLog row : group) {
|
ids.add(row.getId());
|
}
|
return ids;
|
}
|
|
private void safeProcessOne(CloudWmsNotifyLog logRecord) {
|
Long id = logRecord.getId();
|
if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
|
log.debug("云仓上报未抢到发送权 id={}", id);
|
return;
|
}
|
try {
|
processOne(logRecord);
|
} catch (Exception e) {
|
log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
|
} finally {
|
cloudWmsNotifyLogService.clearSending(id);
|
}
|
}
|
|
private void processOne(CloudWmsNotifyLog logRecord) {
|
String reportType = logRecord.getReportType();
|
String requestBody = logRecord.getRequestBody();
|
Date now = new Date();
|
int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
|
int effectiveMaxRetry = logRecord.getMaxRetryCount();
|
String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
|
String rtAdj = cloudWmsNotifyLogService.getReportTypeInventoryAdjust();
|
log.info("云仓上报开始,id={},bizRef={},reportType={},attempt={},requestBody={}",
|
logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
|
|
try {
|
if (rtInOut.equals(reportType)) {
|
JsonNode root = objectMapper.readTree(requestBody);
|
Map<String, Object> res;
|
if (root.has("lines") && root.get("lines").isArray()) {
|
InOutResultBatchPayload batch = objectMapper.readValue(requestBody, InOutResultBatchPayload.class);
|
res = cloudWmsReportService.reportInOutResults(batch.getLines());
|
} else {
|
InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
|
res = cloudWmsReportService.reportInOutResult(param);
|
}
|
updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
|
} else if (rtAdj.equals(reportType)) {
|
InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
|
Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
|
updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
|
} else {
|
log.warn("未知上报类型,id={},reportType={}", logRecord.getId(), reportType);
|
return;
|
}
|
} catch (JsonProcessingException e) {
|
log.warn("云仓上报请求体反序列化失败,id={}:{}", logRecord.getId(), e.getMessage());
|
setFailResult(logRecord, requestBody, "反序列化失败: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
|
} catch (FeignException e) {
|
String responseBody = e.contentUTF8();
|
String fullMsg = "status=" + e.status() + ",message=" + e.getMessage()
|
+ (responseBody == null || responseBody.isEmpty() ? "" : ",responseBody=" + responseBody);
|
log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), fullMsg);
|
setFailResult(logRecord, requestBody, "请求异常: " + fullMsg, nextRetry, now, effectiveMaxRetry);
|
} catch (Exception e) {
|
log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
|
setFailResult(logRecord, requestBody, "请求异常: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
|
}
|
}
|
|
private void updateAfterNotify(CloudWmsNotifyLog logRecord, String requestBody, Map<String, Object> res, int nextRetry, Date now, int effectiveMaxRetry) {
|
String responseJson;
|
try {
|
responseJson = res != null ? objectMapper.writeValueAsString(res) : "null";
|
} catch (JsonProcessingException e) {
|
responseJson = String.valueOf(res);
|
}
|
Object codeObj = res != null ? res.get("code") : null;
|
Object statusObj = res != null ? res.get("status") : null;
|
boolean success = Integer.valueOf(200).equals(codeObj) || Integer.valueOf(200).equals(statusObj);
|
int status = success ? cloudWmsNotifyLogService.getNotifyStatusSuccess() : cloudWmsNotifyLogService.getNotifyStatusFail();
|
logRecord.setLastRequestBody(truncateForStore(requestBody));
|
logRecord.setLastResponseBody(truncateForStore(responseJson));
|
logRecord.setLastNotifyTime(now);
|
logRecord.setRetryCount(nextRetry);
|
logRecord.setNotifyStatus(status);
|
logRecord.setSending(0);
|
logRecord.setUpdateTime(now);
|
cloudWmsNotifyLogService.updateById(logRecord);
|
log.info("云仓上报结束,id={},bizRef={},attempt={},notifyStatus={},responseBody={}",
|
logRecord.getId(), logRecord.getBizRef(), nextRetry, status, responseJson);
|
}
|
|
private void setFailResult(CloudWmsNotifyLog logRecord, String requestBody, String errorMsg, int nextRetry, Date now, int effectiveMaxRetry) {
|
logRecord.setLastRequestBody(truncateForStore(requestBody));
|
logRecord.setLastResponseBody(truncateForStore(errorMsg));
|
logRecord.setLastNotifyTime(now);
|
logRecord.setRetryCount(nextRetry);
|
int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
|
? cloudWmsNotifyLogService.getNotifyStatusFail()
|
: cloudWmsNotifyLogService.getNotifyStatusPending();
|
logRecord.setNotifyStatus(status);
|
logRecord.setSending(0);
|
logRecord.setUpdateTime(now);
|
cloudWmsNotifyLogService.updateById(logRecord);
|
log.warn("云仓上报失败,id={},bizRef={},attempt={},notifyStatus={},error={}",
|
logRecord.getId(), logRecord.getBizRef(), nextRetry, logRecord.getNotifyStatus(), errorMsg);
|
}
|
|
/** maxRetry = -1 表示无限重发 */
|
private boolean isInfiniteRetry(Integer maxRetry) {
|
return maxRetry != null && maxRetry == -1;
|
}
|
|
private String truncateForStore(String body) {
|
int maxChars = resolveStoreBodyMaxChars();
|
if (body == null || body.length() <= maxChars) {
|
return body;
|
}
|
return body.substring(0, maxChars);
|
}
|
|
private int resolveStoreBodyMaxChars() {
|
try {
|
Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_NOTIFY_STORE_BODY_MAX_CHARS);
|
if (cfg == null || cfg.getVal() == null || cfg.getVal().trim().isEmpty()) {
|
return STORE_BODY_MAX_CHARS_DEFAULT;
|
}
|
int parsed = Integer.parseInt(cfg.getVal().trim());
|
return parsed > 0 ? parsed : STORE_BODY_MAX_CHARS_DEFAULT;
|
} catch (Exception e) {
|
return STORE_BODY_MAX_CHARS_DEFAULT;
|
}
|
}
|
}
|