"""
|
AGV模型 - 用于算法系统的AGV数据建模
|
"""
|
import time
|
import logging
|
from typing import Dict, List, Optional, Tuple, Set
|
from dataclasses import dataclass, field
|
from enum import Enum
|
|
from common.data_models import AGVStatus, BackpackData
|
from common.utils import get_coordinate_from_path_id, calculate_manhattan_distance
|
|
|
class AGVTaskStatus(Enum):
|
"""AGV任务状态"""
|
IDLE = "IDLE" # 空闲
|
ASSIGNED = "ASSIGNED" # 已分配任务但未开始
|
EXECUTING = "EXECUTING" # 执行中
|
COMPLETED = "COMPLETED" # 已完成
|
|
|
@dataclass
|
class TaskAssignment:
|
"""任务分配信息"""
|
task_id: str
|
assigned_time: float
|
priority: int
|
status: AGVTaskStatus = AGVTaskStatus.ASSIGNED
|
start_code: Optional[str] = None
|
end_code: Optional[str] = None
|
estimated_duration: Optional[float] = None
|
|
|
class AGVModel:
|
"""AGV模型类,用于算法计算"""
|
|
def __init__(self, agv_id: str, path_mapping: Dict[str, Dict[str, int]]):
|
"""
|
初始化AGV模型
|
|
Args:
|
agv_id: AGV ID
|
path_mapping: 路径点映射字典
|
"""
|
self.agvId = agv_id
|
self.path_mapping = path_mapping
|
self.logger = logging.getLogger(__name__)
|
|
# AGV状态信息
|
self.status: str = "0" # AGV状态码
|
self.mapCode: str = "" # 当前位置码
|
self.coordinates: Optional[Tuple[int, int]] = None # 当前坐标
|
self.backpack: Optional[List[BackpackData]] = None # 背包信息
|
|
# 充电相关
|
self.voltage: int = 100 # 当前电量百分比
|
self.autoCharge: int = 20 # 低电量设定阈值
|
self.lowVol: int = 10 # 最低电量阈值
|
|
# 任务相关
|
self.assigned_tasks: List[TaskAssignment] = []
|
self.current_task_count: int = 0
|
self.max_capacity: int = 5 # 最大任务容量
|
|
# 性能相关
|
self.efficiency_score: float = 1.0 # 效率得分
|
self.last_update_time: float = time.time()
|
|
# 统计信息
|
self.total_completed_tasks: int = 0
|
self.total_distance_traveled: float = 0.0
|
self.average_completion_time: float = 0.0
|
|
def update_from_agv_status(self, agv_status: AGVStatus):
|
"""从AGV状态更新模型"""
|
self.status = agv_status.status
|
if hasattr(agv_status, 'position'):
|
self.mapCode = agv_status.position
|
elif hasattr(agv_status, 'mapCode'):
|
self.mapCode = agv_status.mapCode
|
else:
|
self.mapCode = ""
|
|
self.backpack = agv_status.backpack
|
self.last_update_time = time.time()
|
|
raw_voltage = getattr(agv_status, 'vol', 100)
|
raw_auto_charge = getattr(agv_status, 'autoCharge', 20)
|
raw_low_vol = getattr(agv_status, 'lowVol', 10)
|
|
# 电量数据标准化处理
|
# 如果电压值大于100,可能是毫伏值,需要转换为百分比
|
if raw_voltage > 100:
|
# 假设正常电压范围是3000-5000mV,转换为0-100%
|
# 这里使用简单的线性映射
|
normalized_voltage = max(0, min(100, ((raw_voltage - 3000) / 2000) * 100))
|
self.logger.debug(f"AGV {self.agvId} 电压标准化: {raw_voltage}mV -> {normalized_voltage:.1f}%")
|
self.voltage = int(normalized_voltage)
|
else:
|
self.voltage = raw_voltage
|
|
# 阈值标准化处理
|
if raw_auto_charge > 100:
|
# 如果阈值也是毫伏值,同样转换
|
self.autoCharge = max(0, min(100, ((raw_auto_charge - 3000) / 2000) * 100))
|
self.logger.debug(f"AGV {self.agvId} 自动充电阈值标准化: {raw_auto_charge}mV -> {self.autoCharge:.1f}%")
|
else:
|
self.autoCharge = raw_auto_charge
|
|
if raw_low_vol > 100:
|
# 如果最低电量阈值也是毫伏值,同样转换
|
self.lowVol = max(0, min(100, ((raw_low_vol - 3000) / 2000) * 100))
|
self.logger.debug(f"AGV {self.agvId} 最低电量阈值标准化: {raw_low_vol}mV -> {self.lowVol:.1f}%")
|
else:
|
self.lowVol = raw_low_vol
|
|
# 更新坐标
|
if self.mapCode:
|
self.coordinates = get_coordinate_from_path_id(self.mapCode, self.path_mapping)
|
|
# 根据AGV状态更新任务计数
|
if self.backpack and self.backpack:
|
self.current_task_count = len([bp for bp in self.backpack if bp.execute])
|
else:
|
self.current_task_count = 0
|
|
def assign_task(self, task_id: str, priority: int = 5,
|
start_code: str = "", end_code: str = "") -> bool:
|
"""
|
分配任务给AGV
|
|
Args:
|
task_id: 任务ID
|
priority: 任务优先级
|
start_code: 起始位置码
|
end_code: 结束位置码
|
|
Returns:
|
bool: 是否分配成功
|
"""
|
if self.is_overloaded():
|
self.logger.warning(f"AGV {self.agvId} 已满载,无法分配更多任务")
|
return False
|
|
# 创建任务分配记录
|
task_assignment = TaskAssignment(
|
task_id=task_id,
|
assigned_time=time.time(),
|
priority=priority,
|
start_code=start_code,
|
end_code=end_code
|
)
|
|
self.assigned_tasks.append(task_assignment)
|
self.current_task_count += 1
|
|
self.logger.info(f"任务 {task_id} 已分配给AGV {self.agvId}")
|
return True
|
|
def complete_task(self, task_id: str) -> bool:
|
"""
|
完成任务
|
|
Args:
|
task_id: 任务ID
|
|
Returns:
|
bool: 是否完成成功
|
"""
|
for task in self.assigned_tasks:
|
if task.task_id == task_id:
|
task.status = AGVTaskStatus.COMPLETED
|
self.current_task_count = max(0, self.current_task_count - 1)
|
self.total_completed_tasks += 1
|
|
self.logger.info(f"AGV {self.agvId} 完成任务 {task_id}")
|
return True
|
|
return False
|
|
def can_accept_task(self, priority: int) -> bool:
|
"""
|
检查是否可以接受新任务
|
|
Args:
|
priority: 任务优先级
|
|
Returns:
|
bool: 是否可以接受
|
"""
|
# 检查容量
|
if self.is_overloaded():
|
self.logger.debug(f"AGV {self.agvId} 任务已满载({self.current_task_count}/{self.max_capacity}),拒绝新任务")
|
return False
|
|
# 检查AGV状态(兼容整数和字符串格式)
|
# 状态 0(空闲), 1(忙碌), 2(充电) 可以接受任务
|
# 状态 3(故障), 4(维护) 不能接受任务
|
status_value = str(self.status) # 统一转换为字符串进行比较
|
if status_value not in ["0", "1", "2"]:
|
self.logger.debug(f"AGV {self.agvId} 状态异常(status={status_value}),拒绝新任务")
|
return False
|
|
# 检查充电状态 - 如果电量过低必须充电,不能接受新任务
|
if self.need_charging():
|
self.logger.debug(f"AGV {self.agvId} 电量过低({self.voltage}% <= {self.lowVol}%),必须充电,拒绝新任务")
|
return False
|
|
# 如果是高优先级任务,即使低电量也可以接受
|
if priority >= 9: # 高优先级任务阈值
|
self.logger.debug(f"AGV {self.agvId} 接受高优先级任务(priority={priority})")
|
return True
|
|
# 对于普通优先级任务,如果电量低但不是必须充电,可以根据优先级决定
|
if self.can_auto_charge():
|
# 优先级越高,越倾向于接受任务
|
if priority >= 5: # 中高优先级
|
self.logger.debug(f"AGV {self.agvId} 接受中高优先级任务(priority={priority}, 电量={self.voltage}%)")
|
return True
|
elif priority >= 3: # 中等优先级,75%概率接受
|
import random
|
accept = random.random() < 0.75
|
self.logger.debug(f"AGV {self.agvId} 随机决定{'接受' if accept else '拒绝'}中等优先级任务(priority={priority}, 电量={self.voltage}%)")
|
return accept
|
elif priority >= 1: # 低优先级,50%概率接受(包括优先级1)
|
import random
|
accept = random.random() < 0.5
|
self.logger.debug(f"AGV {self.agvId} 随机决定{'接受' if accept else '拒绝'}低优先级任务(priority={priority}, 电量={self.voltage}%)")
|
return accept
|
else: # 极低优先级(priority=0),拒绝
|
self.logger.debug(f"AGV {self.agvId} 电量偏低({self.voltage}%),拒绝极低优先级任务(priority={priority})")
|
return False
|
|
# 正常情况下可以接受任务
|
self.logger.debug(f"AGV {self.agvId} 状态良好,可以接受任务(电量={self.voltage}%, priority={priority})")
|
return True
|
|
def is_overloaded(self) -> bool:
|
"""
|
检查是否过载
|
|
Returns:
|
bool: 是否过载
|
"""
|
return self.current_task_count >= self.max_capacity
|
|
def get_workload_ratio(self) -> float:
|
"""
|
获取工作负载比例
|
|
Returns:
|
float: 工作负载比例 (0.0 - 1.0)
|
"""
|
return min(1.0, self.current_task_count / self.max_capacity)
|
|
def get_task_capacity(self) -> int:
|
"""
|
获取剩余任务容量
|
|
Returns:
|
int: 剩余容量
|
"""
|
return max(0, self.max_capacity - self.current_task_count)
|
|
def need_charging(self) -> bool:
|
"""
|
检查是否需要充电
|
|
Returns:
|
bool: 是否需要充电
|
"""
|
return self.voltage <= self.lowVol
|
|
def can_auto_charge(self) -> bool:
|
"""
|
检查是否可以自动充电(低于阈值但不是必须充电)
|
|
Returns:
|
bool: 是否可以自动充电
|
"""
|
return self.lowVol < self.voltage <= self.autoCharge
|
|
def is_low_power(self) -> bool:
|
"""
|
检查是否为低电量状态
|
|
Returns:
|
bool: 是否为低电量状态
|
"""
|
return self.voltage <= self.autoCharge
|
|
def get_charging_priority(self) -> int:
|
"""
|
获取充电优先级(数值越大优先级越高)
|
|
Returns:
|
int: 充电优先级
|
"""
|
if self.need_charging():
|
return 100 # 必须充电,最高优先级
|
elif self.can_auto_charge():
|
return 50 + (self.autoCharge - self.voltage) # 根据电量差计算优先级
|
else:
|
return 0 # 无需充电
|
|
def calculate_efficiency_score(self, path_mapping: Dict[str, Dict[str, int]]) -> float:
|
"""
|
计算AGV效率得分
|
|
Args:
|
path_mapping: 路径点映射
|
|
Returns:
|
float: 效率得分 (0.0 - 1.0)
|
"""
|
# 基础效率分数
|
base_score = 0.7
|
|
# 根据完成任务数量调整
|
if self.total_completed_tasks > 0:
|
completion_bonus = min(0.2, self.total_completed_tasks * 0.01)
|
base_score += completion_bonus
|
|
# 根据负载情况调整
|
load_ratio = self.get_workload_ratio()
|
if load_ratio < 0.8: # 负载不太高时效率更高
|
load_bonus = (0.8 - load_ratio) * 0.1
|
base_score += load_bonus
|
|
# 根据AGV状态调整
|
if self.status == "0": # 正常状态
|
base_score += 0.1
|
elif self.status in ["3", "4"]: # 异常状态
|
base_score -= 0.2
|
|
# 时间因子:最近更新的AGV得分更高
|
time_since_update = time.time() - self.last_update_time
|
if time_since_update < 60: # 1分钟内更新
|
time_bonus = (60 - time_since_update) / 600 # 最多加0.1
|
base_score += time_bonus
|
|
return max(0.0, min(1.0, base_score))
|
|
def estimate_travel_time(self, target_code: str) -> float:
|
"""
|
估算到目标位置的行驶时间
|
|
Args:
|
target_code: 目标位置码
|
|
Returns:
|
float: 估算时间(秒)
|
"""
|
if not self.coordinates:
|
return float('inf')
|
|
target_coord = get_coordinate_from_path_id(target_code, self.path_mapping)
|
if not target_coord:
|
return float('inf')
|
|
# 计算曼哈顿距离
|
distance = calculate_manhattan_distance(self.coordinates, target_coord)
|
|
# 假设AGV平均速度为1单位/秒
|
estimated_time = distance * 1.0
|
|
# 考虑当前负载对速度的影响
|
load_factor = 1.0 + (self.get_workload_ratio() * 0.2)
|
|
return estimated_time * load_factor
|
|
def get_task_by_id(self, task_id: str) -> Optional[TaskAssignment]:
|
"""
|
根据任务ID获取任务分配信息
|
|
Args:
|
task_id: 任务ID
|
|
Returns:
|
Optional[TaskAssignment]: 任务分配信息
|
"""
|
for task in self.assigned_tasks:
|
if task.task_id == task_id:
|
return task
|
return None
|
|
def get_pending_tasks(self) -> List[TaskAssignment]:
|
"""
|
获取待执行的任务
|
|
Returns:
|
List[TaskAssignment]: 待执行任务列表
|
"""
|
return [task for task in self.assigned_tasks
|
if task.status in [AGVTaskStatus.ASSIGNED, AGVTaskStatus.EXECUTING]]
|
|
def clear_completed_tasks(self):
|
"""清理已完成的任务"""
|
self.assigned_tasks = [task for task in self.assigned_tasks
|
if task.status != AGVTaskStatus.COMPLETED]
|
|
def __str__(self) -> str:
|
"""字符串表示"""
|
return (f"AGV({self.agvId}, status={self.status}, "
|
f"pos={self.mapCode}, tasks={self.current_task_count}/{self.max_capacity})")
|
|
|
class AGVModelManager:
|
"""AGV模型管理器"""
|
|
def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
|
"""
|
初始化AGV模型管理器
|
|
Args:
|
path_mapping: 路径点映射字典
|
"""
|
self.path_mapping = path_mapping
|
self.agv_models: Dict[str, AGVModel] = {}
|
self.agv_status_data: Dict[str, AGVStatus] = {} # 存储原始AGV状态数据
|
self.logger = logging.getLogger(__name__)
|
|
def update_agv_data(self, agv_status_list: List[AGVStatus]):
|
"""
|
更新AGV数据
|
|
Args:
|
agv_status_list: AGV状态列表
|
"""
|
# 获取当前更新的AGV ID列表
|
current_agv_ids = {agv_status.agvId for agv_status in agv_status_list}
|
|
# 移除不再存在的AGV模型和状态数据
|
removed_agvs = []
|
for agv_id in list(self.agv_models.keys()):
|
if agv_id not in current_agv_ids:
|
removed_agvs.append(agv_id)
|
del self.agv_models[agv_id]
|
if agv_id in self.agv_status_data:
|
del self.agv_status_data[agv_id]
|
|
if removed_agvs:
|
self.logger.info(f"移除不存在的AGV: {removed_agvs}")
|
|
# 更新或创建AGV模型和状态数据
|
for agv_status in agv_status_list:
|
agv_id = agv_status.agvId
|
|
# 存储原始AGV状态数据
|
self.agv_status_data[agv_id] = agv_status
|
|
# 如果AGV模型不存在,创建新的
|
if agv_id not in self.agv_models:
|
self.agv_models[agv_id] = AGVModel(agv_id, self.path_mapping)
|
self.logger.info(f"创建新的AGV模型: {agv_id}")
|
|
# 更新AGV模型
|
self.agv_models[agv_id].update_from_agv_status(agv_status)
|
|
self.logger.debug(f"更新了 {len(agv_status_list)} 个AGV模型,当前总数: {len(self.agv_models)}")
|
|
def get_all_agv_status(self) -> List[AGVStatus]:
|
"""
|
获取所有AGV的原始状态数据
|
|
Returns:
|
List[AGVStatus]: AGV状态数据列表
|
"""
|
return list(self.agv_status_data.values())
|
|
def get_agv_status(self, agv_id: str) -> Optional[AGVStatus]:
|
"""
|
获取指定AGV的原始状态数据
|
|
Args:
|
agv_id: AGV ID
|
|
Returns:
|
Optional[AGVStatus]: AGV状态数据
|
"""
|
return self.agv_status_data.get(agv_id)
|
|
def get_agv_model(self, agv_id: str) -> Optional[AGVModel]:
|
"""
|
获取AGV模型
|
|
Args:
|
agv_id: AGV ID
|
|
Returns:
|
Optional[AGVModel]: AGV模型
|
"""
|
return self.agv_models.get(agv_id)
|
|
def get_all_agvs(self) -> List[AGVModel]:
|
"""
|
获取所有AGV模型
|
|
Returns:
|
List[AGVModel]: AGV模型列表
|
"""
|
return list(self.agv_models.values())
|
|
def get_available_agvs(self) -> List[AGVModel]:
|
"""
|
获取可用的AGV模型(未满载且状态正常)
|
|
Returns:
|
List[AGVModel]: 可用AGV模型列表
|
"""
|
available_agvs = []
|
for agv in self.agv_models.values():
|
if not agv.is_overloaded() and agv.can_accept_task(5):
|
available_agvs.append(agv)
|
|
return available_agvs
|
|
def get_agvs_by_status(self, status: str) -> List[AGVModel]:
|
"""
|
根据状态获取AGV列表
|
|
Args:
|
status: AGV状态
|
|
Returns:
|
List[AGVModel]: 符合状态的AGV列表
|
"""
|
return [agv for agv in self.agv_models.values() if agv.status == status]
|
|
def cleanup_old_agvs(self, max_age_seconds: float = 300):
|
"""
|
清理长时间未更新的AGV
|
|
Args:
|
max_age_seconds: 最大允许的未更新时间(秒)
|
"""
|
current_time = time.time()
|
old_agvs = []
|
|
for agv_id, agv in self.agv_models.items():
|
if current_time - agv.last_update_time > max_age_seconds:
|
old_agvs.append(agv_id)
|
|
for agv_id in old_agvs:
|
del self.agv_models[agv_id]
|
self.logger.info(f"清理长时间未更新的AGV: {agv_id}")
|
|
def get_statistics(self) -> Dict[str, any]:
|
"""
|
获取AGV统计信息
|
|
Returns:
|
Dict: 统计信息
|
"""
|
total_agvs = len(self.agv_models)
|
available_agvs = len(self.get_available_agvs())
|
total_tasks = sum(agv.current_task_count for agv in self.agv_models.values())
|
total_capacity = sum(agv.max_capacity for agv in self.agv_models.values())
|
|
avg_efficiency = 0.0
|
if self.agv_models:
|
avg_efficiency = sum(agv.calculate_efficiency_score(self.path_mapping)
|
for agv in self.agv_models.values()) / total_agvs
|
|
return {
|
"total_agvs": total_agvs,
|
"available_agvs": available_agvs,
|
"total_assigned_tasks": total_tasks,
|
"total_capacity": total_capacity,
|
"capacity_utilization": total_tasks / total_capacity if total_capacity > 0 else 0.0,
|
"average_efficiency": avg_efficiency
|
}
|