""" AGV模型 - 用于算法系统的AGV数据建模 """ import time import logging from typing import Dict, List, Optional, Tuple, Set from dataclasses import dataclass, field from enum import Enum from common.data_models import AGVStatus, BackpackData from common.utils import get_coordinate_from_path_id, calculate_manhattan_distance class AGVTaskStatus(Enum): """AGV任务状态""" IDLE = "IDLE" # 空闲 ASSIGNED = "ASSIGNED" # 已分配任务但未开始 EXECUTING = "EXECUTING" # 执行中 COMPLETED = "COMPLETED" # 已完成 @dataclass class TaskAssignment: """任务分配信息""" task_id: str assigned_time: float priority: int status: AGVTaskStatus = AGVTaskStatus.ASSIGNED start_code: Optional[str] = None end_code: Optional[str] = None estimated_duration: Optional[float] = None class AGVModel: """AGV模型类,用于算法计算""" def __init__(self, agv_id: str, path_mapping: Dict[str, Dict[str, int]]): """ 初始化AGV模型 Args: agv_id: AGV ID path_mapping: 路径点映射字典 """ self.agvId = agv_id self.path_mapping = path_mapping self.logger = logging.getLogger(__name__) # AGV状态信息 self.status: str = "0" # AGV状态码 self.mapCode: str = "" # 当前位置码 self.coordinates: Optional[Tuple[int, int]] = None # 当前坐标 self.backpack: Optional[List[BackpackData]] = None # 背包信息 # 充电相关 self.voltage: int = 100 # 当前电量百分比 self.autoCharge: int = 20 # 低电量设定阈值 self.lowVol: int = 10 # 最低电量阈值 # 任务相关 self.assigned_tasks: List[TaskAssignment] = [] self.current_task_count: int = 0 self.max_capacity: int = 5 # 最大任务容量 # 性能相关 self.efficiency_score: float = 1.0 # 效率得分 self.last_update_time: float = time.time() # 统计信息 self.total_completed_tasks: int = 0 self.total_distance_traveled: float = 0.0 self.average_completion_time: float = 0.0 def update_from_agv_status(self, agv_status: AGVStatus): """从AGV状态更新模型""" self.status = agv_status.status if hasattr(agv_status, 'position'): self.mapCode = agv_status.position elif hasattr(agv_status, 'mapCode'): self.mapCode = agv_status.mapCode else: self.mapCode = "" self.backpack = agv_status.backpack self.last_update_time = time.time() raw_voltage = getattr(agv_status, 'vol', 100) raw_auto_charge = getattr(agv_status, 'autoCharge', 20) raw_low_vol = getattr(agv_status, 'lowVol', 10) # 电量数据标准化处理 # 如果电压值大于100,可能是毫伏值,需要转换为百分比 if raw_voltage > 100: # 假设正常电压范围是3000-5000mV,转换为0-100% # 这里使用简单的线性映射 normalized_voltage = max(0, min(100, ((raw_voltage - 3000) / 2000) * 100)) self.logger.debug(f"AGV {self.agvId} 电压标准化: {raw_voltage}mV -> {normalized_voltage:.1f}%") self.voltage = int(normalized_voltage) else: self.voltage = raw_voltage # 阈值标准化处理 if raw_auto_charge > 100: # 如果阈值也是毫伏值,同样转换 self.autoCharge = max(0, min(100, ((raw_auto_charge - 3000) / 2000) * 100)) self.logger.debug(f"AGV {self.agvId} 自动充电阈值标准化: {raw_auto_charge}mV -> {self.autoCharge:.1f}%") else: self.autoCharge = raw_auto_charge if raw_low_vol > 100: # 如果最低电量阈值也是毫伏值,同样转换 self.lowVol = max(0, min(100, ((raw_low_vol - 3000) / 2000) * 100)) self.logger.debug(f"AGV {self.agvId} 最低电量阈值标准化: {raw_low_vol}mV -> {self.lowVol:.1f}%") else: self.lowVol = raw_low_vol # 更新坐标 if self.mapCode: self.coordinates = get_coordinate_from_path_id(self.mapCode, self.path_mapping) # 根据AGV状态更新任务计数 if self.backpack and self.backpack: self.current_task_count = len([bp for bp in self.backpack if bp.execute]) else: self.current_task_count = 0 def assign_task(self, task_id: str, priority: int = 5, start_code: str = "", end_code: str = "") -> bool: """ 分配任务给AGV Args: task_id: 任务ID priority: 任务优先级 start_code: 起始位置码 end_code: 结束位置码 Returns: bool: 是否分配成功 """ if self.is_overloaded(): self.logger.warning(f"AGV {self.agvId} 已满载,无法分配更多任务") return False # 创建任务分配记录 task_assignment = TaskAssignment( task_id=task_id, assigned_time=time.time(), priority=priority, start_code=start_code, end_code=end_code ) self.assigned_tasks.append(task_assignment) self.current_task_count += 1 self.logger.info(f"任务 {task_id} 已分配给AGV {self.agvId}") return True def complete_task(self, task_id: str) -> bool: """ 完成任务 Args: task_id: 任务ID Returns: bool: 是否完成成功 """ for task in self.assigned_tasks: if task.task_id == task_id: task.status = AGVTaskStatus.COMPLETED self.current_task_count = max(0, self.current_task_count - 1) self.total_completed_tasks += 1 self.logger.info(f"AGV {self.agvId} 完成任务 {task_id}") return True return False def can_accept_task(self, priority: int) -> bool: """ 检查是否可以接受新任务 Args: priority: 任务优先级 Returns: bool: 是否可以接受 """ # 检查容量 if self.is_overloaded(): self.logger.debug(f"AGV {self.agvId} 任务已满载({self.current_task_count}/{self.max_capacity}),拒绝新任务") return False # 检查AGV状态(兼容整数和字符串格式) # 状态 0(空闲), 1(忙碌), 2(充电) 可以接受任务 # 状态 3(故障), 4(维护) 不能接受任务 status_value = str(self.status) # 统一转换为字符串进行比较 if status_value not in ["0", "1", "2"]: self.logger.debug(f"AGV {self.agvId} 状态异常(status={status_value}),拒绝新任务") return False # 检查充电状态 - 如果电量过低必须充电,不能接受新任务 if self.need_charging(): self.logger.debug(f"AGV {self.agvId} 电量过低({self.voltage}% <= {self.lowVol}%),必须充电,拒绝新任务") return False # 如果是高优先级任务,即使低电量也可以接受 if priority >= 9: # 高优先级任务阈值 self.logger.debug(f"AGV {self.agvId} 接受高优先级任务(priority={priority})") return True # 对于普通优先级任务,如果电量低但不是必须充电,可以根据优先级决定 if self.can_auto_charge(): # 优先级越高,越倾向于接受任务 if priority >= 5: # 中高优先级 self.logger.debug(f"AGV {self.agvId} 接受中高优先级任务(priority={priority}, 电量={self.voltage}%)") return True elif priority >= 3: # 中等优先级,75%概率接受 import random accept = random.random() < 0.75 self.logger.debug(f"AGV {self.agvId} 随机决定{'接受' if accept else '拒绝'}中等优先级任务(priority={priority}, 电量={self.voltage}%)") return accept elif priority >= 1: # 低优先级,50%概率接受(包括优先级1) import random accept = random.random() < 0.5 self.logger.debug(f"AGV {self.agvId} 随机决定{'接受' if accept else '拒绝'}低优先级任务(priority={priority}, 电量={self.voltage}%)") return accept else: # 极低优先级(priority=0),拒绝 self.logger.debug(f"AGV {self.agvId} 电量偏低({self.voltage}%),拒绝极低优先级任务(priority={priority})") return False # 正常情况下可以接受任务 self.logger.debug(f"AGV {self.agvId} 状态良好,可以接受任务(电量={self.voltage}%, priority={priority})") return True def is_overloaded(self) -> bool: """ 检查是否过载 Returns: bool: 是否过载 """ return self.current_task_count >= self.max_capacity def get_workload_ratio(self) -> float: """ 获取工作负载比例 Returns: float: 工作负载比例 (0.0 - 1.0) """ return min(1.0, self.current_task_count / self.max_capacity) def get_task_capacity(self) -> int: """ 获取剩余任务容量 Returns: int: 剩余容量 """ return max(0, self.max_capacity - self.current_task_count) def need_charging(self) -> bool: """ 检查是否需要充电 Returns: bool: 是否需要充电 """ return self.voltage <= self.lowVol def can_auto_charge(self) -> bool: """ 检查是否可以自动充电(低于阈值但不是必须充电) Returns: bool: 是否可以自动充电 """ return self.lowVol < self.voltage <= self.autoCharge def is_low_power(self) -> bool: """ 检查是否为低电量状态 Returns: bool: 是否为低电量状态 """ return self.voltage <= self.autoCharge def get_charging_priority(self) -> int: """ 获取充电优先级(数值越大优先级越高) Returns: int: 充电优先级 """ if self.need_charging(): return 100 # 必须充电,最高优先级 elif self.can_auto_charge(): return 50 + (self.autoCharge - self.voltage) # 根据电量差计算优先级 else: return 0 # 无需充电 def calculate_efficiency_score(self, path_mapping: Dict[str, Dict[str, int]]) -> float: """ 计算AGV效率得分 Args: path_mapping: 路径点映射 Returns: float: 效率得分 (0.0 - 1.0) """ # 基础效率分数 base_score = 0.7 # 根据完成任务数量调整 if self.total_completed_tasks > 0: completion_bonus = min(0.2, self.total_completed_tasks * 0.01) base_score += completion_bonus # 根据负载情况调整 load_ratio = self.get_workload_ratio() if load_ratio < 0.8: # 负载不太高时效率更高 load_bonus = (0.8 - load_ratio) * 0.1 base_score += load_bonus # 根据AGV状态调整 if self.status == "0": # 正常状态 base_score += 0.1 elif self.status in ["3", "4"]: # 异常状态 base_score -= 0.2 # 时间因子:最近更新的AGV得分更高 time_since_update = time.time() - self.last_update_time if time_since_update < 60: # 1分钟内更新 time_bonus = (60 - time_since_update) / 600 # 最多加0.1 base_score += time_bonus return max(0.0, min(1.0, base_score)) def estimate_travel_time(self, target_code: str) -> float: """ 估算到目标位置的行驶时间 Args: target_code: 目标位置码 Returns: float: 估算时间(秒) """ if not self.coordinates: return float('inf') target_coord = get_coordinate_from_path_id(target_code, self.path_mapping) if not target_coord: return float('inf') # 计算曼哈顿距离 distance = calculate_manhattan_distance(self.coordinates, target_coord) # 假设AGV平均速度为1单位/秒 estimated_time = distance * 1.0 # 考虑当前负载对速度的影响 load_factor = 1.0 + (self.get_workload_ratio() * 0.2) return estimated_time * load_factor def get_task_by_id(self, task_id: str) -> Optional[TaskAssignment]: """ 根据任务ID获取任务分配信息 Args: task_id: 任务ID Returns: Optional[TaskAssignment]: 任务分配信息 """ for task in self.assigned_tasks: if task.task_id == task_id: return task return None def get_pending_tasks(self) -> List[TaskAssignment]: """ 获取待执行的任务 Returns: List[TaskAssignment]: 待执行任务列表 """ return [task for task in self.assigned_tasks if task.status in [AGVTaskStatus.ASSIGNED, AGVTaskStatus.EXECUTING]] def clear_completed_tasks(self): """清理已完成的任务""" self.assigned_tasks = [task for task in self.assigned_tasks if task.status != AGVTaskStatus.COMPLETED] def __str__(self) -> str: """字符串表示""" return (f"AGV({self.agvId}, status={self.status}, " f"pos={self.mapCode}, tasks={self.current_task_count}/{self.max_capacity})") class AGVModelManager: """AGV模型管理器""" def __init__(self, path_mapping: Dict[str, Dict[str, int]]): """ 初始化AGV模型管理器 Args: path_mapping: 路径点映射字典 """ self.path_mapping = path_mapping self.agv_models: Dict[str, AGVModel] = {} self.agv_status_data: Dict[str, AGVStatus] = {} # 存储原始AGV状态数据 self.logger = logging.getLogger(__name__) def update_agv_data(self, agv_status_list: List[AGVStatus]): """ 更新AGV数据 Args: agv_status_list: AGV状态列表 """ # 获取当前更新的AGV ID列表 current_agv_ids = {agv_status.agvId for agv_status in agv_status_list} # 移除不再存在的AGV模型和状态数据 removed_agvs = [] for agv_id in list(self.agv_models.keys()): if agv_id not in current_agv_ids: removed_agvs.append(agv_id) del self.agv_models[agv_id] if agv_id in self.agv_status_data: del self.agv_status_data[agv_id] if removed_agvs: self.logger.info(f"移除不存在的AGV: {removed_agvs}") # 更新或创建AGV模型和状态数据 for agv_status in agv_status_list: agv_id = agv_status.agvId # 存储原始AGV状态数据 self.agv_status_data[agv_id] = agv_status # 如果AGV模型不存在,创建新的 if agv_id not in self.agv_models: self.agv_models[agv_id] = AGVModel(agv_id, self.path_mapping) self.logger.info(f"创建新的AGV模型: {agv_id}") # 更新AGV模型 self.agv_models[agv_id].update_from_agv_status(agv_status) self.logger.debug(f"更新了 {len(agv_status_list)} 个AGV模型,当前总数: {len(self.agv_models)}") def get_all_agv_status(self) -> List[AGVStatus]: """ 获取所有AGV的原始状态数据 Returns: List[AGVStatus]: AGV状态数据列表 """ return list(self.agv_status_data.values()) def get_agv_status(self, agv_id: str) -> Optional[AGVStatus]: """ 获取指定AGV的原始状态数据 Args: agv_id: AGV ID Returns: Optional[AGVStatus]: AGV状态数据 """ return self.agv_status_data.get(agv_id) def get_agv_model(self, agv_id: str) -> Optional[AGVModel]: """ 获取AGV模型 Args: agv_id: AGV ID Returns: Optional[AGVModel]: AGV模型 """ return self.agv_models.get(agv_id) def get_all_agvs(self) -> List[AGVModel]: """ 获取所有AGV模型 Returns: List[AGVModel]: AGV模型列表 """ return list(self.agv_models.values()) def get_available_agvs(self) -> List[AGVModel]: """ 获取可用的AGV模型(未满载且状态正常) Returns: List[AGVModel]: 可用AGV模型列表 """ available_agvs = [] for agv in self.agv_models.values(): if not agv.is_overloaded() and agv.can_accept_task(5): available_agvs.append(agv) return available_agvs def get_agvs_by_status(self, status: str) -> List[AGVModel]: """ 根据状态获取AGV列表 Args: status: AGV状态 Returns: List[AGVModel]: 符合状态的AGV列表 """ return [agv for agv in self.agv_models.values() if agv.status == status] def cleanup_old_agvs(self, max_age_seconds: float = 300): """ 清理长时间未更新的AGV Args: max_age_seconds: 最大允许的未更新时间(秒) """ current_time = time.time() old_agvs = [] for agv_id, agv in self.agv_models.items(): if current_time - agv.last_update_time > max_age_seconds: old_agvs.append(agv_id) for agv_id in old_agvs: del self.agv_models[agv_id] self.logger.info(f"清理长时间未更新的AGV: {agv_id}") def get_statistics(self) -> Dict[str, any]: """ 获取AGV统计信息 Returns: Dict: 统计信息 """ total_agvs = len(self.agv_models) available_agvs = len(self.get_available_agvs()) total_tasks = sum(agv.current_task_count for agv in self.agv_models.values()) total_capacity = sum(agv.max_capacity for agv in self.agv_models.values()) avg_efficiency = 0.0 if self.agv_models: avg_efficiency = sum(agv.calculate_efficiency_score(self.path_mapping) for agv in self.agv_models.values()) / total_agvs return { "total_agvs": total_agvs, "available_agvs": available_agvs, "total_assigned_tasks": total_tasks, "total_capacity": total_capacity, "capacity_utilization": total_tasks / total_capacity if total_capacity > 0 else 0.0, "average_efficiency": avg_efficiency }