自动化立体仓库 - WMS系统
src/main/java/com/zy/integration/iot/biz/impl/IotInstructionServiceImpl.java
@@ -18,6 +18,7 @@
import com.zy.iot.entity.IotFeedbackMessage;
import com.zy.iot.entity.IotInstructionMessage;
import com.zy.iot.entity.IotPublishRecord;
import com.zy.iot.service.IotDbConfigService;
import com.zy.iot.service.IotPublishRecordService;
import com.zy.iot.util.IotInstructionIdGenerator;
import lombok.extern.slf4j.Slf4j;
@@ -41,10 +42,21 @@
@Service
public class IotInstructionServiceImpl implements IotInstructionService {
    /**
     * Marker value the MQTT pallet-grouping entry point writes to WaitPakin.boxType1 ("aws").
     * When the actual inbound task is generated, this field is synced to WrkDetl.boxType1,
     * so it can serve as a local fallback marker for the origin of an inbound task.
     */
    private static final String MQTT_SOURCE_BOX_TYPE = "aws";
    // ioType values that isMqttOriginWork routes to the stow (inbound) origin check.
    // NOTE(review): the meaning of each individual code is not visible here — confirm against the ioType definitions.
    private static final List<Integer> STOW_WORK_IO_TYPES = Arrays.asList(1, 8, 11, 53, 54, 57);
    // ioType values that isMqttOriginWork routes to the pick (outbound) origin check.
    // NOTE(review): the meaning of each individual code is not visible here — confirm against the ioType definitions.
    private static final List<Integer> PICK_WORK_IO_TYPES = Arrays.asList(101, 108);
    @Autowired
    private IotPublishRecordService iotPublishRecordService;
    @Autowired
    private IotProperties iotProperties;
    @Autowired
    private IotDbConfigService iotDbConfigService;
    @Autowired
    private ExternalTaskFacadeService externalTaskFacadeService;
    @Autowired
@@ -120,7 +132,8 @@
        }
        String remoteDestination = extractFirstDestination(message.getDestinationLocationIds());
        Integer stationId = Cools.isEmpty(remoteDestination) ? null : iotProperties.getPickStationMappings().get(remoteDestination);
//        Integer stationId = Cools.isEmpty(remoteDestination) ? null : iotProperties.getPickStationMappings().get(remoteDestination);
        Integer stationId =602;
        if (stationId == null) {
            return updateAsFailure(record, IotConstants.ERROR_CODE_STATION_MAPPING, "未找到目标出库口映射");
        }
