| src/main/java/com/zy/asrs/service/impl/TaskServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/main/java/com/zy/asrs/task/AgvScheduler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/main/java/com/zy/asrs/task/handler/AgvHandler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| src/main/java/com/zy/asrs/task/handler/WorkMastHandler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/zy/asrs/service/impl/TaskServiceImpl.java
@@ -229,7 +229,7 @@ if (!Cools.isEmpty(locNo)) { LocCache locMast = locCacheService.selectOne(new EntityWrapper<LocCache>().eq("loc_no", locNo)); if (Cools.isEmpty(locMast)) { throw new CoolException("取消工作档失败,库位不存在:" + locNo); locMast.setLocSts("O"); } if (!Cools.isEmpty(locSts)) { locMast.setLocSts(locSts); src/main/java/com/zy/asrs/task/AgvScheduler.java
@@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; /** * @author pang.jiabao @@ -67,10 +68,20 @@ private Long lastAllocatedTaskId = null; /** * 分配站点定时任务 * 分配站点任务执行标志,确保同一时间只有一个线程在执行分配站点循环 */ private final AtomicBoolean isAllocateSite = new AtomicBoolean(false); /** * 呼叫AGV定时任务执行标志,确保同一时间只有一个线程在执行分配站点循环 */ private final AtomicBoolean iscallAgv = new AtomicBoolean(false); /** * agv工作档分配站点定时任务 * 查询状态7(待呼叫AGV)但没有分配站点的任务,为其分配可用站点 * 只负责分配站点,不呼叫AGV * 每次只处理一个任务,避免高并发执行 * 使用AtomicBoolean确保单线程执行循环 */ @Scheduled(cron = "0/5 * * * * ? ") private void allocateSite() { @@ -78,12 +89,15 @@ log.debug("定时任务allocateSite:调度器未启用,跳过执行"); return; } if (!isAllocateSite.compareAndSet(false, true)) { log.debug("定时任务allocateSite:上一次分配站点任务还在执行中,跳过本次执行"); return; } // 构建查询条件:查询所有待呼叫AGV但没有分配站点的任务 EntityWrapper<Task> wrapper = new EntityWrapper<Task>(); wrapper.eq("wrk_sts", 7); // 待呼叫AGV状态 wrapper.eq("task_type", "agv"); // AGV任务类型 wrapper.andNew("(is_deleted = 0)"); wrapper.eq("is_deleted", 0); // 排除已删除的任务 wrapper.andNew() .isNull("sta_no") .or() @@ -91,48 +105,25 @@ .or() .eq("sta_no", "0"); wrapper.orderBy("id", true); // 按id升序排序 wrapper.last("OFFSET 0 ROWS FETCH NEXT 21 ROWS ONLY"); List<Task> taskList = taskService.selectList(wrapper); // 如果上次处理过任务,从下一个任务开始查询(轮询) if (lastAllocatedTaskId != null) { wrapper.gt("id", lastAllocatedTaskId); } // 查询待分配站点的任务,每次只查询一个 List<Task> taskList = taskService.selectList( wrapper.last("OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY") ); // 如果从上次任务之后没有找到任务,从头开始查询(实现循环轮询) if (taskList.isEmpty() && lastAllocatedTaskId != null) { lastAllocatedTaskId = null; // 重置,从头开始 taskList = taskService.selectList( new EntityWrapper<Task>() .eq("wrk_sts", 7) .eq("task_type", "agv") .andNew("(is_deleted = 0)") .andNew() .isNull("sta_no") .or() .eq("sta_no", "") .or() .eq("sta_no", "0") .orderBy("id", true) .last("OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY") ); } if(taskList.isEmpty()) { log.debug("定时任务allocateSite:没有待分配站点的任务(wrk_sts=7,task_type=agv,sta_no为空)"); isAllocateSite.set(false); return; } try { for (Task task : taskList) { String errorMsg = agvHandler.allocateSiteForTask(task); // 调用分配站点逻辑 Task task = taskList.get(0); String displayTaskId = (task.getWrkNo() != null) ? String.valueOf(task.getWrkNo()) : String.valueOf(task.getId()); log.info("定时任务allocateSite:开始为任务ID:{}分配站点(wrk_no={},ioType={})", displayTaskId, task.getWrkNo(), task.getIoType()); String errorMsg = agvHandler.allocateSiteForTask(task); // 检查是否成功分配了站点 String staNo = task.getStaNo(); @@ -146,69 +137,70 @@ log.info("定时任务allocateSite:任务ID:{}无法分配站点:{},不更新lastAllocatedTaskId(当前:{}),下次将重新尝试", displayTaskId, errorMsg != null ? errorMsg : "所有站点都被占用", lastAllocatedTaskId); } // 每个任务处理完后等待1秒, try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("定时任务allocateSite:延迟被中断", e); break; // 如果被中断,退出循环 } } } finally { // 确保标志位被重置,即使发生异常也能释放锁 isAllocateSite.set(false); } } /** * 呼叫AGV定时任务 * 查询状态7(待呼叫AGV)且已分配站点的任务,呼叫AGV * 呼叫成功后,状态从7(待呼叫AGV)变为8(正在搬运) * 每次只处理一个任务,避免高并发执行 * 使用轮询机制,确保不会一直处理同一个任务 */ @Scheduled(cron = "0/5 * * * * ? ") private void callAgv() { if (!schedulerProperties.isEnabled()) { log.debug("定时任务callAgv:调度器未启用,跳过执行"); log.debug("呼叫AGV定时任务:调度器未启用,跳过执行"); return; } if (!iscallAgv.compareAndSet(false, true)) { log.debug("呼叫AGV定时任务:上一次分配站点任务还在执行中,跳过本次执行"); return; } try { // 构建查询条件:查询状态7(待呼叫AGV)且已分配站点的任务 EntityWrapper<Task> wrapper = new EntityWrapper<Task>(); wrapper.eq("wrk_sts", 7); // 待呼叫AGV状态 wrapper.eq("task_type", "agv"); // AGV任务类型 wrapper.andNew("(is_deleted = 0)"); wrapper.eq("is_deleted", 0); // 排除已删除的任务 wrapper.isNotNull("sta_no"); // 必须有站点分配 wrapper.ne("sta_no", ""); // 站点不能为空字符串 wrapper.ne("sta_no", "0"); // 站点不能为0 wrapper.orderBy("id", true); // 按id升序排序 wrapper.last("OFFSET 0 ROWS FETCH NEXT 22 ROWS ONLY"); // 如果上次处理过任务,从下一个任务开始查询(轮询) if (lastProcessedTaskId != null) { wrapper.gt("id", lastProcessedTaskId); } // if (lastProcessedTaskId != null) { // wrapper.gt("id", lastProcessedTaskId); // } // 查询待呼叫agv任务,每次只查询一个 List<Task> taskList = taskService.selectList( wrapper.last("OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY") ); // 查询待呼叫agv任务 List<Task> taskList = taskService.selectList(wrapper); // 如果从上次任务之后没有找到任务,从头开始查询(实现循环轮询) if (taskList.isEmpty() && lastProcessedTaskId != null) { lastProcessedTaskId = null; // 重置,从头开始 taskList = taskService.selectList( new EntityWrapper<Task>() .eq("wrk_sts", 7) .eq("task_type", "agv") .andNew("(is_deleted = 0)") .isNotNull("sta_no") .ne("sta_no", "") .ne("sta_no", "0") .orderBy("id", true) .last("OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY") ); } if(taskList.isEmpty()) { log.debug("定时任务callAgv:没有待呼叫AGV的任务(wrk_sts=7,task_type=agv,sta_no不为空)"); log.debug("呼叫AGV定时任务:没有待呼叫AGV的任务(wrk_sts=7,task_type=agv,sta_no不为空)"); iscallAgv.set(false); return; } for (Task task : taskList) { // 调用处理逻辑:呼叫AGV,成功后状态从7变为8 Task task = taskList.get(0); String displayTaskId = (task.getWrkNo() != null) ? String.valueOf(task.getWrkNo()) : String.valueOf(task.getId()); log.info("定时任务callAgv:开始处理任务ID:{}(wrk_no={},ioType={},sta_no={})", log.info("呼叫AGV定时任务:开始处理任务ID:{}(wrk_no={},ioType={},sta_no={})", displayTaskId, task.getWrkNo(), task.getIoType(), task.getStaNo()); boolean processed = agvHandler.callAgv(taskList); boolean processed = agvHandler.callAgv(Collections.singletonList(task)); // 只有当任务成功处理(成功呼叫AGV,状态从7变为8)时,才更新lastProcessedTaskId // 如果任务被跳过(站点被占用等),不更新lastProcessedTaskId,下次会重新尝试 @@ -219,6 +211,19 @@ } else { log.info("定时任务callAgv:任务ID:{}被跳过,不更新lastProcessedTaskId(当前:{}),下次将重新尝试处理此任务", displayTaskId, lastProcessedTaskId); } // 每个任务处理完后等待1秒, try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("呼叫AGV定时任务:延迟被中断", e); break; // 如果被中断,退出循环 } } } finally { // 确保标志位被重置,即使发生异常也能释放锁 iscallAgv.set(false); } } @@ -263,7 +268,7 @@ if (!schedulerProperties.isEnabled()) { return; } List<Task> taskList = taskService.selectList(new EntityWrapper<Task>().eq("wrk_sts", 9).andNew("(is_deleted = 0)")); List<Task> taskList = taskService.selectList(new EntityWrapper<Task>().eq("wrk_sts", 9).eq("is_deleted", 0)); if(taskList.isEmpty()) { return; } @@ -302,7 +307,7 @@ .eq("task_type", "agv") .eq("wrk_sts", 8L) // 已呼叫AGV状态 .eq("wrk_no", wrkMast.getWrkNo()) .andNew("(is_deleted = 0)"); .eq("is_deleted", 0); // 排除已删除的任务 List<Task> agvTasks = taskService.selectList(taskWrapper1); // 如果通过wrk_no没找到,且有条码,则通过条码查询 @@ -311,7 +316,7 @@ .eq("task_type", "agv") .eq("wrk_sts", 8L) .eq("barcode", wrkMast.getBarcode()) .andNew("(is_deleted = 0)"); .eq("is_deleted", 0); // 排除已删除的任务 agvTasks = taskService.selectList(taskWrapper2); } @@ -368,7 +373,7 @@ new EntityWrapper<Task>() .eq("task_type", "agv") .eq("wrk_sts", 8L) // 正在搬运 .andNew("(is_deleted = 0)") .eq("is_deleted", 0) // 排除已删除的任务 .andNew() .isNull("sta_no") .or() @@ -444,7 +449,7 @@ log.error("修复异常任务:转移任务到历史表失败,taskId:{}", displayTaskId, e); } } } else { }/* else { // 工作档未完成,尝试分配站点或重置状态 // 先尝试分配站点 String errorMsg = agvHandler.allocateSiteForTask(task); @@ -462,7 +467,7 @@ displayTaskId, errorMsg != null ? errorMsg : "所有站点都被占用"); } } } }*/ } if (fixedCount > 0 || completedCount > 0) { @@ -491,7 +496,7 @@ new EntityWrapper<Task>() .eq("task_type", "agv") .eq("wrk_sts", 8L) // 已呼叫AGV状态 .andNew("(is_deleted = 0)") .eq("is_deleted", 0) // 排除已删除的任务 ); if (agvTasks.isEmpty()) { src/main/java/com/zy/asrs/task/handler/AgvHandler.java
@@ -189,7 +189,7 @@ .in("wrk_sts", 7L, 8L) .in("io_type", ioTypes) .ne("id", task.getId()) // 排除当前任务本身 .andNew("(is_deleted = 0)") .eq("is_deleted", 0) // 排除已删除的任务 ); int taskCount = allTasks != null ? allTasks.size() : 0; @@ -267,7 +267,7 @@ .isNotNull("plc_str_time") // 只检查已收到AGV确认的任务(plc_str_time不为空) .in("io_type", ioTypes) .ne("id", task.getId()) // 排除当前任务本身 .andNew("(is_deleted = 0)") .eq("is_deleted", 0) // 排除已删除的任务 ); // 检查并自动结束已完成工作档的AGV任务 @@ -368,7 +368,7 @@ .build() .doPost(); // 打印返回参数 // log.info("{}呼叫agv搬运 - 返回参数:{}", namespace, response); log.info("{}呼叫agv搬运,请求参数「{}」 - 返回参数:{}", namespace,body, response); // 检查响应是否为空 if (response == null || response.trim().isEmpty()) { @@ -575,7 +575,8 @@ * @param taskTypeName 任务类型名称(用于日志) * @return 仍然有效的正在搬运的任务列表(已完成的已被移除) */ private List<Task> checkAndCompleteFinishedTasks(List<Task> transportingTasks, String taskTypeName) { @Transactional(rollbackFor = Exception.class) public List<Task> checkAndCompleteFinishedTasks(List<Task> transportingTasks, String taskTypeName) { if (transportingTasks == null || transportingTasks.isEmpty()) { return transportingTasks; } @@ -676,6 +677,8 @@ return validTasks; } /** * 从memo字段中获取重试次数 * memo格式:如果包含"retryCount:数字",则返回该数字,否则返回0 @@ -745,7 +748,7 @@ /** * 构造请求内容(仙工M4格式) */ private String getRequest(Task task, String nameSpace) { public String getRequest(Task task, String nameSpace) { JSONObject object = new JSONObject(); // taskId使用工作号(wrk_no),格式:T + 工作号 // 如果工作号为空,则使用任务ID作为备选 @@ -792,6 +795,7 @@ object.put("kind", kind); return object.toJSONString(); } /** * 为任务分配站点(定时任务中调用) @@ -884,75 +888,18 @@ .in("dev_no", sites) .eq("in_enable", "Y") .eq("canining", "Y") .eq("loading", "N") .ne("dev_no", 0) // 排除dev_no=0的无效站点 ); if (devListWithConfig.isEmpty()) { // 站点配置不允许入库(canining != "Y"),暂不分配,等待配置开通(只在定时任务中记录日志) // 记录每个站点的canining状态 StringBuilder caniningStatusInfo = new StringBuilder(); for (Integer siteNo : sites) { BasDevp dev = allDevList.stream() .filter(d -> d.getDevNo().equals(siteNo)) .findFirst() .orElse(null); if (dev != null) { if (caniningStatusInfo.length() > 0) { caniningStatusInfo.append("; "); if (devListWithConfig==null || devListWithConfig.isEmpty()) { log.warn("任务ID:{}没有可入站点(站点未开通可入允许:canining='Y'),暂不分配站点,等待配置开通。能入站点列表:{}", displayTaskId, sites); return null; } caniningStatusInfo.append("站点").append(siteNo) .append("(canining=").append(dev.getCanining()).append(")"); } } log.warn("任务ID:{}没有可入站点(站点未开通可入允许:canining='Y'),暂不分配站点,等待配置开通。能入站点列表:{},canining状态:{}", displayTaskId, sites, caniningStatusInfo.toString()); return null; // 返回null,表示暂不分配,等待配置开通 } // 获取没有出库任务的站点(从已配置可入的站点中筛选) List<Integer> configuredSites = devListWithConfig.stream() .map(BasDevp::getDevNo) .collect(Collectors.toList()); log.info("任务ID:{},已配置可入站点列表:{}", displayTaskId, configuredSites); List<Integer> canInSites = basDevpMapper.getCanInSites(configuredSites); if (canInSites.isEmpty()) { // 所有已配置可入的站点都有出库任务,暂不分配,等待下次定时任务再尝试(只在定时任务中记录日志) log.warn("任务ID:{}没有可入站点(请等待出库完成),暂不分配站点,等待下次定时任务再尝试。已配置可入站点列表:{}", displayTaskId, configuredSites); return null; // 返回null,表示暂不分配,等待下次定时任务再尝试 } log.info("任务ID:{},没有出库任务的站点列表:{}", displayTaskId, canInSites); // 寻找入库任务最少的站点(且必须in_enable="Y"能入 和 canining="Y"可入),排除dev_no=0的无效站点 List<BasDevp> devList = basDevpMapper.selectList(new EntityWrapper<BasDevp>() .in("dev_no", canInSites) .eq("in_enable", "Y") .eq("canining", "Y") .ne("dev_no", 0) // 排除dev_no=0的无效站点 ).stream() .filter(dev -> dev.getDevNo() != null && dev.getDevNo() != 0) // 再次过滤,确保不为null或0 .collect(Collectors.toList()); if (devList.isEmpty()) { // 理论上不应该到这里,因为前面已经检查过了,但为了安全起见还是保留 String errorMsg = "没有可入站点(in_enable='Y'且canining='Y')"; log.warn("任务ID:{},{},可入站点列表:{}", displayTaskId, errorMsg, canInSites); return errorMsg; } // 记录每个站点的入库任务数 StringBuilder siteInQtyInfo = new StringBuilder(); for (BasDevp dev : devList) { if (siteInQtyInfo.length() > 0) { siteInQtyInfo.append("; "); } siteInQtyInfo.append("站点").append(dev.getDevNo()) .append("(入库任务数=").append(dev.getInQty()).append(")"); } log.info("任务ID:{},可入站点及其入库任务数:{}", displayTaskId, siteInQtyInfo.toString()); // 先按规则排序(入库任务数排序) devList.sort(Comparator.comparing(BasDevp::getInQty)); devListWithConfig.sort(Comparator.comparing(BasDevp::getInQty)); // 根据任务类型确定要检查的io_type列表 Integer taskIoType = task.getIoType(); List<Integer> checkIoTypes = null; @@ -994,13 +941,20 @@ } // 从可用站点中筛选出未被分配的站点 List<BasDevp> unallocatedSites = devList.stream() List<BasDevp> unallocatedSites = devListWithConfig.stream() .filter(dev -> { String staNo = String.valueOf(dev.getDevNo()); return !allocatedSiteNos.contains(staNo); }) .collect(Collectors.toList()); List<BasDevp> unallocatedSites2= new ArrayList<>(); for(int i=0;devListWithConfig.size()>i;i++){ unallocatedSites2.add(devListWithConfig.get(i)); } // if(unallocatedSites==null || unallocatedSites.isEmpty()){ // unallocatedSites=unallocatedSites2; // // } // 只使用未分配站点 if (unallocatedSites.isEmpty()) { // 未分配站点为空:不分配站点 @@ -1088,7 +1042,6 @@ log.info("任务ID:{}已分配站点:{},机器人组:{},任务类型:{}", displayTaskId, endSite, robotGroup, taskTypeName); return null; // 分配成功,返回null } /** * 根据站点编号判断机器人组 * @param staNo 站点编号 @@ -1113,6 +1066,7 @@ return agvProperties.getRobotGroupEast(); // 默认使用东侧机器人组 } } /** * 任务完成转历史 释放暂存点 @@ -1180,7 +1134,11 @@ log.info("agv任务档转历史成功:{}", taskIds); } @Transactional(rollbackFor = Exception.class) public void moveTaskToHistory(Task agvTask) { moveTaskToHistory(Collections.singletonList(agvTask)); } /** * 货物到达出库口,生成agv任务 */ src/main/java/com/zy/asrs/task/handler/WorkMastHandler.java
@@ -823,8 +823,9 @@ return; } // 检查工作档是否已完成或已转历史档 // 检查工作档是否已完成或已转历史档,并获取工作档的目标站作为AGV的源库位 boolean workCompleted = false; String wrkMastStaNo = null; // 工作档的目标站,将作为AGV的源库位 if (outTask.getWrkNo() != null) { // 检查工作档是否存在且已完成 WrkMast wrkMast = wrkMastService.selectOne( @@ -836,7 +837,9 @@ // 出库任务完成状态:14(已出库未确认)或15(出库更新完成) if (wrkSts != null && (wrkSts == 14L || wrkSts == 15L)) { workCompleted = true; log.debug("工作档{}已完成,状态:{}", outTask.getWrkNo(), wrkSts); // 获取工作档的目标站作为AGV的源库位 wrkMastStaNo = wrkMast.getStaNo(); log.debug("工作档{}已完成,状态:{},目标站:{}", outTask.getWrkNo(), wrkSts, wrkMastStaNo); } } else { // 如果工作档不存在,检查历史档 @@ -848,7 +851,9 @@ // 出库任务历史档完成状态:15(出库更新完成) if (logWrkSts == 15L) { workCompleted = true; log.debug("工作档{}已转历史档并完结,历史档状态:{}", outTask.getWrkNo(), logWrkSts); // 从历史档获取目标站作为AGV的源库位 wrkMastStaNo = wrkMastLog.getStaNo() != null ? String.valueOf(wrkMastLog.getStaNo()) : null; log.debug("工作档{}已转历史档并完结,历史档状态:{},目标站:{}", outTask.getWrkNo(), logWrkSts, wrkMastStaNo); } } } @@ -891,6 +896,8 @@ // 创建空托出库/满托出库任务 Task cacheTask = new Task(); Date now = new Date(); // 出库工作档完成时,工作档的目标站就是AGV的源库位 // AGV的目标站是asr_loc_cache里面WA开头的库位 cacheTask.setWrkNo(workNo) .setIoTime(now) .setWrkSts(7L) // 工作状态:7.待呼叫AGV @@ -903,8 +910,8 @@ .setFullPlt(isEmptyPallet ? "N" : "Y") // 满板:空托=N,满托=Y .setPicking("N") .setExitMk("N") .setSourceLocNo(null) // 出库任务不需要源库位 .setLocNo(cacheLoc.getLocNo()) // 目标库位(缓存库位) .setSourceLocNo(wrkMastStaNo) // AGV的源库位:出库工作档的目标站 .setLocNo(cacheLoc.getLocNo()) // AGV的目标库位:asr_loc_cache里面WA开头的库位 .setEmptyMk(isEmptyPallet ? "Y" : "N") // 空板标记 .setBarcode(outTask.getBarcode()) // 托盘码 .setLinkMis("N") @@ -927,8 +934,8 @@ return; } log.info("成功生成{}任务,任务ID:{},工作号:{},源站点:{},目标站点:{},缓存库位:{}", isEmptyPallet ? "空托出库" : "满托出库", cacheTask.getId(), workNo, outboundStaNo, cacheStaNo, cacheLoc.getLocNo()); log.info("成功生成{}任务,任务ID:{},工作号:{},源站点:{},目标站点:{},源库位(工作档目标站):{},目标库位(WA缓存库位):{}", isEmptyPallet ? "空托出库" : "满托出库", cacheTask.getId(), workNo, outboundStaNo, cacheStaNo, wrkMastStaNo, cacheLoc.getLocNo()); } /**