package com.zy.asrs.task; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.baomidou.mybatisplus.mapper.Wrapper; import com.core.common.Cools; import com.zy.asrs.entity.Task; import com.zy.asrs.entity.WrkMast; import com.zy.asrs.mapper.WrkMastMapper; import com.zy.asrs.entity.WrkMastLog; import com.zy.asrs.service.TaskService; import com.zy.asrs.service.WrkMastLogService; import com.zy.asrs.service.WrkMastService; import com.zy.asrs.task.handler.AgvHandler; import com.zy.common.properties.SchedulerProperties; import com.zy.system.entity.Config; import com.zy.system.service.ConfigService; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; /** * @author pang.jiabao * @description AGV交互相关定时任务 * @createDate 2025/11/18 14:18 */ @Slf4j @Component public class AgvScheduler { @Resource private ConfigService configService; @Resource private AgvHandler agvHandler; @Resource private TaskService taskService; @Resource private WrkMastMapper wrkMastMapper; @Resource private WrkMastService wrkMastService; @Resource private WrkMastLogService wrkMastLogService; @Resource private SchedulerProperties schedulerProperties; /** * 记录上次处理的任务ID,用于轮询处理 * 确保每次处理不同的任务,避免一直处理同一个任务 */ private Long lastProcessedTaskId = null; /** * 记录上次分配站点的任务ID,用于轮询处理 */ private Long lastAllocatedTaskId = null; /** * 分配站点任务执行标志,确保同一时间只有一个线程在执行分配站点循环 */ private final AtomicBoolean isAllocateSite = new AtomicBoolean(false); /** * 呼叫AGV定时任务执行标志,确保同一时间只有一个线程在执行分配站点循环 */ private final AtomicBoolean iscallAgv = new AtomicBoolean(false); /** * agv工作档分配站点定时任务 * 查询状态7(待呼叫AGV)但没有分配站点的任务,为其分配可用站点 * 只负责分配站点,不呼叫AGV * 每次只处理一个任务,避免高并发执行 * 使用AtomicBoolean确保单线程执行循环 */ @Scheduled(cron = "0/5 * * * * ? ") private void allocateSite() { if (!schedulerProperties.isEnabled()) { log.debug("定时任务allocateSite:调度器未启用,跳过执行"); return; } if (!isAllocateSite.compareAndSet(false, true)) { log.debug("定时任务allocateSite:上一次分配站点任务还在执行中,跳过本次执行"); return; } // 构建查询条件:查询所有待呼叫AGV但没有分配站点的任务 EntityWrapper wrapper = new EntityWrapper(); wrapper.eq("wrk_sts", 7); // 待呼叫AGV状态 wrapper.eq("task_type", "agv"); // AGV任务类型 wrapper.eq("is_deleted", 0); // 排除已删除的任务 wrapper.andNew() .isNull("sta_no") .or() .eq("sta_no", "") .or() .eq("sta_no", "0"); wrapper.orderBy("id", true); // 按id升序排序 wrapper.last("OFFSET 0 ROWS FETCH NEXT 21 ROWS ONLY"); List taskList = taskService.selectList(wrapper); if (taskList.isEmpty()) { log.debug("定时任务allocateSite:没有待分配站点的任务(wrk_sts=7,task_type=agv,sta_no为空)"); isAllocateSite.set(false); return; } try { for (Task task : taskList) { String errorMsg = agvHandler.allocateSiteForTask(task); // 调用分配站点逻辑 String displayTaskId = (task.getWrkNo() != null) ? String.valueOf(task.getWrkNo()) : String.valueOf(task.getId()); log.info("定时任务allocateSite:开始为任务ID:{}分配站点(wrk_no={},ioType={})", displayTaskId, task.getWrkNo(), task.getIoType()); // 检查是否成功分配了站点 String staNo = task.getStaNo(); if (errorMsg == null && staNo != null && !staNo.isEmpty() && !staNo.equals("0")) { // 分配站点成功 lastAllocatedTaskId = task.getId(); log.info("定时任务allocateSite:任务ID:{}成功分配站点:{},更新lastAllocatedTaskId为{}", displayTaskId, staNo, lastAllocatedTaskId); } else { // 无法分配站点,不更新lastAllocatedTaskId,下次会重新尝试 log.info("定时任务allocateSite:任务ID:{}无法分配站点:{},不更新lastAllocatedTaskId(当前:{}),下次将重新尝试", displayTaskId, errorMsg != null ? errorMsg : "所有站点都被占用", lastAllocatedTaskId); } // 每个任务处理完后等待1秒, try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("定时任务allocateSite:延迟被中断", e); break; // 如果被中断,退出循环 } } } finally { // 确保标志位被重置,即使发生异常也能释放锁 isAllocateSite.set(false); } } /** * 呼叫AGV定时任务 * 查询状态7(待呼叫AGV)且已分配站点的任务,呼叫AGV * 呼叫成功后,状态从7(待呼叫AGV)变为8(正在搬运) */ @Scheduled(cron = "0/5 * * * * ? ") private void callAgv() { if (!schedulerProperties.isEnabled()) { log.debug("呼叫AGV定时任务:调度器未启用,跳过执行"); return; } if (!iscallAgv.compareAndSet(false, true)) { log.debug("呼叫AGV定时任务:上一次分配站点任务还在执行中,跳过本次执行"); return; } try { // 构建查询条件:查询状态7(待呼叫AGV)且已分配站点的任务 EntityWrapper wrapper = new EntityWrapper(); wrapper.eq("wrk_sts", 7); // 待呼叫AGV状态 wrapper.eq("task_type", "agv"); // AGV任务类型 wrapper.eq("is_deleted", 0); // 排除已删除的任务 wrapper.isNotNull("sta_no"); // 必须有站点分配 wrapper.ne("sta_no", ""); // 站点不能为空字符串 wrapper.ne("sta_no", "0"); // 站点不能为0 wrapper.orderBy("id", true); // 按id升序排序 wrapper.last("OFFSET 0 ROWS FETCH NEXT 22 ROWS ONLY"); // 如果上次处理过任务,从下一个任务开始查询(轮询) // if (lastProcessedTaskId != null) { // wrapper.gt("id", lastProcessedTaskId); // } // 查询待呼叫agv任务 List taskList = taskService.selectList(wrapper); if (taskList.isEmpty()) { log.debug("呼叫AGV定时任务:没有待呼叫AGV的任务(wrk_sts=7,task_type=agv,sta_no不为空)"); iscallAgv.set(false); return; } for (Task task : taskList) { // 调用处理逻辑:呼叫AGV,成功后状态从7变为8 String displayTaskId = (task.getWrkNo() != null) ? String.valueOf(task.getWrkNo()) : String.valueOf(task.getId()); log.info("呼叫AGV定时任务:开始处理任务ID:{}(wrk_no={},ioType={},sta_no={})", displayTaskId, task.getWrkNo(), task.getIoType(), task.getStaNo()); boolean processed = agvHandler.callAgv(Collections.singletonList(task)); // 只有当任务成功处理(成功呼叫AGV,状态从7变为8)时,才更新lastProcessedTaskId // 如果任务被跳过(站点被占用等),不更新lastProcessedTaskId,下次会重新尝试 if (processed) { lastProcessedTaskId = task.getId(); log.info("定时任务callAgv:任务ID:{}成功呼叫AGV,状态已从7变为8,更新lastProcessedTaskId为{},下次将处理下一个任务", displayTaskId, lastProcessedTaskId); } else { log.info("定时任务callAgv:任务ID:{}被跳过,不更新lastProcessedTaskId(当前:{}),下次将重新尝试处理此任务", displayTaskId, lastProcessedTaskId); } // 每个任务处理完后等待1秒, try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("呼叫AGV定时任务:延迟被中断", e); break; // 如果被中断,退出循环 } } } finally { // 确保标志位被重置,即使发生异常也能释放锁 iscallAgv.set(false); } } /** * 货物到达出库口,生成agv任务 */ @Scheduled(cron = "0/3 * * * * ? ") private void createAgvOutTasks() { if (!schedulerProperties.isEnabled()) { return; } // 获取呼叫agv配置 List configs = configService.selectList(new EntityWrapper().in("code", "eastCallAgvControl", "westCallAgvControl").eq("status", 1)); if (configs.isEmpty()) { return; } // 获取agv出库可用站点 List sites = new ArrayList<>(); for (Config config : configs) { String value = config.getValue(); if (Cools.isEmpty(value)) { continue; } String[] split = value.split(";"); sites.addAll(Arrays.asList(split)); } if (sites.isEmpty()) { return; } agvHandler.createAgvOutTasks(sites); } /** * 任务完成转历史 */ @Scheduled(cron = "0/10 * * * * ? ") private void moveTaskToHistory() { if (!schedulerProperties.isEnabled()) { return; } List taskList = taskService.selectList(new EntityWrapper().eq("wrk_sts", 9).eq("is_deleted", 0)); if (taskList.isEmpty()) { return; } agvHandler.moveTaskToHistory(taskList); } /** * 检查入库成功的任务,完结对应的AGV呼叫单 * 如果入库任务呼叫AGV后没有收到回调,但工作档已经入库成功,则完结AGV呼叫单 */ @Scheduled(cron = "0/10 * * * * ? ") private void checkInboundCompletedTasks() { if (!schedulerProperties.isEnabled()) { return; } try { // 查询入库成功的工作档(状态4:入库完成,入库类型:1,10,53,57) List completedWrkMasts = wrkMastService.selectList( new EntityWrapper() .eq("wrk_sts", 4L) // 入库完成 .in("io_type", 1, 10, 53, 57) // 入库类型 .isNotNull("wrk_no") ); if (completedWrkMasts.isEmpty()) { return; } Date now = new Date(); int completedCount = 0; List completedTasks = new ArrayList<>(); for (WrkMast wrkMast : completedWrkMasts) { // 查找对应的AGV任务(优先通过wrk_no查询) Wrapper taskWrapper1 = new EntityWrapper() .eq("task_type", "agv") .eq("wrk_sts", 8L) // 已呼叫AGV状态 .eq("wrk_no", wrkMast.getWrkNo()) .eq("is_deleted", 0); // 排除已删除的任务 List agvTasks = taskService.selectList(taskWrapper1); // 如果通过wrk_no没找到,且有条码,则通过条码查询 if (agvTasks.isEmpty() && !Cools.isEmpty(wrkMast.getBarcode())) { Wrapper taskWrapper2 = new EntityWrapper() .eq("task_type", "agv") .eq("wrk_sts", 8L) .eq("barcode", wrkMast.getBarcode()) .eq("is_deleted", 0); // 排除已删除的任务 agvTasks = taskService.selectList(taskWrapper2); } for (Task agvTask : agvTasks) { // 确保是入库任务 if (agvTask.getIoType() != null && (agvTask.getIoType() == 1 || agvTask.getIoType() == 10 || agvTask.getIoType() == 53 || agvTask.getIoType() == 57)) { // 更新AGV任务状态为完成 agvTask.setWrkSts(9L); agvTask.setModiTime(now); if (taskService.updateById(agvTask)) { completedTasks.add(agvTask); completedCount++; // taskId使用工作号(wrk_no),如果工作号为空则使用任务ID String displayTaskId = (agvTask.getWrkNo() != null) ? String.valueOf(agvTask.getWrkNo()) : String.valueOf(agvTask.getId()); log.info("入库任务工作档已入库成功,完结AGV呼叫单,taskId:{},wrkNo:{},barcode:{}", displayTaskId, wrkMast.getWrkNo(), wrkMast.getBarcode()); } } } } // 立即将完成的AGV任务转移到历史表,不保留在Task表中 if (!completedTasks.isEmpty()) { try { agvHandler.moveTaskToHistory(completedTasks); log.info("入库完成,已将{}个AGV任务转移到历史表(不保留在Task表中)", completedTasks.size()); } catch (Exception e) { log.error("入库完成,转移AGV任务到历史表失败", e); } } if (completedCount > 0) { log.info("本次检查完结了{}个入库AGV呼叫单", completedCount); } } catch (Exception e) { log.error("检查入库成功任务并完结AGV呼叫单异常", e); } } /** * 检查并修复异常状态的AGV任务:正在搬运但没有分配站点 * 这种情况可能是数据异常或并发问题导致的 */ @Scheduled(cron = "0/30 * * * * ? ") private void checkAbnormalTasksWithoutSite() { if (!schedulerProperties.isEnabled()) { return; } try { // 查询状态为8(正在搬运)但没有分配站点的任务 List abnormalTasks = taskService.selectList( new EntityWrapper() .eq("task_type", "agv") .eq("wrk_sts", 8L) // 正在搬运 .eq("is_deleted", 0) // 排除已删除的任务 .andNew() .isNull("sta_no") .or() .eq("sta_no", "") .or() .eq("sta_no", "0") ); if (abnormalTasks.isEmpty()) { return; } log.warn("检测到{}个异常状态的AGV任务:正在搬运但没有分配站点,开始修复", abnormalTasks.size()); Date now = new Date(); int fixedCount = 0; int completedCount = 0; for (Task task : abnormalTasks) { String displayTaskId = (task.getWrkNo() != null) ? String.valueOf(task.getWrkNo()) : String.valueOf(task.getId()); // 检查工作档和历史档状态 WrkMast wrkMast = null; WrkMastLog wrkMastLog = null; if (task.getWrkNo() != null) { wrkMast = wrkMastService.selectOne( new EntityWrapper().eq("wrk_no", task.getWrkNo()) ); wrkMastLog = wrkMastLogService.selectOne( new EntityWrapper().eq("wrk_no", task.getWrkNo()) ); } // 如果工作档已完成或已转历史档,直接结束任务 boolean shouldComplete = false; String reason = ""; if (wrkMastLog != null) { shouldComplete = true; reason = "工作档已转历史档"; } else if (wrkMast != null) { Long wrkSts = wrkMast.getWrkSts(); Integer ioType = task.getIoType(); if (wrkSts != null && ioType != null) { // 入库任务:状态4或5 if ((ioType == 1 || ioType == 10 || ioType == 53 || ioType == 57) && (wrkSts == 4L || wrkSts == 5L)) { shouldComplete = true; reason = String.format("工作档已完成(入库),状态:%d", wrkSts); } // 出库任务:状态14或15 else if ((ioType == 101 || ioType == 110 || ioType == 103 || ioType == 107) && (wrkSts == 14L || wrkSts == 15L)) { shouldComplete = true; reason = String.format("工作档已完成(出库),状态:%d", wrkSts); } } } if (shouldComplete) { // 工作档已完成,直接结束任务 task.setWrkSts(9L); task.setModiTime(now); if (taskService.updateById(task)) { try { agvHandler.moveTaskToHistory(Collections.singletonList(task)); completedCount++; log.info("修复异常任务:{},{},已结束任务并转移到历史表,taskId:{}", reason, displayTaskId, displayTaskId); } catch (Exception e) { log.error("修复异常任务:转移任务到历史表失败,taskId:{}", displayTaskId, e); } } }/* else { // 工作档未完成,尝试分配站点或重置状态 // 先尝试分配站点 String errorMsg = agvHandler.allocateSiteForTask(task); if (errorMsg == null && task.getStaNo() != null && !task.getStaNo().isEmpty() && !task.getStaNo().equals("0")) { // 分配站点成功 fixedCount++; log.info("修复异常任务:已为任务分配站点,taskId:{},站点:{}", displayTaskId, task.getStaNo()); } else { // 无法分配站点,重置状态为7(待呼叫AGV),等待下次分配 task.setWrkSts(7L); task.setModiTime(now); if (taskService.updateById(task)) { fixedCount++; log.warn("修复异常任务:无法分配站点,重置状态为7(待呼叫AGV),taskId:{},原因:{}", displayTaskId, errorMsg != null ? errorMsg : "所有站点都被占用"); } } }*/ } if (fixedCount > 0 || completedCount > 0) { log.info("修复异常任务完成:修复了{}个任务,结束了{}个已完成工作档的任务", fixedCount, completedCount); } } catch (Exception e) { log.error("检查并修复异常状态的AGV任务异常", e); } } /** * 检查AGV任务对应的工作档是否已完成或已转历史档并完结 * 处理被跳过的AGV任务: * 1. 如果工作档已完成(wrk_sts=4,5,14,15),则完结AGV任务 * 2. 如果工作档进入历史档,立即结束AGV任务 * 3. 如果入库成功,也结束掉搬运任务(已在checkInboundCompletedTasks中实现) */ @Scheduled(cron = "0/10 * * * * ? ") private void checkCompletedTasksInHistory() { if (!schedulerProperties.isEnabled()) { return; } try { // 查询状态为8(已呼叫AGV)的AGV任务 List agvTasks = taskService.selectList( new EntityWrapper() .eq("task_type", "agv") .eq("wrk_sts", 8L) // 已呼叫AGV状态 .eq("is_deleted", 0) // 排除已删除的任务 ); if (agvTasks.isEmpty()) { return; } Date now = new Date(); int completedCount = 0; List completedTasks = new ArrayList<>(); for (Task agvTask : agvTasks) { boolean isCompleted = false; String reason = ""; // 检查工作档是否存在 WrkMast wrkMast = null; if (agvTask.getWrkNo() != null) { wrkMast = wrkMastService.selectOne( new EntityWrapper().eq("wrk_no", agvTask.getWrkNo()) ); } // 检查历史档是否存在(无论工作档是否存在都需要检查) WrkMastLog wrkMastLog = null; // 优先通过wrk_no查询历史档 if (agvTask.getWrkNo() != null) { wrkMastLog = wrkMastLogService.selectOne( new EntityWrapper().eq("wrk_no", agvTask.getWrkNo()) ); } // 如果通过wrk_no没找到,且有条码,则通过条码查询 if (wrkMastLog == null && !Cools.isEmpty(agvTask.getBarcode())) { List logList = wrkMastLogService.selectList( new EntityWrapper().eq("barcode", agvTask.getBarcode()) ); if (!logList.isEmpty()) { wrkMastLog = logList.get(0); // 取第一个 } } // 如果工作档存在,检查是否已完成 if (wrkMast != null) { Long wrkSts = wrkMast.getWrkSts(); Integer ioType = agvTask.getIoType(); if (wrkSts != null && ioType != null) { // 入库任务:状态4(入库完成)或5(库存更新完成) if ((ioType == 1 || ioType == 10 || ioType == 53 || ioType == 57) && (wrkSts == 4L || wrkSts == 5L)) { isCompleted = true; reason = String.format("工作档已完成,状态:%d", wrkSts); } // 出库任务:状态14(已出库未确认)或15(出库更新完成) else if ((ioType == 101 || ioType == 110 || ioType == 103 || ioType == 107) && (wrkSts == 14L || wrkSts == 15L)) { isCompleted = true; reason = String.format("工作档已完成,状态:%d", wrkSts); } } } // 1. 如果工作档进入历史档,立即结束AGV任务(只要历史档存在就结束) if (!isCompleted && wrkMastLog != null) { isCompleted = true; reason = String.format("工作档已转历史档,立即结束AGV任务,历史档状态:%d", wrkMastLog.getWrkSts()); } // 如果已完成,更新AGV任务状态并收集到列表 if (isCompleted) { agvTask.setWrkSts(9L); agvTask.setModiTime(now); if (taskService.updateById(agvTask)) { completedTasks.add(agvTask); completedCount++; // taskId使用工作号(wrk_no),如果工作号为空则使用任务ID String displayTaskId = (agvTask.getWrkNo() != null) ? String.valueOf(agvTask.getWrkNo()) : String.valueOf(agvTask.getId()); log.info("{},完结AGV呼叫单,taskId:{},wrkNo:{},barcode:{},站点:{}", reason, displayTaskId, agvTask.getWrkNo(), agvTask.getBarcode(), agvTask.getStaNo()); } } } // 立即将完成的AGV任务转移到历史表,不保留在Task表中 if (!completedTasks.isEmpty()) { try { agvHandler.moveTaskToHistory(completedTasks); log.info("入库/出库完成,已将{}个AGV任务转移到历史表(不保留在Task表中)", completedTasks.size()); } catch (Exception e) { log.error("入库/出库完成,转移AGV任务到历史表失败", e); } } if (completedCount > 0) { log.info("本次检查完结了{}个AGV呼叫单(工作档已完成或已转历史档)", completedCount); } } catch (Exception e) { log.error("检查工作档已完成或历史档完结任务并完结AGV呼叫单异常", e); } } }