@@ -129,7 +142,8 @@
        param.setPalletId(message.getContainerId());
        param.setOrderId(resolveReferenceId(message));
        param.setStationId(String.valueOf(stationId));
        param.setSeq(1);
        param.setBatchSeq("amazon"+new Date().getTime());
        param.setSeq(0);
        R result = externalTaskFacadeService.createOutboundTask(param, true);
        if (result == null || !Objects.equals(result.get("code"), 200)) {
            return updateAsFailure(record, null, resolveResultMessage(result, "接收拣货指令失败"));
@@ -175,19 +189,47 @@
    }
    /**
     * 判断任务是否来自 MQTT 指令。
     *
     * 出库 MQTT 指令在 handlePickInstruction 中会先落 asr_iot_publish_record(INBOUND/PICK),
     * 再通过统一出库订单执行逻辑生成 WrkMast,因此优先用 wrk_no 精确匹配。
     *
     * 入库 MQTT 指令先落 WaitPakin,WCS 扫码后才生成 WrkMast,入站记录当时还不知道 wrk_no,
     * 所以入库侧优先看 WrkDetl.boxType1=aws,再用托盘号 + instructionId/orderNo 做兜底。
     */
    @Override
    public boolean isMqttOriginWork(WrkMast wrkMast) {
        if (wrkMast == null || wrkMast.getWrkNo() == null || wrkMast.getIoType() == null) {
            return false;
        }
        if (STOW_WORK_IO_TYPES.contains(wrkMast.getIoType())) {
            return isMqttOriginStowWork(wrkMast);
        }
        if (PICK_WORK_IO_TYPES.contains(wrkMast.getIoType())) {
            return isMqttOriginPickWork(wrkMast);
        }
        return false;
    }
    /**
     * 在工作档真正完成后落一条待发布记录。
     * 事务内只写库,不直接发 MQTT,避免业务提交和网络发送耦合。
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void queueWorkCompletion(WrkMast wrkMast) {
        if (!iotProperties.isEnabled() || wrkMast == null || wrkMast.getWrkNo() == null) {
        if (!iotDbConfigService.isMqttEnabled() || wrkMast == null || wrkMast.getWrkNo() == null) {
            return;
        }
        IotPublishRecord existed = iotPublishRecordService.selectOne(new EntityWrapper<IotPublishRecord>()
        // MQTT 完工上报只处理 MQTT 来源的任务;ERP/open 来源的任务由 ERP 完工上报调度处理。
        // 这样同一个完工事件不会同时进入 MQTT 和 ERP 两条上报链路。
        if (!isMqttOriginWork(wrkMast)) {
            return;
        }
        int existedCount = iotPublishRecordService.selectCount(new EntityWrapper<IotPublishRecord>()
                .eq("direction", IotConstants.DIRECTION_OUTBOUND)
                .eq("wrk_no", wrkMast.getWrkNo()));
        if (existed != null) {
        if (existedCount > 0) {
            return;
        }
        IotPublishRecord record = buildOutboundRecord(wrkMast);
@@ -233,7 +275,7 @@
        param.setAnfme(1D);
        param.setFull(1);
        param.setLocId(preferredLocNo);
        param.setBoxType1("aws");
        param.setBoxType1(MQTT_SOURCE_BOX_TYPE);
        return param;
    }
@@ -267,6 +309,118 @@
        return record.getId();
    }
    private boolean isMqttOriginStowWork(WrkMast wrkMast) {
        List<WrkDetl> wrkDetls = loadWrkDetls(wrkMast);
        if (hasMqttSourceBoxType(wrkDetls)) {
            return true;
        }
        if (hasProcessedInboundByWrkNo(wrkMast.getWrkNo(), IotConstants.MESSAGE_TYPE_STOW)) {
            return true;
        }
        String containerId = resolveContainerId(wrkMast, wrkDetls);
        if (Cools.isEmpty(containerId)) {
            return false;
        }
        String bizNo = firstNonEmpty(wrkMast.getUserNo(), resolveOrderNo(wrkDetls));
        return hasProcessedInboundByBusinessKey(IotConstants.MESSAGE_TYPE_STOW, containerId, bizNo);
    }
    private boolean isMqttOriginPickWork(WrkMast wrkMast) {
        if (hasProcessedInboundByWrkNo(wrkMast.getWrkNo(), IotConstants.MESSAGE_TYPE_PICK)) {
            return true;
        }
        List<WrkDetl> wrkDetls = loadWrkDetls(wrkMast);
        String containerId = resolveContainerId(wrkMast, wrkDetls);
        if (Cools.isEmpty(containerId)) {
            return false;
        }
        String orderNo = firstNonEmpty(wrkMast.getUserNo(), resolveOrderNo(wrkDetls));
        return hasProcessedInboundByBusinessKey(IotConstants.MESSAGE_TYPE_PICK, containerId, orderNo);
    }
    private List<WrkDetl> loadWrkDetls(WrkMast wrkMast) {
        if (wrkMast == null || wrkMast.getWrkNo() == null) {
            return Collections.emptyList();
        }
        List<WrkDetl> wrkDetls = wrkDetlService.selectList(new EntityWrapper<WrkDetl>().eq("wrk_no", wrkMast.getWrkNo()));
        return wrkDetls == null ? Collections.<WrkDetl>emptyList() : wrkDetls;
    }
    private boolean hasMqttSourceBoxType(List<WrkDetl> wrkDetls) {
        if (Cools.isEmpty(wrkDetls)) {
            return false;
        }
        for (WrkDetl wrkDetl : wrkDetls) {
            if (wrkDetl != null && MQTT_SOURCE_BOX_TYPE.equalsIgnoreCase(wrkDetl.getBoxType1())) {
                return true;
            }
        }
        return false;
    }
    private boolean hasProcessedInboundByWrkNo(Integer wrkNo, String messageType) {
        if (wrkNo == null || Cools.isEmpty(messageType)) {
            return false;
        }
        return iotPublishRecordService.selectCount(new EntityWrapper<IotPublishRecord>()
                .eq("direction", IotConstants.DIRECTION_INBOUND)
                .eq("message_type", messageType)
                .eq("process_status", IotConstants.PROCESS_STATUS_PROCESSED)
                .eq("wrk_no", wrkNo)) > 0;
    }
    private boolean hasProcessedInboundByBusinessKey(String messageType, String containerId, String bizNo) {
        if (Cools.isEmpty(messageType) || Cools.isEmpty(containerId) || Cools.isEmpty(bizNo)) {
            return false;
        }
        // 出库:order_no/reference_id 通常是订单号;入库:instruction_id 通常会同步到 WrkMast.userNo。
        // 三个字段分别查询,避免 MyBatis-Plus 旧版本复杂 OR 条件在不同数据库方言下生成异常 SQL。
        return hasProcessedInbound(messageType, containerId, "order_no", bizNo)
                || hasProcessedInbound(messageType, containerId, "reference_id", bizNo)
                || hasProcessedInbound(messageType, containerId, "instruction_id", bizNo);
    }
    private boolean hasProcessedInbound(String messageType, String containerId, String column, String value) {
        return iotPublishRecordService.selectCount(new EntityWrapper<IotPublishRecord>()
                .eq("direction", IotConstants.DIRECTION_INBOUND)
                .eq("message_type", messageType)
                .eq("process_status", IotConstants.PROCESS_STATUS_PROCESSED)
                .eq("container_id", containerId)
                .eq(column, value)) > 0;
    }
    private String resolveContainerId(WrkMast wrkMast, List<WrkDetl> wrkDetls) {
        if (wrkMast != null && !Cools.isEmpty(wrkMast.getBarcode())) {
            return wrkMast.getBarcode();
        }
        if (Cools.isEmpty(wrkDetls)) {
            return null;
        }
        for (WrkDetl wrkDetl : wrkDetls) {
            if (wrkDetl != null && !Cools.isEmpty(wrkDetl.getZpallet())) {
                return wrkDetl.getZpallet();
            }
        }
        return null;
    }
    private String resolveOrderNo(List<WrkDetl> wrkDetls) {
        if (Cools.isEmpty(wrkDetls)) {
            return null;
        }
        for (WrkDetl wrkDetl : wrkDetls) {
            if (wrkDetl != null && !Cools.isEmpty(wrkDetl.getOrderNo())) {
                return wrkDetl.getOrderNo();
            }
        }
        return null;
    }
    private String firstNonEmpty(String first, String second) {
        return Cools.isEmpty(first) ? second : first;
    }
    private void applyFeedback(IotPublishRecord record, String status, String errorCode, String errorMessage, Date now) {
        IotFeedbackMessage feedbackMessage = new IotFeedbackMessage();
        feedbackMessage.setInstructionId(record.getInstructionId());
@@ -276,7 +430,7 @@
        feedbackMessage.setCreationTime(System.currentTimeMillis());
        record.setFeedbackStatus(status);
        record.setPublishTopic(iotProperties.getTopics().getIngressFeedback());
        record.setPublishTopic(iotDbConfigService.getEffectiveTopics().getIngressFeedback());
        record.setPublishPayload(JSON.toJSONString(feedbackMessage));
        record.setPublishStatus(IotConstants.PUBLISH_STATUS_PENDING);
        record.setUpdateTime(now);
@@ -340,7 +494,7 @@
            payload.setReferenceId(wrkMast.getUserNo());
        }
        payload.setCreationTime(System.currentTimeMillis());
        return initOutboundRecord(wrkMast, IotConstants.MESSAGE_TYPE_STOW, iotProperties.getTopics().getIngressStow(), payload);
        return initOutboundRecord(wrkMast, IotConstants.MESSAGE_TYPE_STOW, iotDbConfigService.getEffectiveTopics().getIngressStow(), payload);
    }
    private IotPublishRecord buildPickOutboundRecord(WrkMast wrkMast, String containerId) {
@@ -353,7 +507,7 @@
            payload.setReferenceId(wrkMast.getUserNo());
        }
        payload.setCreationTime(System.currentTimeMillis());
        return initOutboundRecord(wrkMast, IotConstants.MESSAGE_TYPE_PICK, iotProperties.getTopics().getIngressPick(), payload);
        return initOutboundRecord(wrkMast, IotConstants.MESSAGE_TYPE_PICK, iotDbConfigService.getEffectiveTopics().getIngressPick(), payload);
    }
    private IotPublishRecord initOutboundRecord(WrkMast wrkMast, String messageType, String publishTopic, IotInstructionMessage payload) {