package com.zy.integration.iot.biz.impl;
|
|
import com.alibaba.fastjson.JSON;
|
import com.baomidou.mybatisplus.mapper.EntityWrapper;
|
import com.core.common.Cools;
|
import com.core.common.R;
|
import com.zy.asrs.entity.LocMast;
|
import com.zy.asrs.entity.WrkDetl;
|
import com.zy.asrs.entity.WrkMast;
|
import com.zy.asrs.entity.param.MesToCombParam;
|
import com.zy.asrs.entity.param.OutTaskParam;
|
import com.zy.asrs.service.ExternalTaskFacadeService;
|
import com.zy.asrs.service.LocMastService;
|
import com.zy.asrs.service.WrkDetlService;
|
import com.zy.integration.iot.biz.IotInstructionService;
|
import com.zy.iot.config.IotProperties;
|
import com.zy.iot.constant.IotConstants;
|
import com.zy.iot.entity.IotFeedbackMessage;
|
import com.zy.iot.entity.IotInstructionMessage;
|
import com.zy.iot.entity.IotPublishRecord;
|
import com.zy.iot.service.IotPublishRecordService;
|
import com.zy.iot.util.IotInstructionIdGenerator;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
import org.springframework.transaction.annotation.Transactional;
|
|
import java.util.ArrayList;
|
import java.util.Arrays;
|
import java.util.Collections;
|
import java.util.Date;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Objects;
|
|
/**
|
* IoT 指令核心编排服务。
|
* 负责把 XBPS 下发的 MQTT 指令转换成现有 ASRS 入库/出库流程,并在本地记录幂等、乱序和回执状态。
|
*/
|
@Slf4j
|
@Service
|
public class IotInstructionServiceImpl implements IotInstructionService {
|
|
@Autowired
|
private IotPublishRecordService iotPublishRecordService;
|
@Autowired
|
private IotProperties iotProperties;
|
@Autowired
|
private ExternalTaskFacadeService externalTaskFacadeService;
|
@Autowired
|
private LocMastService locMastService;
|
@Autowired
|
private WrkDetlService wrkDetlService;
|
|
/**
|
* 处理 egress/asrs/stow 指令。
|
* 只接单和预登记,不在 MQTT 回调里直接触发真实入库作业。
|
*/
|
@Override
|
@Transactional(rollbackFor = Exception.class)
|
public Long handleStowInstruction(IotInstructionMessage message, String topic, String rawPayload) {
|
if (message == null || Cools.isEmpty(message.getInstructionId())) {
|
log.warn("ignore invalid stow message, payload={}", rawPayload);
|
return null;
|
}
|
IotPublishRecord existing = iotPublishRecordService.findByInstructionId(message.getInstructionId());
|
if (existing != null) {
|
return existing.getId();
|
}
|
|
IotPublishRecord record = initInboundRecord(message, topic, rawPayload, IotConstants.MESSAGE_TYPE_STOW);
|
iotPublishRecordService.insert(record);
|
if (Cools.isEmpty(message.getContainerId())) {
|
return updateAsFailure(record, null, "containerId不能为空");
|
}
|
if (isDelayed(message, IotConstants.MESSAGE_TYPE_STOW)) {
|
return updateAsFailure(record, IotConstants.ERROR_CODE_DELAYED, "延迟消息已忽略");
|
}
|
|
String preferredLocNo = resolvePreferredInboundLoc(message);
|
MesToCombParam param = buildInboundParam(message, preferredLocNo);
|
R result = externalTaskFacadeService.acceptInboundNotice(param);
|
if (result == null || !Objects.equals(result.get("code"), 200)) {
|
return updateAsFailure(record, null, resolveResultMessage(result, "接收入库指令失败"));
|
}
|
|
Date now = new Date();
|
record.setOrderNo(param.getOrderId());
|
record.setProcessStatus(IotConstants.PROCESS_STATUS_PROCESSED);
|
record.setErrorCode(null);
|
record.setErrorMessage(null);
|
applyFeedback(record, IotConstants.FEEDBACK_SUCCESS, null, null, now);
|
iotPublishRecordService.updateById(record);
|
return record.getId();
|
}
|
|
/**
|
* 处理 egress/asrs/pick 指令。
|
* 远端出库口会先映射成本地站台号,再复用现有出库建单并自动确认放行。
|
*/
|
@Override
|
@Transactional(rollbackFor = Exception.class)
|
public Long handlePickInstruction(IotInstructionMessage message, String topic, String rawPayload) {
|
if (message == null || Cools.isEmpty(message.getInstructionId())) {
|
log.warn("ignore invalid pick message, payload={}", rawPayload);
|
return null;
|
}
|
IotPublishRecord existing = iotPublishRecordService.findByInstructionId(message.getInstructionId());
|
if (existing != null) {
|
return existing.getId();
|
}
|
|
IotPublishRecord record = initInboundRecord(message, topic, rawPayload, IotConstants.MESSAGE_TYPE_PICK);
|
iotPublishRecordService.insert(record);
|
if (Cools.isEmpty(message.getContainerId())) {
|
return updateAsFailure(record, null, "containerId不能为空");
|
}
|
if (isDelayed(message, IotConstants.MESSAGE_TYPE_PICK)) {
|
return updateAsFailure(record, IotConstants.ERROR_CODE_DELAYED, "延迟消息已忽略");
|
}
|
|
String remoteDestination = extractFirstDestination(message.getDestinationLocationIds());
|
Integer stationId = Cools.isEmpty(remoteDestination) ? null : iotProperties.getPickStationMappings().get(remoteDestination);
|
if (stationId == null) {
|
return updateAsFailure(record, IotConstants.ERROR_CODE_STATION_MAPPING, "未找到目标出库口映射");
|
}
|
|
OutTaskParam param = new OutTaskParam();
|
param.setPalletId(message.getContainerId());
|
param.setOrderId(resolveReferenceId(message));
|
param.setStationId(String.valueOf(stationId));
|
param.setSeq(1);
|
R result = externalTaskFacadeService.createOutboundTask(param, true);
|
if (result == null || !Objects.equals(result.get("code"), 200)) {
|
return updateAsFailure(record, null, resolveResultMessage(result, "接收拣货指令失败"));
|
}
|
|
Date now = new Date();
|
record.setOrderNo(param.getOrderId());
|
record.setWrkNo(resolveWrkNo(result));
|
record.setProcessStatus(IotConstants.PROCESS_STATUS_PROCESSED);
|
record.setErrorCode(null);
|
record.setErrorMessage(null);
|
applyFeedback(record, IotConstants.FEEDBACK_SUCCESS, null, null, now);
|
iotPublishRecordService.updateById(record);
|
return record.getId();
|
}
|
|
/**
|
* 处理 XBPS 对 ASRS 动作消息的 feedback 回执。
|
* 这里只回写 IoT 发布状态,不改库存和工作档。
|
*/
|
@Override
|
@Transactional(rollbackFor = Exception.class)
|
public void handleFeedbackAck(IotFeedbackMessage feedbackMessage, String topic, String rawPayload) {
|
if (feedbackMessage == null || Cools.isEmpty(feedbackMessage.getInstructionId())) {
|
return;
|
}
|
IotPublishRecord record = iotPublishRecordService.findByInstructionId(feedbackMessage.getInstructionId());
|
if (record == null || !IotConstants.DIRECTION_OUTBOUND.equals(record.getDirection())) {
|
log.warn("ignore iot ack without outbound record, instructionId={}", feedbackMessage.getInstructionId());
|
return;
|
}
|
Date now = new Date();
|
record.setAckPayload(rawPayload);
|
record.setAckTime(now);
|
record.setReceiveTopic(topic);
|
record.setPublishStatus(IotConstants.FEEDBACK_SUCCESS.equalsIgnoreCase(feedbackMessage.getStatus())
|
? IotConstants.PUBLISH_STATUS_ACK_SUCCESS
|
: IotConstants.PUBLISH_STATUS_ACK_FAILURE);
|
record.setErrorCode(feedbackMessage.getErrorCode());
|
record.setErrorMessage(truncate(feedbackMessage.getMessage(), 500));
|
record.setUpdateTime(now);
|
iotPublishRecordService.updateById(record);
|
}
|
|
/**
|
* 在工作档真正完成后落一条待发布记录。
|
* 事务内只写库,不直接发 MQTT,避免业务提交和网络发送耦合。
|
*/
|
@Override
|
@Transactional(rollbackFor = Exception.class)
|
public void queueWorkCompletion(WrkMast wrkMast) {
|
if (!iotProperties.isEnabled() || wrkMast == null || wrkMast.getWrkNo() == null) {
|
return;
|
}
|
IotPublishRecord existed = iotPublishRecordService.selectOne(new EntityWrapper<IotPublishRecord>()
|
.eq("direction", IotConstants.DIRECTION_OUTBOUND)
|
.eq("wrk_no", wrkMast.getWrkNo()));
|
if (existed != null) {
|
return;
|
}
|
IotPublishRecord record = buildOutboundRecord(wrkMast);
|
if (record != null) {
|
iotPublishRecordService.insert(record);
|
}
|
}
|
|
private IotPublishRecord initInboundRecord(IotInstructionMessage message, String topic, String rawPayload, String messageType) {
|
Date now = new Date();
|
IotPublishRecord record = new IotPublishRecord();
|
record.setInstructionId(message.getInstructionId());
|
record.setDirection(IotConstants.DIRECTION_INBOUND);
|
record.setMessageType(messageType);
|
record.setReceiveTopic(topic);
|
record.setContainerId(message.getContainerId());
|
record.setReferenceId(message.getReferenceId());
|
record.setSourceLocationId(message.getSourceLocationId());
|
record.setDestinationLocationIds(normalizeDestinations(message.getDestinationLocationIds()));
|
record.setMessageCreationTime(resolveCreationTime(message.getCreationTime()));
|
record.setRawPayload(rawPayload);
|
record.setProcessStatus(IotConstants.PROCESS_STATUS_RECEIVED);
|
record.setPublishStatus(IotConstants.PUBLISH_STATUS_NOT_REQUIRED);
|
record.setCreateTime(now);
|
record.setUpdateTime(now);
|
return record;
|
}
|
|
private boolean isDelayed(IotInstructionMessage message, String messageType) {
|
Long creationTime = resolveCreationTime(message.getCreationTime());
|
IotPublishRecord latest = iotPublishRecordService.findLatestInboundInstruction(message.getContainerId(), messageType);
|
return latest != null && latest.getMessageCreationTime() != null && latest.getMessageCreationTime() > creationTime;
|
}
|
|
/**
|
* IoT 文档未提供物料和数量明细,这里沿用现有满托入库通知的兜底约定。
|
*/
|
private MesToCombParam buildInboundParam(IotInstructionMessage message, String preferredLocNo) {
|
MesToCombParam param = new MesToCombParam();
|
param.setPalletId(message.getContainerId());
|
param.setOrderId("");
|
param.setBizNo(message.getInstructionId());
|
param.setAnfme(1D);
|
param.setFull(1);
|
param.setLocId(preferredLocNo);
|
param.setBoxType1("aws");
|
return param;
|
}
|
|
/**
|
* 仅当指定库位当前仍为空位时才强制优先使用,否则回退到原有自动找位。
|
*/
|
private String resolvePreferredInboundLoc(IotInstructionMessage message) {
|
String preferredLoc = extractFirstDestination(message.getDestinationLocationIds());
|
if (Cools.isEmpty(preferredLoc)) {
|
return null;
|
}
|
LocMast locMast = locMastService.selectById(preferredLoc);
|
if (locMast != null && "O".equalsIgnoreCase(locMast.getLocSts())) {
|
return locMast.getLocNo();
|
}
|
return null;
|
}
|
|
private String resolveReferenceId(IotInstructionMessage message) {
|
return Cools.isEmpty(message.getReferenceId()) ? message.getInstructionId() : message.getReferenceId();
|
}
|
|
private Long updateAsFailure(IotPublishRecord record, String errorCode, String errorMessage) {
|
Date now = new Date();
|
record.setProcessStatus(IotConstants.PROCESS_STATUS_FAILED);
|
record.setOrderNo(Cools.isEmpty(record.getOrderNo()) ? record.getReferenceId() : record.getOrderNo());
|
record.setErrorCode(errorCode);
|
record.setErrorMessage(truncate(errorMessage, 500));
|
applyFeedback(record, IotConstants.FEEDBACK_FAILURE, errorCode, errorMessage, now);
|
iotPublishRecordService.updateById(record);
|
return record.getId();
|
}
|
|
private void applyFeedback(IotPublishRecord record, String status, String errorCode, String errorMessage, Date now) {
|
IotFeedbackMessage feedbackMessage = new IotFeedbackMessage();
|
feedbackMessage.setInstructionId(record.getInstructionId());
|
feedbackMessage.setStatus(status);
|
feedbackMessage.setErrorCode(errorCode);
|
feedbackMessage.setMessage(errorMessage);
|
feedbackMessage.setCreationTime(System.currentTimeMillis());
|
|
record.setFeedbackStatus(status);
|
record.setPublishTopic(iotProperties.getTopics().getIngressFeedback());
|
record.setPublishPayload(JSON.toJSONString(feedbackMessage));
|
record.setPublishStatus(IotConstants.PUBLISH_STATUS_PENDING);
|
record.setUpdateTime(now);
|
}
|
|
private String resolveResultMessage(R result, String defaultMessage) {
|
if (result == null) {
|
return defaultMessage;
|
}
|
Object msg = result.get("msg");
|
return msg == null ? defaultMessage : String.valueOf(msg);
|
}
|
|
private Integer resolveWrkNo(R result) {
|
if (result == null) {
|
return null;
|
}
|
Object wrkNo = result.get("wrkNo");
|
if (wrkNo instanceof Number) {
|
return ((Number) wrkNo).intValue();
|
}
|
if (wrkNo instanceof String && !Cools.isEmpty(String.valueOf(wrkNo))) {
|
return Integer.valueOf(String.valueOf(wrkNo));
|
}
|
return null;
|
}
|
|
private IotPublishRecord buildOutboundRecord(WrkMast wrkMast) {
|
Integer ioType = wrkMast.getIoType();
|
if (ioType == null) {
|
return null;
|
}
|
String containerId = resolveContainerId(wrkMast);
|
if (Cools.isEmpty(containerId)) {
|
return null;
|
}
|
if (Arrays.asList(1, 8, 11, 53, 54, 57).contains(ioType)) {
|
return buildStowOutboundRecord(wrkMast, containerId);
|
}
|
if (Arrays.asList(101, 108).contains(ioType)) {
|
return buildPickOutboundRecord(wrkMast, containerId);
|
}
|
return null;
|
}
|
|
private IotPublishRecord buildStowOutboundRecord(WrkMast wrkMast, String containerId) {
|
List<String> destinationLocationIds = wrkMast.getLocNo() == null
|
? Collections.<String>emptyList()
|
: Collections.singletonList(wrkMast.getLocNo());
|
if (destinationLocationIds.isEmpty()) {
|
return null;
|
}
|
IotInstructionMessage payload = new IotInstructionMessage();
|
payload.setInstructionId(IotInstructionIdGenerator.generate(String.valueOf(wrkMast.getWrkNo())));
|
payload.setContainerId(containerId);
|
if (Objects.equals(wrkMast.getIoType(), 11)) {
|
payload.setSourceLocationId(wrkMast.getSourceLocNo());
|
}
|
payload.setDestinationLocationIds(destinationLocationIds);
|
if (!Cools.isEmpty(wrkMast.getUserNo())) {
|
payload.setReferenceId(wrkMast.getUserNo());
|
}
|
payload.setCreationTime(System.currentTimeMillis());
|
return initOutboundRecord(wrkMast, IotConstants.MESSAGE_TYPE_STOW, iotProperties.getTopics().getIngressStow(), payload);
|
}
|
|
private IotPublishRecord buildPickOutboundRecord(WrkMast wrkMast, String containerId) {
|
IotInstructionMessage payload = new IotInstructionMessage();
|
payload.setInstructionId(IotInstructionIdGenerator.generate(String.valueOf(wrkMast.getWrkNo())));
|
payload.setContainerId(containerId);
|
payload.setSourceLocationId(wrkMast.getSourceLocNo());
|
payload.setDestinationLocationIds(Collections.singletonList(resolveRemoteStationId(wrkMast.getStaNo())));
|
if (!Cools.isEmpty(wrkMast.getUserNo())) {
|
payload.setReferenceId(wrkMast.getUserNo());
|
}
|
payload.setCreationTime(System.currentTimeMillis());
|
return initOutboundRecord(wrkMast, IotConstants.MESSAGE_TYPE_PICK, iotProperties.getTopics().getIngressPick(), payload);
|
}
|
|
private IotPublishRecord initOutboundRecord(WrkMast wrkMast, String messageType, String publishTopic, IotInstructionMessage payload) {
|
Date now = new Date();
|
IotPublishRecord record = new IotPublishRecord();
|
record.setInstructionId(payload.getInstructionId());
|
record.setDirection(IotConstants.DIRECTION_OUTBOUND);
|
record.setMessageType(messageType);
|
record.setPublishTopic(publishTopic);
|
record.setContainerId(payload.getContainerId());
|
record.setReferenceId(payload.getReferenceId());
|
record.setSourceLocationId(payload.getSourceLocationId());
|
record.setDestinationLocationIds(normalizeDestinations(payload.getDestinationLocationIds()));
|
record.setMessageCreationTime(payload.getCreationTime());
|
record.setPublishPayload(JSON.toJSONString(payload));
|
record.setRawPayload(record.getPublishPayload());
|
record.setProcessStatus(IotConstants.PROCESS_STATUS_PENDING);
|
record.setPublishStatus(IotConstants.PUBLISH_STATUS_PENDING);
|
record.setWrkNo(wrkMast.getWrkNo());
|
record.setOrderNo(wrkMast.getUserNo());
|
record.setCreateTime(now);
|
record.setUpdateTime(now);
|
return record;
|
}
|
|
private String resolveContainerId(WrkMast wrkMast) {
|
if (!Cools.isEmpty(wrkMast.getBarcode())) {
|
return wrkMast.getBarcode();
|
}
|
List<WrkDetl> wrkDetls = wrkDetlService.selectList(new EntityWrapper<WrkDetl>().eq("wrk_no", wrkMast.getWrkNo()));
|
if (Cools.isEmpty(wrkDetls)) {
|
return null;
|
}
|
for (WrkDetl wrkDetl : wrkDetls) {
|
if (!Cools.isEmpty(wrkDetl.getZpallet())) {
|
return wrkDetl.getZpallet();
|
}
|
}
|
return null;
|
}
|
|
private String resolveRemoteStationId(Integer staNo) {
|
if (staNo == null) {
|
return null;
|
}
|
for (Map.Entry<String, Integer> entry : iotProperties.getPickStationMappings().entrySet()) {
|
if (Objects.equals(entry.getValue(), staNo)) {
|
return entry.getKey();
|
}
|
}
|
return String.valueOf(staNo);
|
}
|
|
private String extractFirstDestination(List<String> destinationLocationIds) {
|
if (Cools.isEmpty(destinationLocationIds)) {
|
return null;
|
}
|
for (String destinationLocationId : destinationLocationIds) {
|
if (!Cools.isEmpty(destinationLocationId)) {
|
return destinationLocationId;
|
}
|
}
|
return null;
|
}
|
|
private Long resolveCreationTime(Long creationTime) {
|
return creationTime == null ? System.currentTimeMillis() : creationTime;
|
}
|
|
private String normalizeDestinations(List<String> destinationLocationIds) {
|
if (Cools.isEmpty(destinationLocationIds)) {
|
return null;
|
}
|
return JSON.toJSONString(new ArrayList<String>(destinationLocationIds));
|
}
|
|
private String truncate(String message, int maxLength) {
|
if (message == null || message.length() <= maxLength) {
|
return message;
|
}
|
return message.substring(0, maxLength);
|
}
|
}
|