package com.vincent.rsf.server.manager.schedules;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
|
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
|
import com.vincent.rsf.server.api.service.CloudWmsReportService;
|
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
|
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
|
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
|
import com.vincent.rsf.server.system.entity.Config;
|
import com.vincent.rsf.server.system.service.ConfigService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Date;
|
import java.util.List;
|
import java.util.Map;
|
|
/** 云仓上报定时任务 */
|
@Slf4j
|
@Component
|
public class CloudWmsNotifySchedule {
|
|
private static final int BATCH_LIMIT = -1;
|
private static final int STORE_BODY_MAX_CHARS_DEFAULT = 2000;
|
|
@Autowired
|
private CloudWmsNotifyLogService cloudWmsNotifyLogService;
|
@Autowired
|
private CloudWmsReportService cloudWmsReportService;
|
@Autowired
|
private ObjectMapper objectMapper;
|
@Autowired
|
private ConfigService configService;
|
|
@Scheduled(cron = "0/30 * * * * ?")
|
public void syncCloudWmsNotify() {
|
// List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, 999);
|
List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
|
if (pending.isEmpty()) {
|
return;
|
}
|
long nowMs = System.currentTimeMillis();
|
for (CloudWmsNotifyLog logRecord : pending) {
|
try {
|
Integer maxRetry = logRecord.getMaxRetryCount();
|
Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
|
if (maxRetry == null || intervalSeconds == null) {
|
log.warn("云仓上报待办跳过:重试参数缺失,id={},bizRef={},maxRetry={},intervalSeconds={}",
|
logRecord.getId(), logRecord.getBizRef(), maxRetry, intervalSeconds);
|
continue;
|
}
|
int effectiveIntervalSeconds = Math.max(0, intervalSeconds);
|
// if (logRecord.getRetryCount() != null && logRecord.getRetryCount() >= maxRetry) {
|
if (!isInfiniteRetry(maxRetry)
|
&& logRecord.getRetryCount() != null
|
&& logRecord.getRetryCount() >= maxRetry) {
|
log.info("云仓上报待办跳过:重试次数已达上限,id={},bizRef={},retryCount={},maxRetry={}",
|
logRecord.getId(), logRecord.getBizRef(), logRecord.getRetryCount(), maxRetry);
|
continue;
|
}
|
if (logRecord.getLastNotifyTime() != null) {
|
long elapsed = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
|
if (elapsed < effectiveIntervalSeconds) {
|
// log.info("云仓上报待办跳过:未到重试间隔,id={},bizRef={},elapsed={}s,interval={}s",
|
// logRecord.getId(), logRecord.getBizRef(), elapsed, effectiveIntervalSeconds);
|
continue;
|
}
|
}
|
processOne(logRecord);
|
} catch (Exception e) {
|
log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
|
}
|
}
|
}
|
|
private void processOne(CloudWmsNotifyLog logRecord) {
|
String reportType = logRecord.getReportType();
|
String requestBody = logRecord.getRequestBody();
|
Date now = new Date();
|
int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
|
int effectiveMaxRetry = logRecord.getMaxRetryCount();
|
log.info("云仓上报开始,id={},bizRef={},reportType={},attempt={},requestBody={}",
|
logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
|
|
try {
|
if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
|
InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
|
Map<String, Object> res = cloudWmsReportService.reportInOutResult(param);
|
updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
|
} else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
|
InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
|
Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
|
updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
|
} else {
|
log.warn("未知上报类型,id={},reportType={}", logRecord.getId(), reportType);
|
return;
|
}
|
} catch (JsonProcessingException e) {
|
log.warn("云仓上报请求体反序列化失败,id={}:{}", logRecord.getId(), e.getMessage());
|
setFailResult(logRecord, requestBody, "反序列化失败: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
|
} catch (Exception e) {
|
log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
|
setFailResult(logRecord, requestBody, "请求异常: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
|
}
|
}
|
|
private void updateAfterNotify(CloudWmsNotifyLog logRecord, String requestBody, Map<String, Object> res, int nextRetry, Date now, int effectiveMaxRetry) {
|
String responseJson;
|
try {
|
responseJson = res != null ? objectMapper.writeValueAsString(res) : "null";
|
} catch (JsonProcessingException e) {
|
responseJson = String.valueOf(res);
|
}
|
Object codeObj = res != null ? res.get("code") : null;
|
Object statusObj = res != null ? res.get("status") : null;
|
boolean success = Integer.valueOf(200).equals(codeObj) || Integer.valueOf(200).equals(statusObj);
|
int status = success ? cloudWmsNotifyLogService.getNotifyStatusSuccess() : cloudWmsNotifyLogService.getNotifyStatusFail();
|
logRecord.setLastRequestBody(truncateForStore(requestBody));
|
logRecord.setLastResponseBody(truncateForStore(responseJson));
|
logRecord.setLastNotifyTime(now);
|
logRecord.setRetryCount(nextRetry);
|
logRecord.setNotifyStatus(status);
|
logRecord.setUpdateTime(now);
|
cloudWmsNotifyLogService.updateById(logRecord);
|
log.info("云仓上报结束,id={},bizRef={},attempt={},notifyStatus={},responseBody={}",
|
logRecord.getId(), logRecord.getBizRef(), nextRetry, status, responseJson);
|
}
|
|
private void setFailResult(CloudWmsNotifyLog logRecord, String requestBody, String errorMsg, int nextRetry, Date now, int effectiveMaxRetry) {
|
logRecord.setLastRequestBody(truncateForStore(requestBody));
|
logRecord.setLastResponseBody(truncateForStore(errorMsg));
|
logRecord.setLastNotifyTime(now);
|
logRecord.setRetryCount(nextRetry);
|
// logRecord.setNotifyStatus(nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending());
|
int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
|
? cloudWmsNotifyLogService.getNotifyStatusFail()
|
: cloudWmsNotifyLogService.getNotifyStatusPending();
|
logRecord.setNotifyStatus(status);
|
logRecord.setUpdateTime(now);
|
cloudWmsNotifyLogService.updateById(logRecord);
|
log.warn("云仓上报失败,id={},bizRef={},attempt={},notifyStatus={},error={}",
|
logRecord.getId(), logRecord.getBizRef(), nextRetry, logRecord.getNotifyStatus(), errorMsg);
|
}
|
|
/** maxRetry = -1 表示无限重发 */
|
private boolean isInfiniteRetry(Integer maxRetry) {
|
return maxRetry != null && maxRetry == -1;
|
}
|
|
private String truncateForStore(String body) {
|
int maxChars = resolveStoreBodyMaxChars();
|
if (body == null || body.length() <= maxChars) {
|
return body;
|
}
|
return body.substring(0, maxChars);
|
}
|
|
private int resolveStoreBodyMaxChars() {
|
try {
|
Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_NOTIFY_STORE_BODY_MAX_CHARS);
|
if (cfg == null || cfg.getVal() == null || cfg.getVal().trim().isEmpty()) {
|
return STORE_BODY_MAX_CHARS_DEFAULT;
|
}
|
int parsed = Integer.parseInt(cfg.getVal().trim());
|
return parsed > 0 ? parsed : STORE_BODY_MAX_CHARS_DEFAULT;
|
} catch (Exception e) {
|
return STORE_BODY_MAX_CHARS_DEFAULT;
|
}
|
}
|
}
|