package com.zy.acs.manager.core.service; import com.zy.acs.framework.common.Cools; import com.zy.acs.manager.core.domain.UnlockPathTask; import com.zy.acs.manager.core.service.astart.MapDataDispatcher; import com.zy.acs.manager.core.service.astart.domain.DynamicNode; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j @Service public class PathQueueConsumer { private ExecutorService consumerExecutor; @Autowired private LinkedBlockingQueue unlockTaskQueue; @Autowired private MapDataDispatcher mapDataDispatcher; @PostConstruct public void init() { Integer lev = MapDataDispatcher.MAP_DEFAULT_LEV; this.consumerExecutor = Executors.newSingleThreadExecutor(); this.consumerExecutor.execute(() -> { while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(30); List tasks = new ArrayList<>(); // if unlockTaskQueue was empty, then block tasks.add(unlockTaskQueue.take()); unlockTaskQueue.drainTo(tasks); if (!Cools.isEmpty(tasks)) { long startTime = System.currentTimeMillis(); List resetCodeIdxList = null; if (tasks.size() == 1) { resetCodeIdxList = this.getResetCodeList(lev, tasks.get(0)); } else if (tasks.size() > 1) { // log.info("consumer task count:{}", tasks.size()); resetCodeIdxList = this.getResetCodeList(lev,tasks); } if (!Cools.isEmpty(resetCodeIdxList)) { this.dealResetCodeList(lev, resetCodeIdxList); } // log.info("consumer unlock path spend time:{}", System.currentTimeMillis() - startTime); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("PathQueueConsumer[Thread.currentThread.interrupt]", e); break; } catch (Exception e) { log.error("PathQueueConsumer", e); } } }); } private List getResetCodeList(Integer lev, List tasks) { if (Cools.isEmpty(tasks)) { return new ArrayList<>(); } // 1.整理预处理数据 String[][] codeMatrix = mapDataDispatcher.getCodeMatrix(lev); DynamicNode[][] dynamicMatrix = mapDataDispatcher.getDynamicMatrix(lev); // Set codeDataSet = tasks.stream().map(UnlockPathTask::getCodeData).collect(Collectors.toSet()); Set agvNoSet = tasks.stream().map(UnlockPathTask::getAgvNo).collect(Collectors.toSet()); Map> agvNoCodeDataMap = new HashMap<>(); tasks.forEach(task -> agvNoCodeDataMap.computeIfAbsent(task.getAgvNo(), k -> new HashSet<>()).add(task.getCodeData())); // 2.获取agv的所有dynamic node,并且记录agv当前的dynamic node serial // agvNo : List<{code, serial}> Map> codeSerialMap = new HashMap<>(); // agvNo : serial Map agvNoSerialMap = new HashMap<>(); for (int i = 0; i < dynamicMatrix.length; i++) { for (int j = 0; j < dynamicMatrix[i].length; j++) { DynamicNode dynamicNode = dynamicMatrix[i][j]; if (agvNoSet.contains(dynamicNode.getVehicle())) { String codeData = codeMatrix[i][j]; String vehicle = dynamicNode.getVehicle(); int serial = dynamicNode.getSerial(); List codeSerials = codeSerialMap.computeIfAbsent(vehicle, k -> new ArrayList<>()); codeSerials.add(new CodeSerial(new int[] {i, j}, serial)); if (agvNoCodeDataMap.get(vehicle).contains(codeData)) { Integer agvNoSerial = agvNoSerialMap.get(vehicle); if (null == agvNoSerial) { agvNoSerialMap.put(vehicle, serial); } else { if (serial > agvNoSerial) { agvNoSerialMap.put(vehicle, serial); } } } } } } // 3.处理codeSerialMap,获取所有agv需要释放的code matrix list List resetCodeIdxList = new ArrayList<>(); for (Map.Entry> entry : codeSerialMap.entrySet()) { String agvNo = entry.getKey(); List codeSerials = entry.getValue(); Integer maxSerial = agvNoSerialMap.get(agvNo); for (CodeSerial codeSerial : codeSerials) { if (codeSerial.getSerial() < maxSerial) { resetCodeIdxList.add(codeSerial.getCodeMatrixIdx()); } } } return resetCodeIdxList; } private List getResetCodeList(Integer lev, UnlockPathTask task) { if (null == task) { return new ArrayList<>(); } DynamicNode[][] dynamicMatrix = mapDataDispatcher.getDynamicMatrix(lev); String agvNo = task.getAgvNo(); int[] codeMatrixIdx = mapDataDispatcher.getCodeMatrixIdx(lev, task.getCodeData()); DynamicNode dynamicNode = dynamicMatrix[codeMatrixIdx[0]][codeMatrixIdx[1]]; int serial = dynamicNode.getSerial(); List resetCodeIdxList = new ArrayList<>(); for (int i = 0; i < dynamicMatrix.length; i++) { for (int j = 0; j < dynamicMatrix[i].length; j++) { DynamicNode node = dynamicMatrix[i][j]; if (node.getVehicle().equals(agvNo)) { if (node.getSerial() < serial) { resetCodeIdxList.add(new int[] {i, j}); } } } } return resetCodeIdxList; } private void dealResetCodeList(Integer lev, List resetCodeIdxList) { if (Cools.isEmpty(resetCodeIdxList)) { return; } mapDataDispatcher.clearDynamicMatrixByCodeList(lev, resetCodeIdxList); } @Data public static class CodeSerial { private String codeData; private int[] codeMatrixIdx; private int serial; public CodeSerial(int[] codeMatrixIdx, int serial) { this.codeMatrixIdx = codeMatrixIdx; this.serial = serial; } } @PreDestroy public void destroy() { if (this.consumerExecutor != null) { this.consumerExecutor.shutdown(); try { if (!this.consumerExecutor.awaitTermination(5, TimeUnit.SECONDS)) { this.consumerExecutor.shutdownNow(); if (!this.consumerExecutor.awaitTermination(5, TimeUnit.SECONDS)) { log.error("this.consumerExecutor failed to shutdown"); } } } catch (InterruptedException ie) { this.consumerExecutor.shutdownNow(); Thread.currentThread().interrupt(); } } } }