package com.zy.acs.manager.core.service;
|
|
import com.zy.acs.framework.common.Cools;
|
import com.zy.acs.manager.core.domain.UnlockPathTask;
|
import com.zy.acs.manager.core.service.astart.MapDataDispatcher;
|
import com.zy.acs.manager.core.service.astart.domain.DynamicNode;
|
import lombok.Data;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.PostConstruct;
|
import javax.annotation.PreDestroy;
|
import java.util.*;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.TimeUnit;
|
import java.util.stream.Collectors;
|
|
@Slf4j
|
@Service
|
public class PathQueueConsumer {
|
|
private ExecutorService consumerExecutor;
|
|
@Autowired
|
private LinkedBlockingQueue<UnlockPathTask> unlockTaskQueue;
|
@Autowired
|
private MapDataDispatcher mapDataDispatcher;
|
|
@PostConstruct
|
public void init() {
|
Integer lev = MapDataDispatcher.MAP_DEFAULT_LEV;
|
|
this.consumerExecutor = Executors.newSingleThreadExecutor();
|
this.consumerExecutor.execute(() -> {
|
while (!Thread.currentThread().isInterrupted()) {
|
try {
|
Thread.sleep(10);
|
List<UnlockPathTask> tasks = new ArrayList<>();
|
// if unlockTaskQueue was empty, then block
|
tasks.add(unlockTaskQueue.take());
|
unlockTaskQueue.drainTo(tasks);
|
|
if (!Cools.isEmpty(tasks)) {
|
long startTime = System.currentTimeMillis();
|
|
List<int[]> resetCodeIdxList = null;
|
if (tasks.size() == 1) {
|
resetCodeIdxList = this.getResetCodeList(lev, tasks.get(0));
|
} else if (tasks.size() > 1) {
|
// log.info("consumer task count:{}", tasks.size());
|
resetCodeIdxList = this.getResetCodeList(lev,tasks);
|
}
|
|
if (!Cools.isEmpty(resetCodeIdxList)) {
|
this.dealResetCodeList(lev, resetCodeIdxList);
|
}
|
|
// log.info("consumer unlock path spend time:{}", System.currentTimeMillis() - startTime);
|
}
|
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.error("PathQueueConsumer[Thread.currentThread.interrupt]", e);
|
break;
|
} catch (Exception e) {
|
log.error("PathQueueConsumer", e);
|
}
|
}
|
});
|
}
|
|
private List<int[]> getResetCodeList(Integer lev, List<UnlockPathTask> tasks) {
|
if (Cools.isEmpty(tasks)) {
|
return new ArrayList<>();
|
}
|
|
// 1.整理预处理数据
|
String[][] codeMatrix = mapDataDispatcher.getCodeMatrix(lev);
|
DynamicNode[][] dynamicMatrix = mapDataDispatcher.getDynamicMatrix(lev);
|
|
// Set<String> codeDataSet = tasks.stream().map(UnlockPathTask::getCodeData).collect(Collectors.toSet());
|
Set<String> agvNoSet = tasks.stream().map(UnlockPathTask::getAgvNo).collect(Collectors.toSet());
|
|
Map<String, Set<String>> agvNoCodeDataMap = new HashMap<>();
|
tasks.forEach(task -> agvNoCodeDataMap.computeIfAbsent(task.getAgvNo(), k -> new HashSet<>()).add(task.getCodeData()));
|
|
|
// 2.获取agv的所有dynamic node,并且记录agv当前的dynamic node serial
|
|
// agvNo : List<{code, serial}>
|
Map<String, List<CodeSerial>> codeSerialMap = new HashMap<>();
|
// agvNo : serial
|
Map<String, Integer> agvNoSerialMap = new HashMap<>();
|
|
|
for (int i = 0; i < dynamicMatrix.length; i++) {
|
for (int j = 0; j < dynamicMatrix[i].length; j++) {
|
DynamicNode dynamicNode = dynamicMatrix[i][j];
|
|
if (agvNoSet.contains(dynamicNode.getVehicle())) {
|
|
String codeData = codeMatrix[i][j];
|
String vehicle = dynamicNode.getVehicle();
|
int serial = dynamicNode.getSerial();
|
|
List<CodeSerial> codeSerials = codeSerialMap.computeIfAbsent(vehicle, k -> new ArrayList<>());
|
codeSerials.add(new CodeSerial(new int[] {i, j}, serial));
|
|
if (agvNoCodeDataMap.get(vehicle).contains(codeData)) {
|
Integer agvNoSerial = agvNoSerialMap.get(vehicle);
|
if (null == agvNoSerial) {
|
agvNoSerialMap.put(vehicle, serial);
|
} else {
|
if (serial > agvNoSerial) {
|
agvNoSerialMap.put(vehicle, serial);
|
}
|
}
|
|
}
|
|
}
|
|
}
|
}
|
|
// 3.处理codeSerialMap,获取所有agv需要释放的code matrix list
|
|
List<int[]> resetCodeIdxList = new ArrayList<>();
|
|
for (Map.Entry<String, List<CodeSerial>> entry : codeSerialMap.entrySet()) {
|
String agvNo = entry.getKey();
|
List<CodeSerial> codeSerials = entry.getValue();
|
|
Integer maxSerial = agvNoSerialMap.get(agvNo);
|
|
for (CodeSerial codeSerial : codeSerials) {
|
if (codeSerial.getSerial() < maxSerial) {
|
resetCodeIdxList.add(codeSerial.getCodeMatrixIdx());
|
}
|
}
|
|
}
|
|
return resetCodeIdxList;
|
}
|
|
private List<int[]> getResetCodeList(Integer lev, UnlockPathTask task) {
|
if (null == task) {
|
return new ArrayList<>();
|
}
|
DynamicNode[][] dynamicMatrix = mapDataDispatcher.getDynamicMatrix(lev);
|
|
String agvNo = task.getAgvNo();
|
int[] codeMatrixIdx = mapDataDispatcher.getCodeMatrixIdx(lev, task.getCodeData());
|
|
DynamicNode dynamicNode = dynamicMatrix[codeMatrixIdx[0]][codeMatrixIdx[1]];
|
int serial = dynamicNode.getSerial();
|
|
List<int[]> resetCodeIdxList = new ArrayList<>();
|
|
for (int i = 0; i < dynamicMatrix.length; i++) {
|
for (int j = 0; j < dynamicMatrix[i].length; j++) {
|
|
DynamicNode node = dynamicMatrix[i][j];
|
if (node.getVehicle().equals(agvNo)) {
|
if (node.getSerial() < serial) {
|
resetCodeIdxList.add(new int[] {i, j});
|
}
|
}
|
}
|
}
|
|
return resetCodeIdxList;
|
}
|
|
private void dealResetCodeList(Integer lev, List<int[]> resetCodeIdxList) {
|
if (Cools.isEmpty(resetCodeIdxList)) {
|
return;
|
}
|
|
mapDataDispatcher.clearDynamicMatrixByCodeList(lev, resetCodeIdxList);
|
}
|
|
@Data
|
public static class CodeSerial {
|
private String codeData;
|
private int[] codeMatrixIdx;
|
private int serial;
|
|
public CodeSerial(int[] codeMatrixIdx, int serial) {
|
this.codeMatrixIdx = codeMatrixIdx;
|
this.serial = serial;
|
}
|
}
|
|
@PreDestroy
|
public void destroy() {
|
if (this.consumerExecutor != null) {
|
this.consumerExecutor.shutdown();
|
try {
|
if (!this.consumerExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
this.consumerExecutor.shutdownNow();
|
if (!this.consumerExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
log.error("this.consumerExecutor failed to shutdown");
|
}
|
}
|
} catch (InterruptedException ie) {
|
this.consumerExecutor.shutdownNow();
|
Thread.currentThread().interrupt();
|
}
|
}
|
}
|
|
}
|