cl
7 天以前 6062e826f1b1acde1bfe2887353c1214f2af12d6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.vincent.rsf.server.manager.schedules;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
import com.vincent.rsf.server.api.service.CloudWmsReportService;
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
import com.vincent.rsf.server.system.entity.Config;
import com.vincent.rsf.server.system.service.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import java.util.Date;
import java.util.List;
import java.util.Map;
 
/** 云仓上报定时任务 */
@Slf4j
@Component
public class CloudWmsNotifySchedule {
 
    private static final int BATCH_LIMIT = 50;
    private static final int STORE_BODY_MAX_CHARS_DEFAULT = 2000;
 
    @Autowired
    private CloudWmsNotifyLogService cloudWmsNotifyLogService;
    @Autowired
    private CloudWmsReportService cloudWmsReportService;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private ConfigService configService;
 
    @Scheduled(cron = "0/30 * * * * ?")
    public void syncCloudWmsNotify() {
        // List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, 999);
        List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
        if (pending.isEmpty()) {
            return;
        }
        long nowMs = System.currentTimeMillis();
        for (CloudWmsNotifyLog logRecord : pending) {
            try {
                Integer maxRetry = logRecord.getMaxRetryCount();
                Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
                if (maxRetry == null || intervalSeconds == null || intervalSeconds <= 0) {
                    log.warn("云仓上报待办跳过:重试参数缺失,id={},bizRef={},maxRetry={},intervalSeconds={}",
                            logRecord.getId(), logRecord.getBizRef(), maxRetry, intervalSeconds);
                    continue;
                }
                // if (logRecord.getRetryCount() != null && logRecord.getRetryCount() >= maxRetry) {
                if (!isInfiniteRetry(maxRetry)
                        && logRecord.getRetryCount() != null
                        && logRecord.getRetryCount() >= maxRetry) {
                    log.info("云仓上报待办跳过:重试次数已达上限,id={},bizRef={},retryCount={},maxRetry={}",
                            logRecord.getId(), logRecord.getBizRef(), logRecord.getRetryCount(), maxRetry);
                    continue;
                }
                if (logRecord.getLastNotifyTime() != null) {
                    long elapsed = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
                    if (elapsed < intervalSeconds) {
//                        log.info("云仓上报待办跳过:未到重试间隔,id={},bizRef={},elapsed={}s,interval={}s",
//                                logRecord.getId(), logRecord.getBizRef(), elapsed, intervalSeconds);
                        continue;
                    }
                }
                processOne(logRecord);
            } catch (Exception e) {
                log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
            }
        }
    }
 
    private void processOne(CloudWmsNotifyLog logRecord) {
        String reportType = logRecord.getReportType();
        String requestBody = logRecord.getRequestBody();
        Date now = new Date();
        int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
        int effectiveMaxRetry = logRecord.getMaxRetryCount();
        log.info("云仓上报开始,id={},bizRef={},reportType={},attempt={},requestBody={}",
                logRecord.getId(), logRecord.getBizRef(), reportType, nextRetry, requestBody);
 
        try {
            if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
                InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
                Map<String, Object> res = cloudWmsReportService.reportInOutResult(param);
                updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
            } else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
                InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
                Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
                updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
            } else {
                log.warn("未知上报类型,id={},reportType={}", logRecord.getId(), reportType);
                return;
            }
        } catch (JsonProcessingException e) {
            log.warn("云仓上报请求体反序列化失败,id={}:{}", logRecord.getId(), e.getMessage());
            setFailResult(logRecord, requestBody, "反序列化失败: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
        } catch (Exception e) {
            log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
            setFailResult(logRecord, requestBody, "请求异常: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
        }
    }
 
    private void updateAfterNotify(CloudWmsNotifyLog logRecord, String requestBody, Map<String, Object> res, int nextRetry, Date now, int effectiveMaxRetry) {
        String responseJson;
        try {
            responseJson = res != null ? objectMapper.writeValueAsString(res) : "null";
        } catch (JsonProcessingException e) {
            responseJson = String.valueOf(res);
        }
        Object codeObj = res != null ? res.get("code") : null;
        Object statusObj = res != null ? res.get("status") : null;
        boolean success = Integer.valueOf(200).equals(codeObj) || Integer.valueOf(200).equals(statusObj);
        int status = success ? cloudWmsNotifyLogService.getNotifyStatusSuccess() : cloudWmsNotifyLogService.getNotifyStatusFail();
        logRecord.setLastRequestBody(truncateForStore(requestBody));
        logRecord.setLastResponseBody(truncateForStore(responseJson));
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(nextRetry);
        logRecord.setNotifyStatus(status);
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
        log.info("云仓上报结束,id={},bizRef={},attempt={},notifyStatus={},responseBody={}",
                logRecord.getId(), logRecord.getBizRef(), nextRetry, status, responseJson);
    }
 
    private void setFailResult(CloudWmsNotifyLog logRecord, String requestBody, String errorMsg, int nextRetry, Date now, int effectiveMaxRetry) {
        logRecord.setLastRequestBody(truncateForStore(requestBody));
        logRecord.setLastResponseBody(truncateForStore(errorMsg));
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(nextRetry);
        // logRecord.setNotifyStatus(nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending());
        int status = !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
                ? cloudWmsNotifyLogService.getNotifyStatusFail()
                : cloudWmsNotifyLogService.getNotifyStatusPending();
        logRecord.setNotifyStatus(status);
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
        log.warn("云仓上报失败,id={},bizRef={},attempt={},notifyStatus={},error={}",
                logRecord.getId(), logRecord.getBizRef(), nextRetry, logRecord.getNotifyStatus(), errorMsg);
    }
 
    /** maxRetry = -1 表示无限重发 */
    private boolean isInfiniteRetry(Integer maxRetry) {
        return maxRetry != null && maxRetry == -1;
    }
 
    private String truncateForStore(String body) {
        int maxChars = resolveStoreBodyMaxChars();
        if (body == null || body.length() <= maxChars) {
            return body;
        }
        return body.substring(0, maxChars);
    }
 
    private int resolveStoreBodyMaxChars() {
        try {
            Config cfg = configService.getCachedOrLoad(GlobalConfigCode.CLOUD_WMS_NOTIFY_STORE_BODY_MAX_CHARS);
            if (cfg == null || cfg.getVal() == null || cfg.getVal().trim().isEmpty()) {
                return STORE_BODY_MAX_CHARS_DEFAULT;
            }
            int parsed = Integer.parseInt(cfg.getVal().trim());
            return parsed > 0 ? parsed : STORE_BODY_MAX_CHARS_DEFAULT;
        } catch (Exception e) {
            return STORE_BODY_MAX_CHARS_DEFAULT;
        }
    }
}