cl
1 天以前 b21ca070526ec10fbea98e29135751776dc31059
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.vincent.rsf.server.manager.schedules;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
import com.vincent.rsf.server.api.controller.erp.params.InventoryAdjustReportParam;
import com.vincent.rsf.server.api.service.CloudWmsReportService;
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import java.util.Date;
import java.util.List;
import java.util.Map;
 
/** 云仓上报定时任务 */
@Slf4j
@Component
public class CloudWmsNotifySchedule {
 
    private static final int BATCH_LIMIT = 50;
 
    @Autowired
    private CloudWmsNotifyLogService cloudWmsNotifyLogService;
    @Autowired
    private CloudWmsReportService cloudWmsReportService;
    @Autowired
    private ObjectMapper objectMapper;
 
    @Scheduled(cron = "0/30 * * * * ?")
    public void syncCloudWmsNotify() {
        // List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, 999);
        List<CloudWmsNotifyLog> pending = cloudWmsNotifyLogService.listPending(BATCH_LIMIT, -1);
        if (pending.isEmpty()) {
            return;
        }
        long nowMs = System.currentTimeMillis();
        for (CloudWmsNotifyLog logRecord : pending) {
            try {
                Integer maxRetry = logRecord.getMaxRetryCount();
                Integer intervalSeconds = logRecord.getRetryIntervalSeconds();
                if (maxRetry == null || intervalSeconds == null || intervalSeconds <= 0) {
                    continue;
                }
                // if (logRecord.getRetryCount() != null && logRecord.getRetryCount() >= maxRetry) {
                if (!isInfiniteRetry(maxRetry)
                        && logRecord.getRetryCount() != null
                        && logRecord.getRetryCount() >= maxRetry) {
                    continue;
                }
                if (logRecord.getLastNotifyTime() != null) {
                    long elapsed = (nowMs - logRecord.getLastNotifyTime().getTime()) / 1000;
                    if (elapsed < intervalSeconds) {
                        continue;
                    }
                }
                processOne(logRecord);
            } catch (Exception e) {
                log.warn("云仓上报定时任务处理单条异常,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
            }
        }
    }
 
    private void processOne(CloudWmsNotifyLog logRecord) {
        String reportType = logRecord.getReportType();
        String requestBody = logRecord.getRequestBody();
        Date now = new Date();
        int nextRetry = (logRecord.getRetryCount() == null ? 0 : logRecord.getRetryCount()) + 1;
        int effectiveMaxRetry = logRecord.getMaxRetryCount();
 
        try {
            if (cloudWmsNotifyLogService.getReportTypeInOutResult().equals(reportType)) {
                InOutResultReportParam param = objectMapper.readValue(requestBody, InOutResultReportParam.class);
                Map<String, Object> res = cloudWmsReportService.reportInOutResult(param);
                updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
            } else if (cloudWmsNotifyLogService.getReportTypeInventoryAdjust().equals(reportType)) {
                InventoryAdjustReportParam param = objectMapper.readValue(requestBody, InventoryAdjustReportParam.class);
                Map<String, Object> res = cloudWmsReportService.reportInventoryAdjust(param);
                updateAfterNotify(logRecord, requestBody, res, nextRetry, now, effectiveMaxRetry);
            } else {
                log.warn("未知上报类型,id={},reportType={}", logRecord.getId(), reportType);
                return;
            }
        } catch (JsonProcessingException e) {
            log.warn("云仓上报请求体反序列化失败,id={}:{}", logRecord.getId(), e.getMessage());
            setFailResult(logRecord, requestBody, "反序列化失败: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
        } catch (Exception e) {
            log.warn("云仓上报请求失败,id={},bizRef={}:{}", logRecord.getId(), logRecord.getBizRef(), e.getMessage());
            setFailResult(logRecord, requestBody, "请求异常: " + e.getMessage(), nextRetry, now, effectiveMaxRetry);
        }
    }
 
    private void updateAfterNotify(CloudWmsNotifyLog logRecord, String requestBody, Map<String, Object> res, int nextRetry, Date now, int effectiveMaxRetry) {
        String responseJson;
        try {
            responseJson = res != null ? objectMapper.writeValueAsString(res) : "null";
        } catch (JsonProcessingException e) {
            responseJson = String.valueOf(res);
        }
        Object codeObj = res != null ? res.get("code") : null;
        boolean success = Integer.valueOf(200).equals(codeObj);
        int status = success ? cloudWmsNotifyLogService.getNotifyStatusSuccess() : cloudWmsNotifyLogService.getNotifyStatusPending();
        if (!success && !isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry) {
            status = cloudWmsNotifyLogService.getNotifyStatusFail();
        }
        logRecord.setLastRequestBody(requestBody);
        logRecord.setLastResponseBody(responseJson);
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(nextRetry);
        logRecord.setNotifyStatus(status);
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
    }
 
    private void setFailResult(CloudWmsNotifyLog logRecord, String requestBody, String errorMsg, int nextRetry, Date now, int effectiveMaxRetry) {
        logRecord.setLastRequestBody(requestBody);
        logRecord.setLastResponseBody(errorMsg);
        logRecord.setLastNotifyTime(now);
        logRecord.setRetryCount(nextRetry);
        // logRecord.setNotifyStatus(nextRetry >= effectiveMaxRetry ? cloudWmsNotifyLogService.getNotifyStatusFail() : cloudWmsNotifyLogService.getNotifyStatusPending());
        logRecord.setNotifyStatus(!isInfiniteRetry(effectiveMaxRetry) && nextRetry >= effectiveMaxRetry
                ? cloudWmsNotifyLogService.getNotifyStatusFail()
                : cloudWmsNotifyLogService.getNotifyStatusPending());
        logRecord.setUpdateTime(now);
        cloudWmsNotifyLogService.updateById(logRecord);
    }
 
    /** maxRetry = -1 表示无限重发 */
    private boolean isInfiniteRetry(Integer maxRetry) {
        return maxRetry != null && maxRetry == -1;
    }
}