package com.vincent.rsf.server.manager.schedules;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
import com.vincent.rsf.server.api.service.CloudWmsReportService;
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
import com.vincent.rsf.server.system.entity.Config;
import com.vincent.rsf.server.system.service.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Scheduled task that (re)sends pending cloud-WMS report records.
 *
 * <p>On every run it loads the pending {@link CloudWmsNotifyLog} records, skips those whose
 * retry settings are missing, whose retry limit is exhausted or whose retry interval has not
 * elapsed yet, and sends the remaining ones through {@link CloudWmsReportService}. The outcome
 * of each attempt is written back onto the record.
 */
@Slf4j
@Component
public class CloudWmsNotifySchedule {

    /** First argument passed to {@code listPending}. */
    private static final int BATCH_LIMIT = -1;

    /** Value of {@code maxRetryCount} that means "retry without an upper bound". */
    private static final int INFINITE_RETRY = -1;

    /** Value of the {@code code} / {@code status} response entry that marks a successful report. */
    private static final Integer SUCCESS_CODE = 200;

    /** Fallback for the stored request/response body length when the config is absent or invalid. */
    private static final int STORE_BODY_MAX_CHARS_DEFAULT = 2000;

    @Autowired
    private CloudWmsNotifyLogService cloudWmsNotifyLogService;

    @Autowired
    private CloudWmsReportService cloudWmsReportService;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private ConfigService configService;

    /**
     * Processes the pending report records; triggered at second 0 and 30 of every minute.
     *
     * <p>A failure while handling one record is logged and does not stop the remaining records
     * from being processed.
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void syncCloudWmsNotify() {
        // NOTE(review): the second argument used to be 999 and is now -1; presumably a
        // retry-count ceiling where -1 means "no ceiling" - confirm against listPending.
        List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
        if (pending == null || pending.isEmpty()) {
            return;
        }
        long nowMs = System.currentTimeMillis();
        for (CloudWmsNotifyLog logRecord : pending) {
            try {
                if (isDue(logRecord, nowMs)) {
                    processOne(logRecord);
                }
            } catch (Exception e) {
                // The trailing throwable makes SLF4J print the stack trace as well.
                log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}",
                        logRecord.getId(), logRecord.getBizRef(), e.getMessage(), e);
            }
        }
    }

    /**
     * Decides whether a pending record should be sent in the current run.
     *
     * @param logRecord the pending record
     * @param nowMs     reference time of the current run, in epoch milliseconds
     * @return {@code true} when the retry settings are present, the retry limit is not reached
     *         and the retry interval since the last attempt has elapsed
     */
    private boolean isDue(CloudWmsNotifyLog logRecord, long nowMs) {
        Integer maxRetry = logRecord.getMaxRetryCount();
        Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
        if (maxRetry == null || intervalSeconds == null || intervalSeconds <= 0) {
            log.warn("云仓上报待办跳过:重试参数缺失,id={},bizRef={},maxRetry={},intervalSeconds={}",
                    logRecord.getId(), logRecord.getBizRef(), maxRetry, intervalSeconds);
            return false;
        }

        Integer retryCount = logRecord.getRetryCount();
        if (!isInfiniteRetry(maxRetry) && retryCount != null && retryCount >= maxRetry) {
            log.info("云仓上报待办跳过:重试次数已达上限,id={},bizRef={},retryCount={},maxRetry={}",
                    logRecord.getId(), logRecord.getBizRef(), retryCount, maxRetry);
            return false;
        }

        if (logRecord.getLastNotifyTime() == null) {
            // Never attempted before: no interval to wait for.
            return true;
        }
        long elapsedSeconds = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
        // Records that are not due yet are skipped without logging to keep each run quiet.
        return elapsedSeconds >= intervalSeconds;
    }

