#
Junjie
12 小时以前 83af5944a32527fd8aa83537dd840d428af7f577
src/main/java/com/zy/core/utils/WmsOperateUtils.java
@@ -2,7 +2,7 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.core.common.Cools;
import com.core.exception.CoolException;
import com.zy.asrs.entity.BasCrnp;
@@ -32,10 +32,36 @@
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Component
public class WmsOperateUtils {
    private static final int APPLY_IN_TASK_ASYNC_THREADS = Math.max(2,
            Math.min(4, Runtime.getRuntime().availableProcessors()));
    private static final int APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY = 200;
    private static final int APPLY_IN_TASK_REQUEST_TTL_SECONDS = 10 * 60;
    private static final int APPLY_IN_TASK_RESPONSE_TTL_SECONDS = 60;
    private static final AtomicInteger APPLY_IN_TASK_THREAD_NO = new AtomicInteger(1);
    private static final ThreadPoolExecutor APPLY_IN_TASK_EXECUTOR = new ThreadPoolExecutor(
            APPLY_IN_TASK_ASYNC_THREADS,
            APPLY_IN_TASK_ASYNC_THREADS,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(APPLY_IN_TASK_ASYNC_QUEUE_CAPACITY),
            runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName("WmsApplyInTask-" + APPLY_IN_TASK_THREAD_NO.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
    );
    @Autowired
    private ConfigService configService;
@@ -54,8 +80,10 @@
    @Autowired
    private RedisUtil redisUtil;
    private final ConcurrentMap<String, Boolean> asyncInTaskInflight = new ConcurrentHashMap<>();
    // 申请入库任务
    public synchronized String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
    public String applyInTask(String barcode, Integer sourceStaNo, Integer locType1) {
        Object systemConfigMapObj = redisUtil.get(RedisKeyType.SYSTEM_CONFIG_MAP.key);
        if (systemConfigMapObj == null) {
            News.error("系统Config缓存失效");
@@ -80,7 +108,7 @@
        int result = 0;
        try {
            BasStation basStation = basStationService
                    .selectOne(new EntityWrapper<BasStation>().eq("station_id", sourceStaNo));
                    .getOne(new QueryWrapper<BasStation>().eq("station_id", sourceStaNo));
            if (basStation == null) {
                News.error("站点{}不存在", sourceStaNo);
                return null;
@@ -103,7 +131,7 @@
                    .setTimeout(30, TimeUnit.SECONDS)
                    .build()
                    .doPost();
            if (response != null) {
            if (!Cools.isEmpty(response)) {
                JSONObject jsonObject = JSON.parseObject(response);
                if (jsonObject.getInteger("code") == 200) {
                    result = 1;
@@ -127,7 +155,7 @@
            httpRequestLog.setResponse(response);
            httpRequestLog.setCreateTime(new Date());
            httpRequestLog.setResult(result);
            httpRequestLogService.insert(httpRequestLog);
            httpRequestLogService.save(httpRequestLog);
        }
        return response;
    }
@@ -144,36 +172,45 @@
        String requestKey = RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key + barcode + "_" + sourceStaNo;
        String responseKey = RedisKeyType.ASYNC_WMS_IN_TASK_RESPONSE.key + barcode + "_" + sourceStaNo;
        if (asyncInTaskInflight.putIfAbsent(requestKey, Boolean.TRUE) != null) {
            return;
        }
        // 检查是否已有请求在进行中
        Object existingRequest = redisUtil.get(requestKey);
        if (existingRequest != null) {
            asyncInTaskInflight.remove(requestKey);
            return; // 已有请求在进行中,跳过
        }
        // 标记请求进行中,设置60秒超时
        redisUtil.set(requestKey, "processing", 60);
        // 标记请求进行中,避免请求在线程池排队时被重复提交
        redisUtil.set(requestKey, "processing", APPLY_IN_TASK_REQUEST_TTL_SECONDS);
        // 提交异步任务
        new Thread(() -> {
            try {
                String response = applyInTask(barcode, sourceStaNo, locType1);
                if (response != null) {
                    // 存储响应结果,设置60秒超时
                    redisUtil.set(responseKey, response, 60);
                    News.info("异步WMS入库请求完成,barcode={},stationId={},response={}", barcode, sourceStaNo, response);
                } else {
                    // 请求失败,存储失败标记
                    redisUtil.set(responseKey, "FAILED", 10);
                    News.error("异步WMS入库请求失败,barcode={},stationId={}", barcode, sourceStaNo);
        try {
            APPLY_IN_TASK_EXECUTOR.execute(() -> {
                try {
                    String response = applyInTask(barcode, sourceStaNo, locType1);
                    if (response != null) {
                        redisUtil.set(responseKey, response, APPLY_IN_TASK_RESPONSE_TTL_SECONDS);
                        News.info("异步WMS入库请求完成,barcode={},stationId={},response={}", barcode, sourceStaNo, response);
                    } else {
                        redisUtil.set(responseKey, "FAILED", 10);
                        News.error("异步WMS入库请求失败,barcode={},stationId={}", barcode, sourceStaNo);
                    }
                } catch (Exception e) {
                    News.error("异步WMS入库请求异常,barcode={},stationId={},error={}", barcode, sourceStaNo, e.getMessage());
                    redisUtil.set(responseKey, "ERROR:" + e.getMessage(), 10);
                } finally {
                    asyncInTaskInflight.remove(requestKey);
                    redisUtil.del(requestKey);
                }
            } catch (Exception e) {
                News.error("异步WMS入库请求异常,barcode={},stationId={},error={}", barcode, sourceStaNo, e.getMessage());
                redisUtil.set(responseKey, "ERROR:" + e.getMessage(), 10);
            } finally {
                // 清除请求进行中标记
                redisUtil.del(requestKey);
            }
        }).start();
            });
        } catch (RejectedExecutionException e) {
            asyncInTaskInflight.remove(requestKey);
            redisUtil.del(requestKey);
            redisUtil.set(responseKey, "ERROR:ASYNC_QUEUE_FULL", 10);
            News.error("异步WMS入库请求被拒绝,线程池已满,barcode={},stationId={}", barcode, sourceStaNo);
        }
    }
    /**
@@ -203,13 +240,13 @@
     */
    public boolean isAsyncRequestInProgress(String barcode, Integer stationId) {
        String requestKey = RedisKeyType.ASYNC_WMS_IN_TASK_REQUEST.key + barcode + "_" + stationId;
        return redisUtil.get(requestKey) != null;
        return asyncInTaskInflight.containsKey(requestKey) || redisUtil.get(requestKey) != null;
    }
    // 申请任务重新分配库位
    public synchronized String applyReassignTaskLocNo(Integer taskNo, Integer stationId) {
        String wmsUrl = null;
        Config wmsSystemUriConfig = configService.selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemUri"));
        Config wmsSystemUriConfig = configService.getOne(new QueryWrapper<Config>().eq("code", "wmsSystemUri"));
        if (wmsSystemUriConfig != null) {
            wmsUrl = wmsSystemUriConfig.getValue();
        }
@@ -221,7 +258,7 @@
        String wmsSystemReassignInTaskUrl = null;
        Config wmsSystemReassignInTaskUrlConfig = configService
                .selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemReassignInTaskUrl"));
                .getOne(new QueryWrapper<Config>().eq("code", "wmsSystemReassignInTaskUrl"));
        if (wmsSystemReassignInTaskUrlConfig != null) {
            wmsSystemReassignInTaskUrl = wmsSystemReassignInTaskUrlConfig.getValue();
        }
@@ -260,7 +297,7 @@
                    .setTimeout(30, TimeUnit.SECONDS)
                    .build()
                    .doPost();
            if (response != null) {
            if (!Cools.isEmpty(response)) {
                JSONObject jsonObject = JSON.parseObject(response);
                if (jsonObject.getInteger("code") == 200) {
                    result = 1;
@@ -284,7 +321,7 @@
            httpRequestLog.setResponse(response);
            httpRequestLog.setCreateTime(new Date());
            httpRequestLog.setResult(result);
            httpRequestLogService.insert(httpRequestLog);
            httpRequestLogService.save(httpRequestLog);
        }
        return response;
    }
@@ -292,7 +329,7 @@
    // 申请在库库位更换库位
    public synchronized String applyChangeLocNo(String locNo) {
        String wmsUrl = null;
        Config wmsSystemUriConfig = configService.selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemUri"));
        Config wmsSystemUriConfig = configService.getOne(new QueryWrapper<Config>().eq("code", "wmsSystemUri"));
        if (wmsSystemUriConfig != null) {
            wmsUrl = wmsSystemUriConfig.getValue();
        }
@@ -304,7 +341,7 @@
        String wmsSystemChangeLocNoUrl = null;
        Config wmsSystemChangeLocNoUrlConfig = configService
                .selectOne(new EntityWrapper<Config>().eq("code", "wmsSystemChangeLocNoUrl"));
                .getOne(new QueryWrapper<Config>().eq("code", "wmsSystemChangeLocNoUrl"));
        if (wmsSystemChangeLocNoUrlConfig != null) {
            wmsSystemChangeLocNoUrl = wmsSystemChangeLocNoUrlConfig.getValue();
        }
@@ -322,7 +359,7 @@
        List<Integer> crnRows = new ArrayList<>();
        if (findCrnNoResult.getCrnType().equals(SlaveType.Crn)) {
            BasCrnp basCrnp = basCrnpService.selectOne(new EntityWrapper<BasCrnp>().eq("crn_no", crnNo));
            BasCrnp basCrnp = basCrnpService.getOne(new QueryWrapper<BasCrnp>().eq("crn_no", crnNo));
            if (basCrnp == null) {
                return null;
            }
@@ -332,7 +369,7 @@
            }
        } else if (findCrnNoResult.getCrnType().equals(SlaveType.DualCrn)) {
            BasDualCrnp basDualCrnp = basDualCrnpService
                    .selectOne(new EntityWrapper<BasDualCrnp>().eq("crn_no", crnNo));
                    .getOne(new QueryWrapper<BasDualCrnp>().eq("crn_no", crnNo));
            if (basDualCrnp == null) {
                return null;
            }
@@ -359,7 +396,7 @@
                    .build()
                    .doPost();
            if (response != null) {
            if (!Cools.isEmpty(response)) {
                JSONObject jsonObject = JSON.parseObject(response);
                if (jsonObject.getInteger("code") == 200) {
                    result = 1;
@@ -383,7 +420,7 @@
            httpRequestLog.setResponse(response);
            httpRequestLog.setCreateTime(new Date());
            httpRequestLog.setResult(result);
            httpRequestLogService.insert(httpRequestLog);
            httpRequestLogService.save(httpRequestLog);
        }
        return response;
    }