package com.zy.integration.iot.biz.impl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.core.common.Cools; import com.core.common.R; import com.zy.asrs.entity.LocMast; import com.zy.asrs.entity.WrkDetl; import com.zy.asrs.entity.WrkMast; import com.zy.asrs.entity.param.MesToCombParam; import com.zy.asrs.entity.param.OutTaskParam; import com.zy.asrs.service.ExternalTaskFacadeService; import com.zy.asrs.service.LocMastService; import com.zy.asrs.service.WrkDetlService; import com.zy.integration.iot.biz.IotInstructionService; import com.zy.iot.config.IotProperties; import com.zy.iot.constant.IotConstants; import com.zy.iot.entity.IotFeedbackMessage; import com.zy.iot.entity.IotInstructionMessage; import com.zy.iot.entity.IotPublishRecord; import com.zy.iot.service.IotPublishRecordService; import com.zy.iot.util.IotInstructionIdGenerator; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; /** * IoT 指令核心编排服务。 * 负责把 XBPS 下发的 MQTT 指令转换成现有 ASRS 入库/出库流程,并在本地记录幂等、乱序和回执状态。 */ @Slf4j @Service public class IotInstructionServiceImpl implements IotInstructionService { @Autowired private IotPublishRecordService iotPublishRecordService; @Autowired private IotProperties iotProperties; @Autowired private ExternalTaskFacadeService externalTaskFacadeService; @Autowired private LocMastService locMastService; @Autowired private WrkDetlService wrkDetlService; /** * 处理 egress/asrs/stow 指令。 * 只接单和预登记,不在 MQTT 回调里直接触发真实入库作业。 */ @Override @Transactional(rollbackFor = Exception.class) public Long handleStowInstruction(IotInstructionMessage message, String topic, String rawPayload) { if (message == null || Cools.isEmpty(message.getInstructionId())) { log.warn("ignore invalid stow message, payload={}", rawPayload); return null; } IotPublishRecord existing = iotPublishRecordService.findByInstructionId(message.getInstructionId()); if (existing != null) { return existing.getId(); } IotPublishRecord record = initInboundRecord(message, topic, rawPayload, IotConstants.MESSAGE_TYPE_STOW); iotPublishRecordService.insert(record); if (Cools.isEmpty(message.getContainerId())) { return updateAsFailure(record, null, "containerId不能为空"); } if (isDelayed(message, IotConstants.MESSAGE_TYPE_STOW)) { return updateAsFailure(record, IotConstants.ERROR_CODE_DELAYED, "延迟消息已忽略"); } String preferredLocNo = resolvePreferredInboundLoc(message); MesToCombParam param = buildInboundParam(message, preferredLocNo); R result = externalTaskFacadeService.acceptInboundNotice(param); if (result == null || !Objects.equals(result.get("code"), 200)) { return updateAsFailure(record, null, resolveResultMessage(result, "接收入库指令失败")); } Date now = new Date(); record.setOrderNo(param.getOrderId()); record.setProcessStatus(IotConstants.PROCESS_STATUS_PROCESSED); record.setErrorCode(null); record.setErrorMessage(null); applyFeedback(record, IotConstants.FEEDBACK_SUCCESS, null, null, now); iotPublishRecordService.updateById(record); return record.getId(); } /** * 处理 egress/asrs/pick 指令。 * 远端出库口会先映射成本地站台号,再复用现有出库建单并自动确认放行。 */ @Override @Transactional(rollbackFor = Exception.class) public Long handlePickInstruction(IotInstructionMessage message, String topic, String rawPayload) { if (message == null || Cools.isEmpty(message.getInstructionId())) { log.warn("ignore invalid pick message, payload={}", rawPayload); return null; } IotPublishRecord existing = iotPublishRecordService.findByInstructionId(message.getInstructionId()); if (existing != null) { return existing.getId(); } IotPublishRecord record = initInboundRecord(message, topic, rawPayload, IotConstants.MESSAGE_TYPE_PICK); iotPublishRecordService.insert(record); if (Cools.isEmpty(message.getContainerId())) { return updateAsFailure(record, null, "containerId不能为空"); } if (isDelayed(message, IotConstants.MESSAGE_TYPE_PICK)) { return updateAsFailure(record, IotConstants.ERROR_CODE_DELAYED, "延迟消息已忽略"); } String remoteDestination = extractFirstDestination(message.getDestinationLocationIds()); Integer stationId = Cools.isEmpty(remoteDestination) ? null : iotProperties.getPickStationMappings().get(remoteDestination); if (stationId == null) { return updateAsFailure(record, IotConstants.ERROR_CODE_STATION_MAPPING, "未找到目标出库口映射"); } OutTaskParam param = new OutTaskParam(); param.setPalletId(message.getContainerId()); param.setOrderId(resolveReferenceId(message)); param.setStationId(String.valueOf(stationId)); param.setSeq(1); R result = externalTaskFacadeService.createOutboundTask(param, true); if (result == null || !Objects.equals(result.get("code"), 200)) { return updateAsFailure(record, null, resolveResultMessage(result, "接收拣货指令失败")); } Date now = new Date(); record.setOrderNo(param.getOrderId()); record.setWrkNo(resolveWrkNo(result)); record.setProcessStatus(IotConstants.PROCESS_STATUS_PROCESSED); record.setErrorCode(null); record.setErrorMessage(null); applyFeedback(record, IotConstants.FEEDBACK_SUCCESS, null, null, now); iotPublishRecordService.updateById(record); return record.getId(); } /** * 处理 XBPS 对 ASRS 动作消息的 feedback 回执。 * 这里只回写 IoT 发布状态,不改库存和工作档。 */ @Override @Transactional(rollbackFor = Exception.class) public void handleFeedbackAck(IotFeedbackMessage feedbackMessage, String topic, String rawPayload) { if (feedbackMessage == null || Cools.isEmpty(feedbackMessage.getInstructionId())) { return; } IotPublishRecord record = iotPublishRecordService.findByInstructionId(feedbackMessage.getInstructionId()); if (record == null || !IotConstants.DIRECTION_OUTBOUND.equals(record.getDirection())) { log.warn("ignore iot ack without outbound record, instructionId={}", feedbackMessage.getInstructionId()); return; } Date now = new Date(); record.setAckPayload(rawPayload); record.setAckTime(now); record.setReceiveTopic(topic); record.setPublishStatus(IotConstants.FEEDBACK_SUCCESS.equalsIgnoreCase(feedbackMessage.getStatus()) ? IotConstants.PUBLISH_STATUS_ACK_SUCCESS : IotConstants.PUBLISH_STATUS_ACK_FAILURE); record.setErrorCode(feedbackMessage.getErrorCode()); record.setErrorMessage(truncate(feedbackMessage.getMessage(), 500)); record.setUpdateTime(now); iotPublishRecordService.updateById(record); } /** * 在工作档真正完成后落一条待发布记录。 * 事务内只写库,不直接发 MQTT,避免业务提交和网络发送耦合。 */ @Override @Transactional(rollbackFor = Exception.class) public void queueWorkCompletion(WrkMast wrkMast) { if (!iotProperties.isEnabled() || wrkMast == null || wrkMast.getWrkNo() == null) { return; } IotPublishRecord existed = iotPublishRecordService.selectOne(new EntityWrapper() .eq("direction", IotConstants.DIRECTION_OUTBOUND) .eq("wrk_no", wrkMast.getWrkNo())); if (existed != null) { return; } IotPublishRecord record = buildOutboundRecord(wrkMast); if (record != null) { iotPublishRecordService.insert(record); } } private IotPublishRecord initInboundRecord(IotInstructionMessage message, String topic, String rawPayload, String messageType) { Date now = new Date(); IotPublishRecord record = new IotPublishRecord(); record.setInstructionId(message.getInstructionId()); record.setDirection(IotConstants.DIRECTION_INBOUND); record.setMessageType(messageType); record.setReceiveTopic(topic); record.setContainerId(message.getContainerId()); record.setReferenceId(message.getReferenceId()); record.setSourceLocationId(message.getSourceLocationId()); record.setDestinationLocationIds(normalizeDestinations(message.getDestinationLocationIds())); record.setMessageCreationTime(resolveCreationTime(message.getCreationTime())); record.setRawPayload(rawPayload); record.setProcessStatus(IotConstants.PROCESS_STATUS_RECEIVED); record.setPublishStatus(IotConstants.PUBLISH_STATUS_NOT_REQUIRED); record.setCreateTime(now); record.setUpdateTime(now); return record; } private boolean isDelayed(IotInstructionMessage message, String messageType) { Long creationTime = resolveCreationTime(message.getCreationTime()); IotPublishRecord latest = iotPublishRecordService.findLatestInboundInstruction(message.getContainerId(), messageType); return latest != null && latest.getMessageCreationTime() != null && latest.getMessageCreationTime() > creationTime; } /** * IoT 文档未提供物料和数量明细,这里沿用现有满托入库通知的兜底约定。 */ private MesToCombParam buildInboundParam(IotInstructionMessage message, String preferredLocNo) { MesToCombParam param = new MesToCombParam(); param.setPalletId(message.getContainerId()); param.setOrderId(""); param.setBizNo(message.getInstructionId()); param.setAnfme(1D); param.setFull(1); param.setLocId(preferredLocNo); param.setBoxType1("aws"); return param; } /** * 仅当指定库位当前仍为空位时才强制优先使用,否则回退到原有自动找位。 */ private String resolvePreferredInboundLoc(IotInstructionMessage message) { String preferredLoc = extractFirstDestination(message.getDestinationLocationIds()); if (Cools.isEmpty(preferredLoc)) { return null; } LocMast locMast = locMastService.selectById(preferredLoc); if (locMast != null && "O".equalsIgnoreCase(locMast.getLocSts())) { return locMast.getLocNo(); } return null; } private String resolveReferenceId(IotInstructionMessage message) { return Cools.isEmpty(message.getReferenceId()) ? message.getInstructionId() : message.getReferenceId(); } private Long updateAsFailure(IotPublishRecord record, String errorCode, String errorMessage) { Date now = new Date(); record.setProcessStatus(IotConstants.PROCESS_STATUS_FAILED); record.setOrderNo(Cools.isEmpty(record.getOrderNo()) ? record.getReferenceId() : record.getOrderNo()); record.setErrorCode(errorCode); record.setErrorMessage(truncate(errorMessage, 500)); applyFeedback(record, IotConstants.FEEDBACK_FAILURE, errorCode, errorMessage, now); iotPublishRecordService.updateById(record); return record.getId(); } private void applyFeedback(IotPublishRecord record, String status, String errorCode, String errorMessage, Date now) { IotFeedbackMessage feedbackMessage = new IotFeedbackMessage(); feedbackMessage.setInstructionId(record.getInstructionId()); feedbackMessage.setStatus(status); feedbackMessage.setErrorCode(errorCode); feedbackMessage.setMessage(errorMessage); feedbackMessage.setCreationTime(System.currentTimeMillis()); record.setFeedbackStatus(status); record.setPublishTopic(iotProperties.getTopics().getIngressFeedback()); record.setPublishPayload(JSON.toJSONString(feedbackMessage)); record.setPublishStatus(IotConstants.PUBLISH_STATUS_PENDING); record.setUpdateTime(now); } private String resolveResultMessage(R result, String defaultMessage) { if (result == null) { return defaultMessage; } Object msg = result.get("msg"); return msg == null ? defaultMessage : String.valueOf(msg); } private Integer resolveWrkNo(R result) { if (result == null) { return null; } Object wrkNo = result.get("wrkNo"); if (wrkNo instanceof Number) { return ((Number) wrkNo).intValue(); } if (wrkNo instanceof String && !Cools.isEmpty(String.valueOf(wrkNo))) { return Integer.valueOf(String.valueOf(wrkNo)); } return null; } private IotPublishRecord buildOutboundRecord(WrkMast wrkMast) { Integer ioType = wrkMast.getIoType(); if (ioType == null) { return null; } String containerId = resolveContainerId(wrkMast); if (Cools.isEmpty(containerId)) { return null; } if (Arrays.asList(1, 8, 11, 53, 54, 57).contains(ioType)) { return buildStowOutboundRecord(wrkMast, containerId); } if (Arrays.asList(101, 108).contains(ioType)) { return buildPickOutboundRecord(wrkMast, containerId); } return null; } private IotPublishRecord buildStowOutboundRecord(WrkMast wrkMast, String containerId) { List destinationLocationIds = wrkMast.getLocNo() == null ? Collections.emptyList() : Collections.singletonList(wrkMast.getLocNo()); if (destinationLocationIds.isEmpty()) { return null; } IotInstructionMessage payload = new IotInstructionMessage(); payload.setInstructionId(IotInstructionIdGenerator.generate(String.valueOf(wrkMast.getWrkNo()))); payload.setContainerId(containerId); if (Objects.equals(wrkMast.getIoType(), 11)) { payload.setSourceLocationId(wrkMast.getSourceLocNo()); } payload.setDestinationLocationIds(destinationLocationIds); if (!Cools.isEmpty(wrkMast.getUserNo())) { payload.setReferenceId(wrkMast.getUserNo()); } payload.setCreationTime(System.currentTimeMillis()); return initOutboundRecord(wrkMast, IotConstants.MESSAGE_TYPE_STOW, iotProperties.getTopics().getIngressStow(), payload); } private IotPublishRecord buildPickOutboundRecord(WrkMast wrkMast, String containerId) { IotInstructionMessage payload = new IotInstructionMessage(); payload.setInstructionId(IotInstructionIdGenerator.generate(String.valueOf(wrkMast.getWrkNo()))); payload.setContainerId(containerId); payload.setSourceLocationId(wrkMast.getSourceLocNo()); payload.setDestinationLocationIds(Collections.singletonList(resolveRemoteStationId(wrkMast.getStaNo()))); if (!Cools.isEmpty(wrkMast.getUserNo())) { payload.setReferenceId(wrkMast.getUserNo()); } payload.setCreationTime(System.currentTimeMillis()); return initOutboundRecord(wrkMast, IotConstants.MESSAGE_TYPE_PICK, iotProperties.getTopics().getIngressPick(), payload); } private IotPublishRecord initOutboundRecord(WrkMast wrkMast, String messageType, String publishTopic, IotInstructionMessage payload) { Date now = new Date(); IotPublishRecord record = new IotPublishRecord(); record.setInstructionId(payload.getInstructionId()); record.setDirection(IotConstants.DIRECTION_OUTBOUND); record.setMessageType(messageType); record.setPublishTopic(publishTopic); record.setContainerId(payload.getContainerId()); record.setReferenceId(payload.getReferenceId()); record.setSourceLocationId(payload.getSourceLocationId()); record.setDestinationLocationIds(normalizeDestinations(payload.getDestinationLocationIds())); record.setMessageCreationTime(payload.getCreationTime()); record.setPublishPayload(JSON.toJSONString(payload)); record.setRawPayload(record.getPublishPayload()); record.setProcessStatus(IotConstants.PROCESS_STATUS_PENDING); record.setPublishStatus(IotConstants.PUBLISH_STATUS_PENDING); record.setWrkNo(wrkMast.getWrkNo()); record.setOrderNo(wrkMast.getUserNo()); record.setCreateTime(now); record.setUpdateTime(now); return record; } private String resolveContainerId(WrkMast wrkMast) { if (!Cools.isEmpty(wrkMast.getBarcode())) { return wrkMast.getBarcode(); } List wrkDetls = wrkDetlService.selectList(new EntityWrapper().eq("wrk_no", wrkMast.getWrkNo())); if (Cools.isEmpty(wrkDetls)) { return null; } for (WrkDetl wrkDetl : wrkDetls) { if (!Cools.isEmpty(wrkDetl.getZpallet())) { return wrkDetl.getZpallet(); } } return null; } private String resolveRemoteStationId(Integer staNo) { if (staNo == null) { return null; } for (Map.Entry entry : iotProperties.getPickStationMappings().entrySet()) { if (Objects.equals(entry.getValue(), staNo)) { return entry.getKey(); } } return String.valueOf(staNo); } private String extractFirstDestination(List destinationLocationIds) { if (Cools.isEmpty(destinationLocationIds)) { return null; } for (String destinationLocationId : destinationLocationIds) { if (!Cools.isEmpty(destinationLocationId)) { return destinationLocationId; } } return null; } private Long resolveCreationTime(Long creationTime) { return creationTime == null ? System.currentTimeMillis() : creationTime; } private String normalizeDestinations(List destinationLocationIds) { if (Cools.isEmpty(destinationLocationIds)) { return null; } return JSON.toJSONString(new ArrayList(destinationLocationIds)); } private String truncate(String message, int maxLength) { if (message == null || message.length() <= maxLength) { return message; } return message.substring(0, maxLength); } }