""" 任务分配算法 """ import time import random import logging from typing import Dict, List, Tuple, Optional, Set, Any from collections import defaultdict from abc import ABC, abstractmethod from common.data_models import TaskData, AGVStatus, TaskAssignment, BackpackData from algorithm_system.models.agv_model import AGVModel, AGVModelManager from common.utils import get_coordinate_from_path_id, calculate_distance, calculate_manhattan_distance from dataclasses import dataclass class TaskAllocation(ABC): """任务分配算法基类""" def __init__(self, agv_manager: AGVModelManager): """ 初始化任务分配算法 Args: agv_manager: AGV模型管理器 """ self.agv_manager = agv_manager self.logger = logging.getLogger(__name__) @abstractmethod def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]: """ 分配任务给AGV Args: tasks: 待分配的任务列表 Returns: List[TaskAssignment]: 分配结果列表 """ pass def find_available_backpack_slot(self, agv_status: AGVStatus) -> Optional[int]: """ 查找AGV的可用背篓位置 Args: agv_status: AGV状态信息 Returns: Optional[int]: 可用的背篓位置编号,如果没有可用位置则返回None """ if not agv_status.backpack: # 如果没有背篓信息,假设从第一个位置开始 self.logger.warning(f"AGV {agv_status.agvId} 没有背篓信息,分配到位置0") return 0 # 查找空闲且未执行任务的背篓位置 for backpack_item in agv_status.backpack: if not backpack_item.loaded and not backpack_item.execute and not backpack_item.taskId: self.logger.debug(f"AGV {agv_status.agvId} 找到可用背篓位置: {backpack_item.index}") return backpack_item.index # 如果所有位置都被占用,返回None self.logger.debug(f"AGV {agv_status.agvId} 没有可用的背篓位置") return None def get_agv_available_capacity(self, agv_status: AGVStatus) -> int: """ 获取AGV的可用背篓容量 Args: agv_status: AGV状态信息 Returns: int: 可用背篓数量 """ if not agv_status.backpack: return 1 # 假设至少有一个背篓位置 available_count = 0 for backpack_item in agv_status.backpack: if not backpack_item.loaded and not backpack_item.execute and not backpack_item.taskId: available_count += 1 return available_count def assign_task_with_backpack(self, agv_model, task: TaskData, lev_id: int) -> bool: """ 将任务分配给AGV的指定背篓位置 Args: agv_model: AGV模型 task: 任务数据 lev_id: 背篓位置编号 Returns: bool: 分配是否成功 """ try: # 使用AGV模型的assign_task方法 success = agv_model.assign_task( task_id=task.taskId, priority=task.priority, start_code=task.start, end_code=task.end ) if success: self.logger.info(f"任务 {task.taskId} 成功分配给AGV {agv_model.agvId} 的背篓位置 {lev_id}") return True else: self.logger.warning(f"任务 {task.taskId} 分配给AGV {agv_model.agvId} 失败") return False except Exception as e: self.logger.error(f"分配任务时发生异常: {e}") return False class NearestFirstAllocation(TaskAllocation): """最近优先分配算法""" def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]: """ 使用最近优先策略分配任务 Args: tasks: 待分配的任务列表 Returns: List[TaskAssignment]: 分配结果列表 """ if not tasks: return [] # 获取可用的AGV available_agvs = self.agv_manager.get_available_agvs() if not available_agvs: self.logger.warning("没有可用的AGV进行任务分配") return [] # 1. 首先检查任务是否已经分配,避免重复分配 already_assigned_tasks = set() all_agvs = self.agv_manager.get_all_agvs() for agv in all_agvs: if agv.backpack: for backpack_item in agv.backpack: if backpack_item.taskId: already_assigned_tasks.add(backpack_item.taskId) self.logger.info(f"任务 {backpack_item.taskId} 已分配给 AGV {agv.agvId},跳过重复分配") # 2. 过滤掉已分配的任务 unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks] if not unassigned_tasks: self.logger.info("所有任务都已分配,无需重新分配") return [] self.logger.info(f"总任务数: {len(tasks)}, 已分配: {len(already_assigned_tasks)}, 待分配: {len(unassigned_tasks)}") assignments = [] path_mapping = self.agv_manager.path_mapping # 对每个任务找到最近的AGV for task in unassigned_tasks: if not available_agvs: break # 获取任务起点坐标 task_start_coord = get_coordinate_from_path_id(task.start, path_mapping) if not task_start_coord: self.logger.warning(f"无法获取任务 {task.taskId} 起点 {task.start} 的坐标") continue # 找到距离最近的AGV nearest_agv = None min_distance = float('inf') for agv in available_agvs: if agv.coordinates: distance = calculate_manhattan_distance(agv.coordinates, task_start_coord) # 如果AGV已有任务,计算完成当前任务后到新任务起点的距离 if agv.current_task_count > 0: # 简化:假设AGV需要额外时间完成当前任务 distance += agv.current_task_count * 10 if distance < min_distance: min_distance = distance nearest_agv = agv if nearest_agv and nearest_agv.can_accept_task(task.priority): # 获取AGV的原始状态数据来查找可用背篓位置 agv_status = None for agv_state in self.agv_manager.get_all_agv_status(): if agv_state.agvId == nearest_agv.agvId: agv_status = agv_state break if agv_status: # 查找可用的背篓位置 available_lev_id = self.find_available_backpack_slot(agv_status) if available_lev_id is not None: # 分配任务到指定背篓位置 success = self.assign_task_with_backpack(nearest_agv, task, available_lev_id) if success: assignments.append(TaskAssignment( taskId=task.taskId, agvId=nearest_agv.agvId, lev_id=available_lev_id )) self.logger.info(f"任务 {task.taskId} 分配给最近的AGV {nearest_agv.agvId},背篓位置: {available_lev_id},距离: {min_distance}") # 检查AGV是否还有可用背篓位置 remaining_capacity = self.get_agv_available_capacity(agv_status) - 1 if remaining_capacity <= 0: available_agvs.remove(nearest_agv) else: self.logger.warning(f"任务 {task.taskId} 分配给AGV {nearest_agv.agvId} 失败") else: self.logger.warning(f"AGV {nearest_agv.agvId} 没有可用的背篓位置") available_agvs.remove(nearest_agv) else: self.logger.warning(f"无法获取AGV {nearest_agv.agvId} 的状态信息") return assignments class LoadBalancedAllocation(TaskAllocation): """负载均衡分配算法""" def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]: """ 使用负载均衡策略分配任务 Args: tasks: 待分配的任务列表 Returns: List[TaskAssignment]: 分配结果列表 """ if not tasks: return [] # 获取所有AGV all_agvs = self.agv_manager.get_all_agvs() if not all_agvs: self.logger.warning("没有AGV进行任务分配") return [] # 1. 首先检查任务是否已经分配,避免重复分配 already_assigned_tasks = set() for agv in all_agvs: if agv.backpack: for backpack_item in agv.backpack: if backpack_item.taskId: already_assigned_tasks.add(backpack_item.taskId) self.logger.info(f"任务 {backpack_item.taskId} 已分配给 AGV {agv.agvId},跳过重复分配") # 2. 过滤掉已分配的任务 unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks] if not unassigned_tasks: self.logger.info("所有任务都已分配,无需重新分配") return [] self.logger.info(f"总任务数: {len(tasks)}, 已分配: {len(already_assigned_tasks)}, 待分配: {len(unassigned_tasks)}") assignments = [] path_mapping = self.agv_manager.path_mapping # 按优先级排序任务 sorted_tasks = sorted(unassigned_tasks, key=lambda t: t.priority, reverse=True) # 对每个任务分配给负载最低的AGV for task in sorted_tasks: # 获取任务起点坐标 task_start_coord = get_coordinate_from_path_id(task.start, path_mapping) if not task_start_coord: self.logger.warning(f"无法获取任务 {task.taskId} 起点 {task.start} 的坐标") continue # 按负载和距离排序AGV agv_scores = [] for agv in all_agvs: if not agv.can_accept_task(task.priority): continue # 计算负载得分(负载越低得分越高) load_score = 1.0 - agv.get_workload_ratio() # 计算距离得分(距离越近得分越高) distance_score = 0.0 if agv.coordinates and task_start_coord: distance = calculate_manhattan_distance(agv.coordinates, task_start_coord) distance_score = 1.0 / (1.0 + distance / 100.0) # 归一化距离得分 # 计算效率得分 efficiency_score = agv.calculate_efficiency_score(path_mapping) # 综合得分 total_score = 0.4 * load_score + 0.3 * distance_score + 0.3 * efficiency_score agv_scores.append((agv, total_score)) if not agv_scores: # 详细分析为什么没有AGV可以接受任务 total_agvs = len(all_agvs) busy_count = 0 low_battery_count = 0 overloaded_count = 0 status_invalid_count = 0 for agv in all_agvs: if agv.is_overloaded(): overloaded_count += 1 elif str(agv.status) not in ["0", "1", "2"]: status_invalid_count += 1 elif agv.need_charging(): low_battery_count += 1 else: busy_count += 1 self.logger.warning(f"没有AGV可以接受任务 {task.taskId} - 详细分析:") self.logger.warning(f" 总AGV数: {total_agvs}") self.logger.warning(f" 任务满载: {overloaded_count}") self.logger.warning(f" 状态异常: {status_invalid_count}") self.logger.warning(f" 电量过低: {low_battery_count}") self.logger.warning(f" 其他原因: {busy_count}") self.logger.warning(f" 任务优先级: {task.priority}") continue # 选择得分最高的AGV agv_scores.sort(key=lambda x: x[1], reverse=True) best_agv = agv_scores[0][0] # 获取AGV的原始状态数据来查找可用背篓位置 agv_status = self.agv_manager.get_agv_status(best_agv.agvId) if agv_status: # 查找可用的背篓位置 available_lev_id = self.find_available_backpack_slot(agv_status) if available_lev_id is not None: # 分配任务到指定背篓位置 success = self.assign_task_with_backpack(best_agv, task, available_lev_id) if success: assignments.append(TaskAssignment( taskId=task.taskId, agvId=best_agv.agvId, lev_id=available_lev_id )) self.logger.info(f"任务 {task.taskId} 分配给负载均衡的AGV {best_agv.agvId},背篓位置: {available_lev_id},得分: {agv_scores[0][1]:.3f}") else: self.logger.warning(f"任务 {task.taskId} 分配给AGV {best_agv.agvId} 失败") else: self.logger.warning(f"AGV {best_agv.agvId} 没有可用的背篓位置") else: self.logger.warning(f"无法获取AGV {best_agv.agvId} 的状态信息") return assignments class PriorityFirstAllocation(TaskAllocation): """优先级优先分配算法""" def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]: """ 使用优先级优先策略分配任务 Args: tasks: 待分配的任务列表 Returns: List[TaskAssignment]: 分配结果列表 """ if not tasks: return [] # 获取可用的AGV available_agvs = self.agv_manager.get_available_agvs() if not available_agvs: self.logger.warning("没有可用的AGV进行任务分配") return [] # 1. 首先检查任务是否已经分配,避免重复分配 already_assigned_tasks = set() all_agvs = self.agv_manager.get_all_agvs() for agv in all_agvs: if agv.backpack: for backpack_item in agv.backpack: if backpack_item.taskId: already_assigned_tasks.add(backpack_item.taskId) self.logger.info(f"任务 {backpack_item.taskId} 已分配给 AGV {agv.agvId},跳过重复分配") # 2. 过滤掉已分配的任务 unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks] if not unassigned_tasks: self.logger.info("所有任务都已分配,无需重新分配") return [] self.logger.info(f"总任务数: {len(tasks)}, 已分配: {len(already_assigned_tasks)}, 待分配: {len(unassigned_tasks)}") # 按优先级排序任务(高优先级在前) sorted_tasks = sorted(unassigned_tasks, key=lambda t: t.priority, reverse=True) assignments = [] path_mapping = self.agv_manager.path_mapping # 优先分配高优先级任务 for task in sorted_tasks: if not available_agvs: break # 获取任务起点坐标 task_start_coord = get_coordinate_from_path_id(task.start, path_mapping) if not task_start_coord: self.logger.warning(f"无法获取任务 {task.taskId} 起点 {task.start} 的坐标") continue # 为高优先级任务选择最佳AGV best_agv = None best_score = -1 for agv in available_agvs: if not agv.can_accept_task(task.priority): continue # 计算综合得分 distance_score = 0.0 if agv.coordinates and task_start_coord: distance = calculate_manhattan_distance(agv.coordinates, task_start_coord) distance_score = 1.0 / (1.0 + distance / 50.0) efficiency_score = agv.calculate_efficiency_score(path_mapping) capacity_score = agv.get_task_capacity() / agv.max_capacity # 高优先级任务更注重效率和距离 total_score = 0.5 * distance_score + 0.3 * efficiency_score + 0.2 * capacity_score if total_score > best_score: best_score = total_score best_agv = agv if best_agv: # 获取AGV的原始状态数据来查找可用背篓位置 agv_status = self.agv_manager.get_agv_status(best_agv.agvId) if agv_status: # 查找可用的背篓位置 available_lev_id = self.find_available_backpack_slot(agv_status) if available_lev_id is not None: # 分配任务到指定背篓位置 success = self.assign_task_with_backpack(best_agv, task, available_lev_id) if success: assignments.append(TaskAssignment( taskId=task.taskId, agvId=best_agv.agvId, lev_id=available_lev_id )) self.logger.info(f"高优先级任务 {task.taskId} (优先级: {task.priority}) 分配给AGV {best_agv.agvId},背篓位置: {available_lev_id}") # 检查AGV是否还有可用背篓位置 remaining_capacity = self.get_agv_available_capacity(agv_status) - 1 if remaining_capacity <= 0: available_agvs.remove(best_agv) else: self.logger.warning(f"任务 {task.taskId} 分配给AGV {best_agv.agvId} 失败") else: self.logger.warning(f"AGV {best_agv.agvId} 没有可用的背篓位置") available_agvs.remove(best_agv) else: self.logger.warning(f"无法获取AGV {best_agv.agvId} 的状态信息") return assignments class MultiObjectiveAllocation(TaskAllocation): """多目标优化分配算法""" def __init__(self, agv_manager: AGVModelManager, distance_weight: float = 0.4, load_weight: float = 0.3, efficiency_weight: float = 0.3): """ 初始化多目标优化分配算法 Args: agv_manager: AGV模型管理器 distance_weight: 距离权重 load_weight: 负载权重 efficiency_weight: 效率权重 """ super().__init__(agv_manager) self.distance_weight = distance_weight self.load_weight = load_weight self.efficiency_weight = efficiency_weight def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]: """ 使用多目标优化策略分配任务 Args: tasks: 待分配的任务列表 Returns: List[TaskAssignment]: 分配结果列表 """ if not tasks: return [] # 获取所有AGV all_agvs = self.agv_manager.get_all_agvs() if not all_agvs: self.logger.warning("没有AGV进行任务分配") return [] # 1. 首先检查任务是否已经分配,避免重复分配 already_assigned_tasks = set() for agv in all_agvs: if agv.backpack: for backpack_item in agv.backpack: if backpack_item.taskId: already_assigned_tasks.add(backpack_item.taskId) self.logger.info(f"任务 {backpack_item.taskId} 已分配给 AGV {agv.agvId},跳过重复分配") # 2. 过滤掉已分配的任务 unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks] if not unassigned_tasks: self.logger.info("所有任务都已分配,无需重新分配") return [] self.logger.info(f"总任务数: {len(tasks)}, 已分配: {len(already_assigned_tasks)}, 待分配: {len(unassigned_tasks)}") assignments = [] path_mapping = self.agv_manager.path_mapping # 对每个任务-AGV对计算得分 task_agv_scores = {} for task in unassigned_tasks: task_start_coord = get_coordinate_from_path_id(task.start, path_mapping) if not task_start_coord: continue for agv in all_agvs: if not agv.can_accept_task(task.priority): continue # 距离得分 distance_score = 0.0 if agv.coordinates: distance = calculate_manhattan_distance(agv.coordinates, task_start_coord) distance_score = 1.0 / (1.0 + distance / 100.0) # 负载得分 load_score = 1.0 - agv.get_workload_ratio() # 效率得分 efficiency_score = agv.calculate_efficiency_score(path_mapping) # 计算综合得分 total_score = ( self.distance_weight * distance_score + self.load_weight * load_score + self.efficiency_weight * efficiency_score ) task_agv_scores[(task.taskId, agv.agvId)] = total_score # 使用贪心算法进行匹配 assignments = self._greedy_matching(unassigned_tasks, all_agvs, task_agv_scores) return assignments def _greedy_matching(self, tasks: List[TaskData], agvs: List[AGVModel], scores: Dict[Tuple[str, str], float]) -> List[TaskAssignment]: """ 使用贪心算法进行任务-AGV匹配 Args: tasks: 任务列表 agvs: AGV列表 scores: 任务-AGV对的得分 Returns: List[TaskAssignment]: 分配结果 """ assignments = [] remaining_tasks = [task.taskId for task in tasks] # 重复分配直到没有任务或没有可用AGV while remaining_tasks: # 找到得分最高的任务-AGV对 best_score = -1 best_task_id = None best_agv = None for task_id in remaining_tasks: for agv in agvs: if agv.is_overloaded(): continue score = scores.get((task_id, agv.agvId), 0.0) if score > best_score: best_score = score best_task_id = task_id best_agv = agv if best_task_id and best_agv: # 获取AGV的原始状态数据来查找可用背篓位置 agv_status = self.agv_manager.get_agv_status(best_agv.agvId) if agv_status: # 查找可用的背篓位置 available_lev_id = self.find_available_backpack_slot(agv_status) if available_lev_id is not None: # 找到对应的任务对象 task = next((t for t in tasks if t.taskId == best_task_id), None) if task: # 分配任务到指定背篓位置 success = self.assign_task_with_backpack(best_agv, task, available_lev_id) if success: assignments.append(TaskAssignment( taskId=best_task_id, agvId=best_agv.agvId, lev_id=available_lev_id )) remaining_tasks.remove(best_task_id) self.logger.info(f"多目标优化:任务 {best_task_id} 分配给AGV {best_agv.agvId},背篓位置: {available_lev_id},得分: {best_score:.3f}") else: self.logger.warning(f"任务 {best_task_id} 分配给AGV {best_agv.agvId} 失败") break else: self.logger.error(f"找不到任务 {best_task_id} 的详细信息") break else: self.logger.debug(f"AGV {best_agv.agvId} 没有可用的背篓位置,跳过") break else: self.logger.warning(f"无法获取AGV {best_agv.agvId} 的状态信息") break else: break return assignments class TaskAllocationFactory: """任务分配算法工厂类""" @staticmethod def create_allocator(algorithm_type: str, agv_manager: AGVModelManager) -> TaskAllocation: """ 创建任务分配算法 Args: algorithm_type: 算法类型 agv_manager: AGV模型管理器 Returns: TaskAllocation: 任务分配算法对象 """ if algorithm_type == "NEAREST_FIRST": return NearestFirstAllocation(agv_manager) elif algorithm_type == "LOAD_BALANCED": return LoadBalancedAllocation(agv_manager) elif algorithm_type == "PRIORITY_FIRST": return PriorityFirstAllocation(agv_manager) elif algorithm_type == "MULTI_OBJECTIVE": return MultiObjectiveAllocation(agv_manager) else: # 默认使用负载均衡算法 return LoadBalancedAllocation(agv_manager)