    /**
     * Sends one record to the cloud WMS and stores the outcome on the record.
     *
     * <p>The stored request body is deserialized into the parameter type matching the record's
     * report type. A record with an unknown report type is logged and left untouched.
     * NOTE(review): such a record is not updated here, so it is presumably returned again by
     * the next {@code listPending} call - confirm whether it should be marked as failed.
     *
     * @param logRecord the record to send; its {@code maxRetryCount} must be non-null
     */
    private void processOne(CloudWmsNotifyLog logRecord) {
        String reportType = logRecord.getReportType();
        String requestBody = logRecord.getRequestBody();
        Date now = new Date();
        Integer previousRetry = logRecord.getRetryCount();
        int nextRetry = (previousRetry == null ? 0 : previousRetry) + 1;
        int effectiveMaxRetry = logRecord.getMaxRetryCount();
        log.info("云仓上报开始,id={},bizRef={},reportType={},attempt={},requestBody={}",
                logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
        try {
            Map<?, ?> res;
            if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
                InOutResultReportParam param =
                        objectMapper.readValue(requestBody, InOutResultReportParam.class);
                res = cloudWmsReportService.reportInOutResult(param);
            } else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
                InventoryAdjustReportParam param =
                        objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
                res = cloudWmsReportService.reportInventoryAdjust(param);
            } else {
                log.warn("未知上报类型,id={},reportType={}", logRecord.getId(), reportType);
                return;
            }
            updateAfterNotify(logRecord, requestBody, res, nextRetry, now);
        } catch (JsonProcessingException e) {
            log.warn("云仓上报请求体反序列化失败,id={}:{}", logRecord.getId(), e.getMessage(), e);
            setFailResult(logRecord, requestBody, "反序列化失败: " + e.getMessage(),
                    nextRetry, now, effectiveMaxRetry);
        } catch (Exception e) {
            // Also reached when persisting the result inside updateAfterNotify throws.
            log.warn("云仓上报请求失败,id={},bizRef={}:{}",
                    logRecord.getId(), logRecord.getBizRef(), e.getMessage(), e);
            setFailResult(logRecord, requestBody, "请求异常: " + e.getMessage(),
                    nextRetry, now, effectiveMaxRetry);
        }
    }

    /**
     * Stores the outcome of a report call that returned a response.
     *
     * <p>The attempt counts as successful when the response entry {@code code} or
     * {@code status} equals the integer 200; any other response is stored with the fail status.
     * NOTE(review): unlike {@link #setFailResult}, a non-successful response is stored as fail
     * right away, regardless of the remaining retries - confirm this is intended.
     *
     * @param logRecord   the record being updated
     * @param requestBody the request body that was sent
     * @param res         the response returned by the report service, may be {@code null}
     * @param nextRetry   the attempt number to store as the new retry count
     * @param now         timestamp of this attempt
     */
    private void updateAfterNotify(CloudWmsNotifyLog logRecord, String requestBody, Map<?, ?> res,
                                   int nextRetry, Date now) {
        String responseJson;
        try {
            responseJson = res != null ? objectMapper.writeValueAsString(res) : "null";
        } catch (JsonProcessingException e) {
            // Serialization is only needed for storage/logging; fall back to toString().
            responseJson = String.valueOf(res);
        }

        boolean success = res != null
                && (SUCCESS_CODE.equals(res.get("code")) || SUCCESS_CODE.equals(res.get("status")));
        int status = success
                ? cloudWmsNotifyLogService.getNotifyStatusSuccess()
                : cloudWmsNotifyLogService.getNotifyStatusFail();

        persistAttempt(logRecord, requestBody, responseJson, nextRetry, now, status);
        log.info("云仓上报结束,id={},bizRef={},attempt={},notifyStatus={},responseBody={}",
                logRecord.getId(), logRecord.getBizRef(), nextRetry, status, responseJson);
    }

    /**
     * Stores the outcome of an attempt that ended with an exception.
     *
     * <p>The record stays pending so that a later run retries it, unless the retry limit is
     * finite and this attempt reached it, in which case the record is stored with the fail
     * status.
     *
     * @param logRecord         the record being updated
     * @param requestBody       the request body of the attempt
     * @param errorMsg          error description stored as the last response body
     * @param nextRetry         the attempt number to store as the new retry count
     * @param now               timestamp of this attempt
     * @param effectiveMaxRetry retry limit of the record, {@code -1} meaning unlimited
     */
    private void setFailResult(CloudWmsNotifyLog logRecord, String requestBody, String errorMsg,
                               int nextRetry, Date now, int effectiveMaxRetry) {
        boolean exhausted = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry;
        int status = exhausted
                ? cloudWmsNotifyLogService.getNotifyStatusFail()
                : cloudWmsNotifyLogService.getNotifyStatusPending();

        persistAttempt(logRecord, requestBody, errorMsg, nextRetry, now, status);
        log.warn("云仓上报失败,id={},bizRef={},attempt={},notifyStatus={},error={}",
                logRecord.getId(), logRecord.getBizRef(), nextRetry, logRecord.getNotifyStatus(), errorMsg);
    }

    /**
     * Writes the attempt data onto the record and saves it via {@code updateById}.
     *
     * <p>Request and response bodies are truncated to the configured storage length, which is
     * resolved once per call.
     */
    private void persistAttempt(CloudWmsNotifyLog logRecord, String requestBody, String responseBody,
                                int retryCount, Date now, int status) {
        int maxChars = resolveStoreBodyMaxChars();
        logRecord.setLastRequestBody(truncateForStore(requestBody, maxChars));
        logRecord.setLastResponseBody(truncateForStore(responseBody, maxChars));
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(retryCount);
        logRecord.setNotifyStatus(status);
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
    }

    /** Returns {@code true} when {@code maxRetry} is -1, which means unlimited resending. */
    private boolean isInfiniteRetry(Integer maxRetry) {
        return maxRetry != null && maxRetry == INFINITE_RETRY;
    }

    /**
     * Cuts {@code body} down to at most {@code maxChars} chars for storage.
     *
     * @param body     text to store, may be {@code null}
     * @param maxChars maximum number of chars to keep, expected to be positive
     * @return {@code body} itself when it is {@code null} or short enough, otherwise its prefix
     */
    private String truncateForStore(String body, int maxChars) {
        if (body == null || body.length() <= maxChars) {
            return body;
        }
        int end = maxChars;
        // Do not cut a surrogate pair in half: a dangling high surrogate is not valid text.
        if (end > 0 && Character.isHighSurrogate(body.charAt(end - 1))) {
            end--;
        }
        return body.substring(0, end);
    }

    /**
     * Reads the maximum stored body length from the global configuration.
     *
     * @return the configured positive value, or {@link #STORE_BODY_MAX_CHARS_DEFAULT} when the
     *         config entry is missing, blank, not a positive integer, or cannot be read
     */
    private int resolveStoreBodyMaxChars() {
        try {
            Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_NOTIFY_STORE_BODY_MAX_CHARS);
            if (cfg == null || cfg.getVal() == null || cfg.getVal().trim().isEmpty()) {
                return STORE_BODY_MAX_CHARS_DEFAULT;
            }
            int parsed = Integer.parseInt(cfg.getVal().trim());
            return parsed > 0 ? parsed : STORE_BODY_MAX_CHARS_DEFAULT;
        } catch (Exception e) {
            // Best effort: a broken config must not block reporting, but should be visible.
            log.warn("云仓上报存储长度配置读取失败,使用默认值{}:{}", STORE_BODY_MAX_CHARS_DEFAULT, e.getMessage());
            return STORE_BODY_MAX_CHARS_DEFAULT;
        }
    }
}