package com.zy.asrs.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.core.exception.CoolException; import com.zy.asrs.controller.requestParam.StationRequestParam; import com.zy.asrs.domain.vo.StationStatus; import com.zy.asrs.service.CtuMainService; import com.zy.common.utils.HttpHandler; import com.zy.common.utils.News; import com.zy.core.cache.MessageQueue; import com.zy.core.cache.SlaveConnection; import com.zy.core.enums.SlaveType; import com.zy.core.model.DevpSlave; import com.zy.core.model.Task; import com.zy.core.model.protocol.StaProtocol; import com.zy.core.properties.SlaveProperties; import com.zy.core.thread.SiemensDevpThread; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** * 立体仓库WCS系统主流程业务 * Created by vincent on 2020/8/6 */ @Slf4j @Service("ctuMainService") @Transactional @Data public class CtuMainServiceImpl implements CtuMainService { public static final long COMMAND_TIMEOUT = 5 * 1000; private static final long SLEEP_DURATION = 8000L; private static final int MAX_WORK_NO = 9999; private static final int RANDOM_WORK_NO_MAX = 10000; private static final int CTU_STATION_1001 = 1001; private static final int CTU_STATION_1007 = 1007; private static final int CTU_STATION_1006 = 1006; private static final int CTU_STATION_1004 = 1004; @Value("${ctu.url}") private String ctuUrl; @Value("${ctu.station}") private String station; @Autowired private SlaveProperties slaveProperties; // 为不同的操作添加细粒度锁 private final ReentrantLock outLock = new ReentrantLock(); private final ReentrantLock inLock = new ReentrantLock(); private final ReentrantLock in2Lock = new ReentrantLock(); /** * 出库的时候,设备上走 */ @Override public void out(Integer mark) { executeWithLock(outLock, () -> { for (DevpSlave devp : slaveProperties.getDevp()) { for (DevpSlave.Sta outSta : devp.getOutSta()) { processOutboundStation(devp, outSta, mark); } } }, "出库处理"); } /** * 入库,从拣料站到入库站(CTU取货站) */ @Override public void in(Integer mark) { executeWithLock(inLock, () -> processInboundStation(mark, CTU_STATION_1004, (short) CTU_STATION_1006), "入库处理"); } @Override public void in2(Integer mark) { executeWithLock(in2Lock, () -> { // 获取1007站点信息 SiemensDevpThread devpThread = (SiemensDevpThread) SlaveConnection.get(SlaveType.Devp, 1); if (devpThread == null) { log.warn("无法获取设备线程,设备ID: 1"); return; } StaProtocol staProtocol = getStationProtocol(devpThread, CTU_STATION_1007); if (staProtocol == null || !isStationReadyForProcessing(staProtocol)) { return; } if (staProtocol.getWorkNo() > 0 && staProtocol.isAutoing() && !staProtocol.isEmptyMk() && staProtocol.isPakMk()) { if (station(CTU_STATION_1007)) { Integer workNo = staProtocol.getWorkNo(); // 清空1007站点 clearStationProtocol(staProtocol, devpThread); // 更新1006站点信息 updateStation1006(devpThread, workNo); } } }, "入库第二步处理"); } @Override @Transactional public boolean station(Integer staNo) { return checkStationStatus(staNo); } /** * 在锁的保护下执行操作 * @param lock 锁 * @param operation 要执行的操作 * @param operationName 操作名称,用于日志 */ private void executeWithLock(ReentrantLock lock, Runnable operation, String operationName) { lock.lock(); try { operation.run(); } catch (Exception e) { log.error("{}异常", operationName, e); News.error("{}异常: {}", operationName, e.getMessage()); } finally { lock.unlock(); } } /** * 处理出库站点 * @param devp 设备 * @param outSta 出库站点 * @param mark 标记 */ private void processOutboundStation(DevpSlave devp, DevpSlave.Sta outSta, Integer mark) { SiemensDevpThread devpThread = getDeviceThread(SlaveType.Devp, devp.getId()); if (devpThread == null) { log.warn("无法获取设备线程,设备ID: {}", devp.getId()); return; } StaProtocol staProtocol = getStationProtocol(devpThread, outSta.getStaNo()); if (staProtocol == null || !isStationReadyForProcessing(staProtocol)) { return; } if (isOutboundConditionMet(staProtocol) && station(CTU_STATION_1001)) { executeOutboundProcess(staProtocol, devp, mark, devpThread); } else { logStationConditionError(mark, staProtocol); } } /** * 检查出库条件是否满足 * @param staProtocol 站点协议 * @return 条件是否满足 */ private boolean isOutboundConditionMet(StaProtocol staProtocol) { return staProtocol.isAutoing() && !staProtocol.isEmptyMk() && (staProtocol.getWorkNo() == 0 || staProtocol.getWorkNo() == MAX_WORK_NO) && staProtocol.isPakMk(); } /** * 执行出库处理 * @param staProtocol 站点协议 * @param devp 设备 * @param mark 标记 * @param devpThread 设备线程 */ private void executeOutboundProcess(StaProtocol staProtocol, DevpSlave devp, Integer mark, SiemensDevpThread devpThread) { News.warnNoLog("" + mark + " - 0" + " - 开始执行"); // 更新站点信息 且 下发plc命令 staProtocol.setWorkNo((int) (Math.random() * RANDOM_WORK_NO_MAX)); staProtocol.setStaNo((short) CTU_STATION_1004); devpThread.setPakMk(staProtocol.getSiteId(), false); boolean result = MessageQueue.offer(SlaveType.Devp, devp.getId(), new Task(2, staProtocol)); log.info("输送线下发3:{},{}", staProtocol.getWorkNo(), CTU_STATION_1004); if (result) { sleepWithInterruptHandling(SLEEP_DURATION, "出库处理"); } else { News.error("" + mark + " - 2" + " - 发布命令至输送线队列失败!!! [plc编号:{}]", devp.getId()); } } /** * 记录站点条件错误日志 * @param mark 标记 * @param staProtocol 站点协议 */ private void logStationConditionError(Integer mark, StaProtocol staProtocol) { String errorMsg = String.format( "%s - 6 - 站点信息不符合入库条件!!! 自动信号:%s、可入信号:%s、空板信号:%s、工作号:%s、锁定标记%s、入库印记:%s", mark, staProtocol.isLoading(), staProtocol.isInEnable(), staProtocol.isEmptyMk(), staProtocol.getWorkNo(), staProtocol.isPakMk(), staProtocol.getStamp() ); News.errorNoLog(errorMsg); log.warn(errorMsg); } /** * 处理入库站点 * @param mark 标记 * @param sourceStaNo 源站点编号 * @param targetStaNo 目标站点编号 */ private void processInboundStation(Integer mark, int sourceStaNo, short targetStaNo) { SiemensDevpThread devpThread = getDeviceThread(SlaveType.Devp, 1); if (devpThread == null) { log.warn("无法获取设备线程,设备ID: 1"); return; } StaProtocol staProtocol = getStationProtocol(devpThread, sourceStaNo); if (staProtocol == null || !isStationReadyForProcessing(staProtocol)) { return; } if (staProtocol.getWorkNo() > 0 && staProtocol.isAutoing() && !staProtocol.isEmptyMk() && staProtocol.isPakMk()) { if (staProtocol.getStaNo() == sourceStaNo) { sleepWithInterruptHandling(SLEEP_DURATION, "入库处理"); staProtocol.setStaNo(targetStaNo); boolean result = MessageQueue.offer(SlaveType.Devp, 1, new Task(2, staProtocol)); log.info("入库输送线下发:{},{}", staProtocol.getWorkNo(), targetStaNo); } } } /** * 获取设备线程 * @param slaveType 从类型 * @param id 设备ID * @return 设备线程 */ private SiemensDevpThread getDeviceThread(SlaveType slaveType, Integer id) { Object device = SlaveConnection.get(slaveType, id); if (device instanceof SiemensDevpThread) { return (SiemensDevpThread) device; } return null; } /** * 获取站点协议对象 * @param devpThread 设备线程 * @param staNo 站点编号 * @return 站点协议对象 */ private StaProtocol getStationProtocol(SiemensDevpThread devpThread, int staNo) { try { StaProtocol staProtocol = devpThread.getStation().get(staNo); if (staProtocol != null) { return staProtocol.clone(); } } catch (Exception e) { log.error("获取站点协议异常,站点编号: {}", staNo, e); } return null; } /** * 检查站点是否准备好处理 * @param staProtocol 站点协议 * @return 是否准备好 */ private boolean isStationReadyForProcessing(StaProtocol staProtocol) { return staProtocol != null && staProtocol.isLoading(); } /** * 带中断处理的睡眠 * @param duration 休眠时间 * @param operationName 操作名称,用于日志 */ private void sleepWithInterruptHandling(long duration, String operationName) { try { Thread.sleep(duration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("{}线程中断异常", operationName, e); throw new RuntimeException(operationName + "线程中断", e); } } /** * 清空站点协议 * @param staProtocol 站点协议 * @param devpThread 设备线程 */ private void clearStationProtocol(StaProtocol staProtocol, SiemensDevpThread devpThread) { staProtocol.setWorkNo(0); staProtocol.setStaNo((short) 0); boolean result = MessageQueue.offer(SlaveType.Devp, 1, new Task(2, staProtocol)); log.info("1007站点清空:{},{}", staProtocol.getWorkNo(), CTU_STATION_1006); } /** * 更新1006站点 * @param devpThread 设备线程 * @param workNo 工作号 */ private void updateStation1006(SiemensDevpThread devpThread, Integer workNo) { StaProtocol staProtocol1006 = devpThread.getStation().get(CTU_STATION_1006); if (staProtocol1006 != null) { staProtocol1006.setWorkNo(workNo); staProtocol1006.setStaNo((short) CTU_STATION_1007); boolean result2 = MessageQueue.offer(SlaveType.Devp, 1, new Task(2, staProtocol1006)); log.info("1006站点往前走一格:{},{}", staProtocol1006.getWorkNo(), CTU_STATION_1007); } else { log.warn("无法获取1006站点协议"); } } /** * 检查站点状态 * @param staNo 站点编号 * @return 站点是否可通行 */ private boolean checkStationStatus(Integer staNo) { StationRequestParam stationRequestParam = new StationRequestParam(); List staNos = new ArrayList<>(); staNos.add(staNo + ""); stationRequestParam.setStaNos(staNos); String response = ""; try { response = new HttpHandler.Builder() .setUri(ctuUrl) .setPath(station) .setTimeout(1200, TimeUnit.SECONDS) .setJson(JSON.toJSONString(stationRequestParam)) .build() .doPost(); JSONObject jsonObject = JSON.parseObject(response); if (jsonObject.getInteger("code").equals(200)) { log.info("RCS返回数据:{}", response); JSONArray data = jsonObject.getJSONArray("data"); List stationStatuses = JSONArray.parseArray(data.toString(), StationStatus.class); for (StationStatus object : stationStatuses) { if (object.getStaNo().equals(staNo + "")) { if (object.getConveyable()) { return true; } else { log.info("站点:{}状态不对", staNo); return false; } } } log.info("未返回站点状态:{}", staNo); } else { log.error("调用下发任务接口报错,响应码:{},响应内容:{}", jsonObject.getInteger("code"), response); throw new CoolException("调用下发任务接口报错,响应码:" + jsonObject.getInteger("code")); } } catch (CoolException e) { log.error("调用站点状态接口异常", e); throw e; } catch (Exception e) { log.error("检查站点状态失败,站点编号:{}", staNo, e); } return false; } }