cl
6 天以前 50393719d85fc30438456b0d0f065573a404fba5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
package com.vincent.rsf.server.manager.service.impl;
 
import com.vincent.rsf.framework.common.R;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.vincent.rsf.server.api.controller.erp.params.InOutResultReportParam;
import com.vincent.rsf.server.common.service.RedisService;
import com.vincent.rsf.server.manager.entity.CloudWmsNotifyLog;
import com.vincent.rsf.server.manager.mapper.CloudWmsNotifyLogMapper;
import com.vincent.rsf.server.manager.service.CloudWmsNotifyLogService;
import com.vincent.rsf.server.system.constant.GlobalConfigCode;
import com.vincent.rsf.server.system.entity.Config;
import com.vincent.rsf.server.system.service.ConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import java.io.IOException;
import java.util.Date;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
 
@Slf4j
@Service
public class CloudWmsNotifyLogServiceImpl extends ServiceImpl<CloudWmsNotifyLogMapper, CloudWmsNotifyLog> implements CloudWmsNotifyLogService {
 
    /** 单条待办「正在上报」Redis 占位秒数(SET NX EX) */
    private static final int CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS = 120;
    /** sending=1 但 Redis 无占位:update_time 早于此时长(分钟)则补偿清零 */
    private static final int STALE_SENDING_RECOVER_AFTER_MINUTES = 2;
 
    @Autowired
    private ConfigService configService;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired(required = false)
    private RedisService redisService;
 
    private static final String CLOUD_WMS_REDIS_FLAG = "cloudwms";
 
    private boolean useSendingRedis() {
        return redisService != null
                && Boolean.TRUE.equals(redisService.initialize)
                && CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS > 0;
    }
 
    private static String sendingRedisSubKey(Long id) {
        return "sending." + id;
    }
 
    /** Redis 占位存在且未过期时视为正在上报 */
    private boolean isSendingHeldInRedis(Long id) {
        if (id == null || !useSendingRedis()) {
            return false;
        }
        try {
            String v = redisService.getValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
            return StringUtils.isNotBlank(v);
        } catch (Exception e) {
            return false;
        }
    }
 
    @Override
    public List<CloudWmsNotifyLog> listPending(int limit, int maxRetry) {
        LambdaQueryWrapper<CloudWmsNotifyLog> wrapper = new LambdaQueryWrapper<CloudWmsNotifyLog>()
                // 仅查询数据库配置状态
                .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
                .apply("(send_hold IS NULL OR send_hold = 0)")
                // 仅查询可重试数据
                .apply("(max_retry_count IS NULL OR max_retry_count = -1 OR retry_count < max_retry_count)")
                // 仅查询已到重试时间的数据
                .apply("(last_notify_time IS NULL OR retry_interval_seconds IS NULL OR retry_interval_seconds <= 0 OR TIMESTAMPDIFF(SECOND, last_notify_time, NOW()) >= retry_interval_seconds)")
                //缺重试参数的不进入待发送列表
                .isNotNull(CloudWmsNotifyLog::getMaxRetryCount)
                .isNotNull(CloudWmsNotifyLog::getRetryIntervalSeconds)
                .orderByAsc(CloudWmsNotifyLog::getLastNotifyTime)
                .orderByAsc(CloudWmsNotifyLog::getId);
        if (maxRetry >= 0) {
            wrapper.lt(CloudWmsNotifyLog::getRetryCount, maxRetry);
        }
        if (limit < 0) {
            return list(wrapper);
        }
        Page<CloudWmsNotifyLog> page = new Page<>(1, Math.max(1, limit));
        return page(page, wrapper).getRecords();
    }
 
