package com.vincent.rsf.server.manager.schedules;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.vincent.rsf.server.api.controller.erp.params.InOutResultBatchPayload;
|
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
|
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
|
import com.vincent.rsf.server.api.service.CloudWmsReportService;
|
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
|
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
|
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
|
import com.vincent.rsf.server.system.entity.Config;
|
import com.vincent.rsf.server.system.service.ConfigService;
|
import feign.FeignException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.io.IOException;
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.HashSet;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Set;
|
|
/** 云仓上报定时任务 */
|
@Slf4j
|
@Component
|
public class CloudWmsNotifySchedule {
|
|
private static final int BATCH_LIMIT = -1;
|
private static final int STORE_BODY_MAX_CHARS_DEFAULT = 2000;
|
|
@Autowired
|
private CloudWmsNotifyLogService cloudWmsNotifyLogService;
|
@Autowired
|
private CloudWmsReportService cloudWmsReportService;
|
@Autowired
|
private ObjectMapper objectMapper;
|
@Autowired
|
private ConfigService configService;
|
|
@Scheduled(cron = "${rsf.cloudwms.notify.recover-cron}")
|
public void recoverStaleSending() {
|
try {
|
cloudWmsNotifyLogService.recoverStaleSendingWhenRedisMiss();
|
} catch (Exception e) {
|
log.warn("云仓 sending 补偿任务异常:{}", e.getMessage());
|
}
|
}
|
|
@Scheduled(cron = "${rsf.cloudwms.notify.sync-cron}")
|
public void syncCloudWmsNotify() {
|
long t0 = System.currentTimeMillis();
|
try {
|
List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
|
if (pending.isEmpty()) {
|
log.debug("云仓上报调度:本轮待发送 0 条");
|
return;
|
}
|
log.debug("云仓上报调度:本轮待发送 {} 条", pending.size());
|
log.debug("云仓上报调度:开始派发");
|
dispatchPending(pending);
|
log.debug("云仓上报调度:派发完成");
|
log.debug("云仓上报调度:本轮调度结束,耗时 {} ms", System.currentTimeMillis() - t0);
|
} catch (Exception e) {
|
log.error("云仓上报调度异常,已耗时 {} ms", System.currentTimeMillis() - t0, e);
|
}
|
}
|
|
private static boolean isSendingBusy(CloudWmsNotifyLog row) {
|
Integer s = row.getSending();
|
return s != null && s != 0;
|
}
|
|
/** 同单多条合并上报 */
|
private void dispatchPending(List<CloudWmsNotifyLog> pending) {
|
int rowsReported = 0;
|
log.debug("云仓上报派发:开始,待办 {} 条", pending.size());
|
long tCfg = System.currentTimeMillis();
|
log.debug("云仓上报派发:读取上报类型配置");
|
String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
|
log.debug("云仓上报派发:上报类型={},读配置耗时 {} ms", rtInOut, System.currentTimeMillis() - tCfg);
|
LinkedHashMap<String, List<CloudWmsNotifyLog>> inOutGroups = new LinkedHashMap<>();
|
int n = pending.size();
|
for (int i = 0; i < n; i++) {
|
CloudWmsNotifyLog row = pending.get(i);
|
if (i == 0 || i == n - 1 || (i + 1) % 5 == 0) {
|
log.debug("云仓上报派发:分组进度 {}/{},id={}", i + 1, n, row.getId());
|
}
|
if (!rtInOut.equals(row.getReportType())) {
|
continue;
|
}
|
String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
|
if (key == null) {
|
continue;
|
}
|
inOutGroups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
|
}
|
log.debug("云仓上报派发:入出库可分组键 {} 个", inOutGroups.size());
|
Set<Long> done = new HashSet<>();
|
for (CloudWmsNotifyLog row : pending) {
|
Long rid = row.getId();
|
if (rid != null && done.contains(rid)) {
|
continue;
|
}
|
if (!rtInOut.equals(row.getReportType())) {
|
if (isSendingBusy(row)) {
|
continue;
|
}
|
log.debug("云仓上报派发:其它类型单条 id={}", rid);
|
if (safeProcessOne(row)) {
|
rowsReported++;
|
if (rid != null) {
|
done.add(rid);
|
}
|
}
|
continue;
|
}
|
String key = cloudWmsNotifyLogService.inOutMergeKeyFromRequestBody(row.getRequestBody());
|
if (key == null) {
|
if (isSendingBusy(row)) {
|
continue;
|
}
|
log.debug("云仓上报派发:无合并键单条 id={}", rid);
|
if (safeProcessOne(row)) {
|
rowsReported++;
|
if (rid != null) {
|
done.add(rid);
|
}
|
}
|
continue;
|
}
|
List<CloudWmsNotifyLog> g = inOutGroups.get(key);
|
List<CloudWmsNotifyLog> work = new ArrayList<>();
|
if (g != null) {
|
for (CloudWmsNotifyLog x : g) {
|
if (x.getId() == null || done.contains(x.getId())) {
|
continue;
|
}
|
if (isSendingBusy(x)) {
|
continue;
|
}
|
work.add(x);
|
}
|
}
|
if (work.isEmpty()) {
|
continue;
|
}
|
if (work.size() >= 2) {
|
log.debug("云仓上报派发:同单合并 {} 条", work.size());
|
Set<Long> handled = safeProcessMergedInOutGroup(work);
|
done.addAll(handled);
|
rowsReported += handled.size();
|
} else {
|
log.debug("云仓上报派发:入出库单条 id={}", work.get(0).getId());
|
if (safeProcessOne(work.get(0))) {
|
rowsReported++;
|
if (work.get(0).getId() != null) {
|
done.add(work.get(0).getId());
|
}
|
}
|
}
|
}
|
log.debug("云仓上报派发:遍历结束");
|
if (rowsReported == 0) {
|
log.debug("云仓上报派发:待办 {} 条,实际处理 0 条", pending.size());
|
} else {
|
log.debug("云仓上报派发:待办 {} 条,实际处理 {} 条", pending.size(), rowsReported);
|
}
|
}
|
|
private Set<Long> safeProcessMergedInOutGroup(List<CloudWmsNotifyLog> group) {
|
Set<Long> handled = new HashSet<>();
|
List<Long> claimedIds = new ArrayList<>();
|
List<CloudWmsNotifyLog> claimedRows = new ArrayList<>();
|
log.debug("云仓上报派发:合并组抢占,行数 {}", group.size());
|
try {
|
for (CloudWmsNotifyLog row : group) {
|
Long id = row.getId();
|
log.debug("云仓上报派发:合并抢占尝试 id={}", id);
|
if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
|
log.debug("云仓上报派发:合并抢占未抢到 id={},跳过本行继续其它行", id);
|
continue;
|
}
|
claimedIds.add(id);
|
claimedRows.add(row);
|
}
|
if (claimedRows.isEmpty()) {
|
log.debug("云仓上报派发:合并组抢占 0 行");
|
return handled;
|
}
|
log.debug("云仓上报派发:合并组已抢占 {} 行,请求云仓", claimedRows.size());
|
processMergedInOut(claimedRows);
|
log.debug("云仓上报派发:合并组云仓调用已返回");
|
handled.addAll(claimedIds);
|
} catch (Exception e) {
|
log.warn("云仓上报同批合并异常:{}", e.getMessage());
|
} finally {
|
for (Long id : claimedIds) {
|
cloudWmsNotifyLogService.clearSending(id);
|
}
|
}
|
return handled;
|
}
|
|
private void processMergedInOut(List<CloudWmsNotifyLog> group) {
|
Date now = new Date();
|
List<InOutResultReportParam> lines = new ArrayList<>();
|
try {
|
for (CloudWmsNotifyLog row : group) {
|
lines.addAll(cloudWmsNotifyLogService.parseInOutLinesFromRequestBody(row.getRequestBody()));
|
}
|
} catch (IOException e) {
|
String msg = "反序列化失败: " + e.getMessage();
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
setFailResult(row, row.getRequestBody(), msg, nextRetry, now, row.getMaxRetryCount());
|
}
|
return;
|
}
|
if (lines.isEmpty()) {
|
return;
|
}
|
String mergedBody;
|
try {
|
mergedBody = objectMapper.writeValueAsString(new InOutResultBatchPayload().setLines(lines));
|
} catch (JsonProcessingException e) {
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
setFailResult(row, row.getRequestBody(), "合并请求体序列化失败: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
|
}
|
return;
|
}
|
log.info("云仓上报开始(同单合并),ids={},requestBody={}", idsOf(group), mergedBody);
|
try {
|
Map<String, Object> res = cloudWmsReportService.reportInOutResults(lines);
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
updateAfterNotify(row, mergedBody, res, nextRetry, now, row.getMaxRetryCount());
|
}
|
} catch (FeignException e) {
|
String responseBody = e.contentUTF8();
|
String fullMsg = "status=" + e.status() + ",message=" + e.getMessage()
|
+ (responseBody == null || responseBody.isEmpty() ? "" : ",responseBody=" + responseBody);
|
log.warn("云仓上报同批合并请求失败:{}", fullMsg);
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
setFailResult(row, mergedBody, "请求异常: " + fullMsg, nextRetry, now, row.getMaxRetryCount());
|
}
|
} catch (Exception e) {
|
log.warn("云仓上报同批合并请求失败:{}", e.getMessage());
|
for (CloudWmsNotifyLog row : group) {
|
int nextRetry = (row.getRetryCount() == null ? 0 : row.getRetryCount()) + 1;
|
setFailResult(row, mergedBody, "请求异常: " + e.getMessage(), nextRetry, now, row.getMaxRetryCount());
|
}
|
}
|
}
|
|
private static List<Long> idsOf(List<CloudWmsNotifyLog> group) {
|
List<Long> ids = new ArrayList<>(group.size());
|
for (CloudWmsNotifyLog row : group) {
|
ids.add(row.getId());
|
}
|
return ids;
|
}
|
|
private boolean safeProcessOne(CloudWmsNotifyLog logRecord) {
|
Long id = logRecord.getId();
|
log.debug("云仓上报派发:单条抢占 id={}", id);
|
if (!cloudWmsNotifyLogService.tryClaimSending(id)) {
|
log.debug("云仓上报派发:单条未抢到 id={}", id);
|
return false;
|
}
|
try {
|
processOne(logRecord);
|
} catch (Exception e) {
|
log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
|
} finally {
|
cloudWmsNotifyLogService.clearSending(id);
|
}
|
return true;
|
}
|
|
private void processOne(CloudWmsNotifyLog logRecord) {
|
String reportType = logRecord.getReportType();
|
String requestBody = logRecord.getRequestBody();
|
Date now = new Date();
|
int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
|
int effectiveMaxRetry = logRecord.getMaxRetryCount();
|
String rtInOut = cloudWmsNotifyLogService.getReportTypeInOutResult();
|
String rtAdj = cloudWmsNotifyLogService.getReportTypeInventoryAdjust();
|
log.info("云仓上报开始,id={},bizRef={},reportType={},attempt={},requestBody={}",
|
logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
|
|
try {
|
if (rtInOut.equals(reportType)) {
|
JsonNode root = objectMapper.readTree(requestBody);
|
Map<String, Object> res;
|
if (root.has("lines") && root.get("lines").isArray()) {
|
InOutResultBatchPayload batch = objectMapper.readValue(requestBody, InOutResultBatchPayload.class);
|
res = cloudWmsReportService.reportInOutResults(batch.getLines());
|
} else {
|
InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
|
res = cloudWmsReportService.reportInOutResult(param);
|
}
|
updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
|
} else if (rtAdj.equals(reportType)) {
|
InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
|
Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
|
updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
|
} else {
|
log.warn("未知上报类型,id={},reportType={}", logRecord.getId(), reportType);
|
return;
|
}
|
} catch (JsonProcessingException e) {
|
log.warn("云仓上报请求体反序列化失败,id={}:{}", logRecord.getId(), e.getMessage());
|
setFailResult(logRecord, requestBody, "反序列化失败: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
|
} catch (FeignException e) {
|
String responseBody = e.contentUTF8();
|
String fullMsg = "status=" + e.status() + ",message=" + e.getMessage()
|
+ (responseBody == null || responseBody.isEmpty() ? "" : ",responseBody=" + responseBody);
|
log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), fullMsg);
|
setFailResult(logRecord, requestBody, "请求异常: " + fullMsg, nextRetry, now, effectiveMaxRetry);
|
} catch (Exception e) {
|
log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
|
setFailResult(logRecord, requestBody, "请求异常: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
|
}
|
}
|
|
private void updateAfterNotify(CloudWmsNotifyLog logRecord, String requestBody, Map<String, Object> res, int nextRetry, Date now, int effectiveMaxRetry) {
|
String responseJson;
|
try {
|
responseJson = res != null ? objectMapper.writeValueAsString(res) : "null";
|
} catch (JsonProcessingException e) {
|
responseJson = String.valueOf(res);
|
}
|
Object codeObj = res != null ? res.get("code") : null;
|
Object statusObj = res != null ? res.get("status") : null;
|
boolean success = Integer.valueOf(200).equals(codeObj) || Integer.valueOf(200).equals(statusObj);
|
int status = success ? cloudWmsNotifyLogService.getNotifyStatusSuccess() : cloudWmsNotifyLogService.getNotifyStatusFail();
|
logRecord.setLastRequestBody(truncateForStore(requestBody));
|
logRecord.setLastResponseBody(truncateForStore(responseJson));
|
logRecord.setLastNotifyTime(now);
|
logRecord.setRetryCount(nextRetry);
|
logRecord.setNotifyStatus(status);
|
logRecord.setSending(0);
|
logRecord.setUpdateTime(now);
|
cloudWmsNotifyLogService.updateById(logRecord);
|
log.info("云仓上报结束,id={},bizRef={},attempt={},notifyStatus={},responseBody={}",
|
logRecord.getId(), logRecord.getBizRef(), nextRetry, status, responseJson);
|
}
|
|
private void setFailResult(CloudWmsNotifyLog logRecord, String requestBody, String errorMsg, int nextRetry, Date now, int effectiveMaxRetry) {
|
logRecord.setLastRequestBody(truncateForStore(requestBody));
|
logRecord.setLastResponseBody(truncateForStore(errorMsg));
|
logRecord.setLastNotifyTime(now);
|
logRecord.setRetryCount(nextRetry);
|
int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
|
? cloudWmsNotifyLogService.getNotifyStatusFail()
|
: cloudWmsNotifyLogService.getNotifyStatusPending();
|
logRecord.setNotifyStatus(status);
|
logRecord.setSending(0);
|
logRecord.setUpdateTime(now);
|
cloudWmsNotifyLogService.updateById(logRecord);
|
log.warn("云仓上报失败,id={},bizRef={},attempt={},notifyStatus={},error={}",
|
logRecord.getId(), logRecord.getBizRef(), nextRetry, logRecord.getNotifyStatus(), errorMsg);
|
}
|
|
/** maxRetry = -1 表示无限重发 */
|
private boolean isInfiniteRetry(Integer maxRetry) {
|
return maxRetry != null && maxRetry == -1;
|
}
|
|
private String truncateForStore(String body) {
|
int maxChars = resolveStoreBodyMaxChars();
|
if (body == null || body.length() <= maxChars) {
|
return body;
|
}
|
return body.substring(0, maxChars);
|
}
|
|
private int resolveStoreBodyMaxChars() {
|
try {
|
Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_NOTIFY_STORE_BODY_MAX_CHARS);
|
if (cfg == null || cfg.getVal() == null || cfg.getVal().trim().isEmpty()) {
|
return STORE_BODY_MAX_CHARS_DEFAULT;
|
}
|
int parsed = Integer.parseInt(cfg.getVal().trim());
|
return parsed > 0 ? parsed : STORE_BODY_MAX_CHARS_DEFAULT;
|
} catch (Exception e) {
|
return STORE_BODY_MAX_CHARS_DEFAULT;
|
}
|
}
|
}
|