    @Override
    public boolean tryClaimSending(Long id) {
        if (id == null) {
            return false;
        }
        boolean useRedis = useSendingRedis();
        if (useRedis) {
            boolean got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
                    CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
            if (!got) {
                CloudWmsNotifyLog cur = getById(id);
                if (cur != null && (cur.getSending() == null || cur.getSending() == 0)) {
                    redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
                    got = redisService.trySetStringNxEx(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id), "1",
                            CLOUD_WMS_NOTIFY_SENDING_REDIS_TTL_SECONDS);
                }
                if (!got) {
                    return false;
                }
            }
        }
        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
        u.eq(CloudWmsNotifyLog::getId, id)
                .set(CloudWmsNotifyLog::getSending, 1)
                .set(CloudWmsNotifyLog::getUpdateTime, new Date());
        if (!useRedis) {
            u.and(w -> w.isNull(CloudWmsNotifyLog::getSending).or().eq(CloudWmsNotifyLog::getSending, 0));
        }
        boolean ok = update(u);
        if (!ok && useRedis) {
            try {
                redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
            } catch (Exception ignored) {
            }
        }
        return ok;
    }
 
    @Override
    public void clearSending(Long id) {
        if (id == null) {
            return;
        }
        if (useSendingRedis()) {
            try {
                redisService.deleteValue(CLOUD_WMS_REDIS_FLAG, sendingRedisSubKey(id));
            } catch (Exception e) {
                log.debug("云仓上报 clearSending Redis id={}", id, e);
            }
        }
        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
        u.eq(CloudWmsNotifyLog::getId, id)
                .set(CloudWmsNotifyLog::getSending, 0)
                .set(CloudWmsNotifyLog::getUpdateTime, new Date());
        update(u);
    }
 
    @Override
    public void recoverStaleSendingWhenRedisMiss() {
        Date threshold = new Date(System.currentTimeMillis() - STALE_SENDING_RECOVER_AFTER_MINUTES * 60_000L);
        List<CloudWmsNotifyLog> rows = list(new LambdaQueryWrapper<CloudWmsNotifyLog>()
                .eq(CloudWmsNotifyLog::getSending, 1)
                .isNotNull(CloudWmsNotifyLog::getUpdateTime)
                .lt(CloudWmsNotifyLog::getUpdateTime, threshold)
                .last("LIMIT 500"));
        if (rows.isEmpty()) {
            return;
        }
        Date now = new Date();
        int cleared = 0;
        for (CloudWmsNotifyLog row : rows) {
            Long id = row.getId();
            if (id == null) {
                continue;
            }
            if (useSendingRedis() && isSendingHeldInRedis(id)) {
                continue;
            }
            LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
            u.eq(CloudWmsNotifyLog::getId, id)
                    .eq(CloudWmsNotifyLog::getSending, 1)
                    .lt(CloudWmsNotifyLog::getUpdateTime, threshold)
                    .set(CloudWmsNotifyLog::getSending, 0)
                    .set(CloudWmsNotifyLog::getUpdateTime, now);
            if (update(u)) {
                cleared++;
            }
        }
        if (cleared > 0) {
            log.info("云仓待办 sending 补偿清零 {} 条", cleared);
        }
    }
 
    @Override
    @Transactional(rollbackFor = Exception.class)
    public R manualFlushToNotifyByOrderCode(String orderCode, boolean inbound) {
        if (StringUtils.isBlank(orderCode)) {
            return R.error("单号不能为空");
        }
        int flag = inbound ? 1 : 0;
        Date now = new Date();
        LambdaUpdateWrapper<CloudWmsNotifyLog> u = new LambdaUpdateWrapper<>();
        u.eq(CloudWmsNotifyLog::getReportType, getReportTypeInOutResult())
                .eq(CloudWmsNotifyLog::getSourceOrderNo, orderCode.trim())
                .eq(CloudWmsNotifyLog::getInboundFlag, flag)
                .eq(CloudWmsNotifyLog::getSendHold, 1)
                .in(CloudWmsNotifyLog::getNotifyStatus, getNotifyStatusPending(), getNotifyStatusFail())
                .set(CloudWmsNotifyLog::getSendHold, 0)
                .set(CloudWmsNotifyLog::getUpdateTime, now);
        int n = getBaseMapper().update(null, u);
        if (n <= 0) {
            return R.error("当前无待放行的入出库待办(请确认 manual 模式、单号与入/出库类型一致)");
        }
        return R.ok("已放行云仓上报待办 " + n + " 条").add(n);
    }
 
    @Override
    public String getReportTypeInOutResult() {
        return getConfigString(GlobalConfigCode.CLOUD_WMS_REPORT_TYPE_IN_OUT_RESULT, CloudWmsNotifyLog.REPORT_TYPE_IN_OUT_RESULT);
    }
 
    @Override
    public String getReportTypeInventoryAdjust() {
        return getConfigString(GlobalConfigCode.CLOUD_WMS_REPORT_TYPE_INVENTORY_ADJUST, CloudWmsNotifyLog.REPORT_TYPE_INVENTORY_ADJUST);
    }
 
    @Override
    public int getNotifyStatusPending() {
        return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_PENDING, CloudWmsNotifyLog.NOTIFY_STATUS_PENDING);
    }
 
    @Override
    public int getNotifyStatusSuccess() {
        return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_SUCCESS, CloudWmsNotifyLog.NOTIFY_STATUS_SUCCESS);
    }
 
    @Override
    public int getNotifyStatusFail() {
        return getConfigInt(GlobalConfigCode.CLOUD_WMS_NOTIFY_STATUS_FAIL, CloudWmsNotifyLog.NOTIFY_STATUS_FAIL);
    }
 
    private String getConfigValTrimmed(String flag) {
        Config c = configService.getCachedOrLoad(flag);
        if (c == null || c.getVal() == null) {
            return null;
        }
        String v = c.getVal().trim();
        return v.isEmpty() ? null : v;
    }
 
    /** 与实体常量搭配:仅当库/缓存无有效 val 时用常量 */
    private String getConfigString(String flag, String defaultVal) {
        String v = getConfigValTrimmed(flag);
        return v != null ? v : defaultVal;
    }
 
    private int getConfigInt(String flag, int defaultVal) {
        Integer n = getConfigIntOrNull(flag);
        return n != null ? n : defaultVal;
    }
 
    private Integer getConfigIntOrNull(String flag) {
        String v = getConfigValTrimmed(flag);
        if (v == null) {
            return null;
        }
        try {
            return Integer.parseInt(v);
        } catch (NumberFormatException e) {
            return null;
        }
    }
 
    @Override
    public void fillFromConfig(CloudWmsNotifyLog log) {
        log.setNotifyStatus(getNotifyStatusPending());
        log.setMaxRetryCount(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_MAX_RETRY));
        log.setRetryIntervalSeconds(getConfigIntOrNull(GlobalConfigCode.CLOUD_WMS_NOTIFY_RETRY_INTERVAL_SECONDS));
    }
 
    @Override
    public String inOutMergeKeyFromRequestBody(String requestBody) {
        return mergeKeyFromBody(requestBody);
    }
 
    @Override
    public List<InOutResultReportParam> parseInOutLinesFromRequestBody(String requestBody) throws IOException {
        return extractInOutLines(requestBody);
    }
 
    private String mergeKeyFromBody(String body) {
        if (StringUtils.isBlank(body)) {
            return null;
        }
        try {
            JsonNode root = objectMapper.readTree(body);
            if (root.has("lines") && root.get("lines").isArray() && root.get("lines").size() > 0) {
                return null;
            }
            JsonNode first = root;
            String orderNo = textNode(first, "orderNo");
            if (StringUtils.isBlank(orderNo)) {
                return null;
            }
            String wh = textNode(first, "wareHouseId");
            boolean inbound = !first.has("inbound") || first.get("inbound").isNull() || first.get("inbound").asBoolean();
            return orderNo + "\t" + inbound + "\t" + StringUtils.defaultString(wh);
        } catch (Exception e) {
            return null;
        }
    }
 
    private static String textNode(JsonNode n, String field) {
        if (n == null || !n.has(field) || n.get(field).isNull()) {
            return null;
        }
        return n.get(field).asText();
    }
 
    private List<InOutResultReportParam> extractInOutLines(String body) throws java.io.IOException {
        JsonNode root = objectMapper.readTree(body);
        if (root.has("lines") && root.get("lines").isArray()) {
            List<InOutResultReportParam> list = new ArrayList<>();
            for (JsonNode n : root.get("lines")) {
                list.add(objectMapper.treeToValue(n, InOutResultReportParam.class));
            }
            return list;
        }
        return Collections.singletonList(objectMapper.readValue(body, InOutResultReportParam.class));
    }
}