""" AGV路径碰撞检测和解决模块 """ import math import logging from typing import Dict, List, Tuple, Optional, Set from dataclasses import dataclass from collections import defaultdict from common.data_models import PlannedPath, PathCode, AGVActionTypeEnum from common.utils import get_coordinate_from_path_id @dataclass class SpaceTimeNode: """时空节点 - 表示AGV在特定时间和位置的状态""" agv_id: str position: str # 位置码 coordinates: Tuple[int, int] # 坐标 time_step: int # 时间步 direction: str # 方向 @dataclass class Conflict: """冲突描述""" type: str # 冲突类型: "vertex", "edge", "follow" agv1: str agv2: str time_step: int position1: str position2: str description: str @dataclass class AGVPriority: """AGV优先级评估结果""" agv_id: str priority_score: float # 优先级分数,越高越优先 task_status: str # 任务状态 action_type: str # 动作类型 task_priority: int # 任务优先级 voltage: int # 电量 explanation: str # 优先级说明 class CollisionDetector: """AGV路径碰撞检测器""" def __init__(self, path_mapping: Dict[str, Dict[str, int]], min_distance: float = 3.0, time_buffer: int = 1): """ 初始化碰撞检测器 Args: path_mapping: 路径点映射字典 min_distance: AGV之间的最小安全距离 time_buffer: 时间缓冲区(时间步数) """ self.path_mapping = path_mapping self.min_distance = min_distance self.time_buffer = time_buffer self.logger = logging.getLogger(__name__) def detect_conflicts(self, planned_paths: List[Dict]) -> List[Conflict]: """ 检测所有AGV路径之间的冲突 Args: planned_paths: 规划路径列表 Returns: List[Conflict]: 冲突列表 """ conflicts = [] # 构建时空表 space_time_table = self._build_space_time_table(planned_paths) # 检测顶点冲突(同一时间同一位置) conflicts.extend(self._detect_vertex_conflicts(space_time_table)) # 检测边冲突(AGV交换位置) conflicts.extend(self._detect_edge_conflicts(space_time_table)) # 检测跟随冲突(AGV距离过近) conflicts.extend(self._detect_following_conflicts(space_time_table)) self.logger.info(f"检测到 {len(conflicts)} 个路径冲突") return conflicts def _build_space_time_table(self, planned_paths: List[Dict]) -> Dict[int, List[SpaceTimeNode]]: """ 构建时空表 Args: planned_paths: 规划路径列表 Returns: Dict[int, List[SpaceTimeNode]]: 时间步 -> 时空节点列表 """ space_time_table = defaultdict(list) for path_data in planned_paths: agv_id = path_data.get('agvId', '') code_list = path_data.get('codeList', []) for time_step, path_code in enumerate(code_list): position = path_code.get('code', '') if isinstance(path_code, dict) else path_code.code direction = path_code.get('direction', '90') if isinstance(path_code, dict) else path_code.direction coordinates = get_coordinate_from_path_id(position, self.path_mapping) if coordinates: node = SpaceTimeNode( agv_id=agv_id, position=position, coordinates=coordinates, time_step=time_step, direction=direction ) space_time_table[time_step].append(node) return space_time_table def _detect_vertex_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]: """检测顶点冲突(同一时间同一位置)""" conflicts = [] for time_step, nodes in space_time_table.items(): # 按位置分组 position_groups = defaultdict(list) for node in nodes: position_groups[node.position].append(node) # 检查每个位置是否有多个AGV for position, agv_nodes in position_groups.items(): if len(agv_nodes) > 1: # 发现冲突 for i in range(len(agv_nodes)): for j in range(i + 1, len(agv_nodes)): conflict = Conflict( type="vertex", agv1=agv_nodes[i].agv_id, agv2=agv_nodes[j].agv_id, time_step=time_step, position1=position, position2=position, description=f"AGV {agv_nodes[i].agv_id} 和 {agv_nodes[j].agv_id} 在时间 {time_step} 占用同一位置 {position}" ) conflicts.append(conflict) return conflicts def _detect_edge_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]: """检测边冲突(AGV交换位置)""" conflicts = [] max_time = max(space_time_table.keys()) if space_time_table else 0 for time_step in range(max_time): current_nodes = space_time_table.get(time_step, []) next_nodes = space_time_table.get(time_step + 1, []) # 构建当前和下一时刻的位置映射 current_positions = {node.agv_id: node.position for node in current_nodes} next_positions = {node.agv_id: node.position for node in next_nodes} # 检查是否有AGV交换位置 for agv1, pos1_current in current_positions.items(): for agv2, pos2_current in current_positions.items(): if agv1 >= agv2: # 避免重复检查 continue pos1_next = next_positions.get(agv1) pos2_next = next_positions.get(agv2) if (pos1_next and pos2_next and pos1_current == pos2_next and pos2_current == pos1_next): # 发现边冲突 conflict = Conflict( type="edge", agv1=agv1, agv2=agv2, time_step=time_step, position1=pos1_current, position2=pos2_current, description=f"AGV {agv1} 和 {agv2} 在时间 {time_step}-{time_step+1} 交换位置 {pos1_current}<->{pos2_current}" ) conflicts.append(conflict) return conflicts def _detect_following_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]: """检测跟随冲突(AGV距离过近)""" conflicts = [] for time_step, nodes in space_time_table.items(): # 检查所有AGV对之间的距离 for i in range(len(nodes)): for j in range(i + 1, len(nodes)): node1, node2 = nodes[i], nodes[j] # 计算欧几里得距离 distance = math.sqrt( (node1.coordinates[0] - node2.coordinates[0]) ** 2 + (node1.coordinates[1] - node2.coordinates[1]) ** 2 ) if distance < self.min_distance and distance > 0: conflict = Conflict( type="follow", agv1=node1.agv_id, agv2=node2.agv_id, time_step=time_step, position1=node1.position, position2=node2.position, description=f"AGV {node1.agv_id} 和 {node2.agv_id} 在时间 {time_step} 距离过近 ({distance:.2f} < {self.min_distance})" ) conflicts.append(conflict) return conflicts class CollisionResolver: """AGV路径碰撞解决器""" def __init__(self, path_mapping: Dict[str, Dict[str, int]], detector: CollisionDetector, agv_manager=None): """ 初始化碰撞解决器 Args: path_mapping: 路径点映射字典 detector: 碰撞检测器 agv_manager: AGV管理器,用于获取AGV状态信息 """ self.path_mapping = path_mapping self.detector = detector self.agv_manager = agv_manager self.logger = logging.getLogger(__name__) def evaluate_agv_priority(self, agv_id: str, path: Dict, executing_tasks: List[Dict] = None) -> AGVPriority: """ 评估AGV的避让优先级 Args: agv_id: AGV ID path: AGV路径信息 executing_tasks: 执行中的任务列表 Returns: AGVPriority: AGV优先级评估结果 """ # 默认值 task_status = "idle" action_type = "unknown" task_priority = 1 voltage = 100 priority_score = 0.0 explanation_parts = [] # 获取AGV状态信息 agv_model = None if self.agv_manager: agv_model = self.agv_manager.get_agv_by_id(agv_id) if agv_model: voltage = agv_model.voltage # 从路径信息中获取动作类型 code_list = path.get('codeList', []) if code_list: first_code = code_list[0] if isinstance(first_code, dict): action_type = first_code.get('type', 'unknown') # 从执行中任务获取任务状态和优先级 if executing_tasks: for task in executing_tasks: if task.get('agvId') == agv_id: task_status = task.get('status', 'idle') # 尝试从任务ID获取优先级(简化处理) task_id = task.get('taskId', '') if 'HIGH_PRIORITY' in task_id: task_priority = 10 elif 'PRIORITY' in task_id: task_priority = 8 else: task_priority = 5 break # 计算优先级分数(分数越高优先级越高,越不容易让步) priority_score = 0.0 # 1. 任务状态优先级 (40%权重) if task_status == "executing": priority_score += 40.0 explanation_parts.append("执行任务中(+40)") elif task_status == "assigned": priority_score += 20.0 explanation_parts.append("已分配任务(+20)") else: priority_score += 5.0 explanation_parts.append("空闲状态(+5)") # 2. 动作类型优先级 (30%权重) if action_type == AGVActionTypeEnum.TASK.value: priority_score += 30.0 explanation_parts.append("任务行为(+30)") elif action_type == AGVActionTypeEnum.CHARGING.value: priority_score += 25.0 explanation_parts.append("充电行为(+25)") elif action_type == AGVActionTypeEnum.AVOIDANCE.value: priority_score += 5.0 explanation_parts.append("避让行为(+5)") elif action_type == AGVActionTypeEnum.STANDBY.value: priority_score += 10.0 explanation_parts.append("待机行为(+10)") else: priority_score += 15.0 explanation_parts.append("未知行为(+15)") # 3. 任务优先级 (20%权重) task_priority_score = (task_priority / 10.0) * 20.0 priority_score += task_priority_score explanation_parts.append(f"任务优先级{task_priority}(+{task_priority_score:.1f})") # 4. 电量状态 (10%权重) if voltage <= 10: # 必须充电 priority_score += 10.0 explanation_parts.append("电量危险(+10)") elif voltage <= 20: # 可自动充电 priority_score += 8.0 explanation_parts.append("电量偏低(+8)") elif voltage <= 50: priority_score += 5.0 explanation_parts.append("电量一般(+5)") else: priority_score += 2.0 explanation_parts.append("电量充足(+2)") explanation = f"总分{priority_score:.1f}: " + ", ".join(explanation_parts) return AGVPriority( agv_id=agv_id, priority_score=priority_score, task_status=task_status, action_type=action_type, task_priority=task_priority, voltage=voltage, explanation=explanation ) def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Conflict], executing_tasks: List[Dict] = None) -> List[Dict]: """ 解决冲突 Args: planned_paths: 原始规划路径 conflicts: 冲突列表 executing_tasks: 执行中的任务列表 Returns: List[Dict]: 解决冲突后的路径 """ if not conflicts: return planned_paths resolved_paths = [path.copy() for path in planned_paths] # 按时间步排序冲突,优先解决早期冲突 conflicts_sorted = sorted(conflicts, key=lambda c: c.time_step) for conflict in conflicts_sorted: resolved_paths = self._resolve_single_conflict(resolved_paths, conflict, executing_tasks) self.logger.info(f"解决了 {len(conflicts)} 个路径冲突") return resolved_paths def _resolve_single_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]: """ 解决单个冲突 Args: paths: 当前路径列表 conflict: 冲突 executing_tasks: 执行中的任务列表 Returns: List[Dict]: 更新后的路径列表 """ if conflict.type == "vertex": return self._resolve_vertex_conflict(paths, conflict, executing_tasks) elif conflict.type == "edge": return self._resolve_edge_conflict(paths, conflict, executing_tasks) elif conflict.type == "follow": return self._resolve_following_conflict(paths, conflict, executing_tasks) return paths def _resolve_vertex_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]: """解决顶点冲突 - 基于优先级评估选择让步的AGV""" updated_paths = paths.copy() # 找到冲突的AGV路径 agv1_path = None agv2_path = None agv1_index = agv2_index = -1 for i, path in enumerate(updated_paths): if path.get('agvId') == conflict.agv1: agv1_path = path agv1_index = i elif path.get('agvId') == conflict.agv2: agv2_path = path agv2_index = i if not agv1_path or not agv2_path: return updated_paths # 评估两个AGV的优先级 agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks) agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks) # 选择优先级较低的AGV进行等待 if agv1_priority.priority_score <= agv2_priority.priority_score: waiting_agv_path = agv1_path waiting_agv_index = agv1_index waiting_agv_id = conflict.agv1 higher_priority_agv_id = conflict.agv2 else: waiting_agv_path = agv2_path waiting_agv_index = agv2_index waiting_agv_id = conflict.agv2 higher_priority_agv_id = conflict.agv1 # 记录详细的避让决策信息 self.logger.info(f"顶点冲突避让决策 - 位置: {conflict.position1}, 时间: {conflict.time_step}") self.logger.info(f" AGV {conflict.agv1}: {agv1_priority.explanation}") self.logger.info(f" AGV {conflict.agv2}: {agv2_priority.explanation}") self.logger.info(f" 决策: AGV {waiting_agv_id} 让步给 AGV {higher_priority_agv_id}") # 在冲突位置之前插入等待步骤 self._insert_wait_step(waiting_agv_path, conflict.time_step) updated_paths[waiting_agv_index] = waiting_agv_path return updated_paths def _resolve_edge_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]: """解决边冲突 - 基于优先级评估选择让步的AGV""" updated_paths = paths.copy() # 找到冲突的AGV路径 agv1_path = None agv2_path = None agv1_index = agv2_index = -1 for i, path in enumerate(updated_paths): if path.get('agvId') == conflict.agv1: agv1_path = path agv1_index = i elif path.get('agvId') == conflict.agv2: agv2_path = path agv2_index = i if not agv1_path or not agv2_path: return updated_paths # 评估两个AGV的优先级 agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks) agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks) # 选择优先级较低的AGV进行等待 if agv1_priority.priority_score <= agv2_priority.priority_score: waiting_agv_path = agv1_path waiting_agv_index = agv1_index waiting_agv_id = conflict.agv1 higher_priority_agv_id = conflict.agv2 else: waiting_agv_path = agv2_path waiting_agv_index = agv2_index waiting_agv_id = conflict.agv2 higher_priority_agv_id = conflict.agv1 # 记录详细的避让决策信息 self.logger.info(f"边冲突避让决策 - 位置交换: {conflict.position1}<->{conflict.position2}, 时间: {conflict.time_step}") self.logger.info(f" AGV {conflict.agv1}: {agv1_priority.explanation}") self.logger.info(f" AGV {conflict.agv2}: {agv2_priority.explanation}") self.logger.info(f" 决策: AGV {waiting_agv_id} 让步给 AGV {higher_priority_agv_id}") # 在冲突位置之前插入等待步骤 self._insert_wait_step(waiting_agv_path, conflict.time_step) updated_paths[waiting_agv_index] = waiting_agv_path return updated_paths def _resolve_following_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]: """解决跟随冲突 - 基于优先级评估选择让步的AGV""" updated_paths = paths.copy() # 找到冲突的AGV路径 agv1_path = None agv2_path = None agv1_index = agv2_index = -1 for i, path in enumerate(updated_paths): if path.get('agvId') == conflict.agv1: agv1_path = path agv1_index = i elif path.get('agvId') == conflict.agv2: agv2_path = path agv2_index = i if not agv1_path or not agv2_path: return updated_paths # 评估两个AGV的优先级 agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks) agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks) # 选择优先级较低的AGV进行等待 if agv1_priority.priority_score <= agv2_priority.priority_score: waiting_agv_path = agv1_path waiting_agv_index = agv1_index waiting_agv_id = conflict.agv1 higher_priority_agv_id = conflict.agv2 else: waiting_agv_path = agv2_path waiting_agv_index = agv2_index waiting_agv_id = conflict.agv2 higher_priority_agv_id = conflict.agv1 # 记录详细的避让决策信息 self.logger.info(f"跟随冲突避让决策 - 距离过近, 时间: {conflict.time_step}") self.logger.info(f" AGV {conflict.agv1} (位置: {conflict.position1}): {agv1_priority.explanation}") self.logger.info(f" AGV {conflict.agv2} (位置: {conflict.position2}): {agv2_priority.explanation}") self.logger.info(f" 决策: AGV {waiting_agv_id} 让步给 AGV {higher_priority_agv_id}") # 在冲突位置之前插入等待步骤 self._insert_wait_step(waiting_agv_path, conflict.time_step) updated_paths[waiting_agv_index] = waiting_agv_path return updated_paths def _insert_wait_step(self, path: Dict, time_step: int): """ 在指定时间步插入等待步骤 Args: path: AGV路径 time_step: 插入位置的时间步 """ code_list = path.get('codeList', []) if time_step < len(code_list) and time_step > 0: # 在指定位置插入等待步骤(重复前一个位置) prev_code = code_list[time_step - 1] # 确保复制所有字段 if isinstance(prev_code, dict): wait_code = prev_code.copy() else: wait_code = { 'code': prev_code.code, 'direction': prev_code.direction } code_list.insert(time_step, wait_code) path['codeList'] = code_list def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]: """ 验证并修正路径,确保AGV只能以0、90、180、270度移动 Args: planned_paths: 规划路径列表 Returns: List[Dict]: 修正后的路径列表 """ validated_paths = [] for path_data in planned_paths: validated_path = self._validate_single_path_directions(path_data) validated_paths.append(validated_path) return validated_paths def _validate_single_path_directions(self, path_data: Dict) -> Dict: """ 验证单个路径的移动方向 Args: path_data: 单个AGV路径数据 Returns: Dict: 修正后的路径数据 """ validated_path = path_data.copy() code_list = path_data.get('codeList', []) if len(code_list) < 2: return validated_path validated_code_list = [] for i in range(len(code_list)): current_code = code_list[i] if isinstance(current_code, dict): position = current_code.get('code', '') direction = current_code.get('direction', '90') # 保留原有的所有字段 validated_code = current_code.copy() else: position = current_code.code direction = current_code.direction # 转换为字典格式并保留基本字段 validated_code = { 'code': position, 'direction': direction } # 如果不是第一个点,计算正确的方向 if i > 0: prev_code = code_list[i - 1] prev_position = prev_code.get('code', '') if isinstance(prev_code, dict) else prev_code.code # 计算移动方向 correct_direction = self._calculate_movement_direction(prev_position, position) if correct_direction: direction = correct_direction # 确保方向是有效的四方向之一 direction = self._normalize_direction(direction) # 更新方向,保留其他所有字段 validated_code['direction'] = direction validated_code_list.append(validated_code) validated_path['codeList'] = validated_code_list return validated_path def _calculate_movement_direction(self, from_position: str, to_position: str) -> Optional[str]: """ 计算从一个位置到另一个位置的移动方向 Args: from_position: 起始位置码 to_position: 目标位置码 Returns: Optional[str]: 移动方向(0, 90, 180, 270) """ from_coord = get_coordinate_from_path_id(from_position, self.path_mapping) to_coord = get_coordinate_from_path_id(to_position, self.path_mapping) if not from_coord or not to_coord: return None dx = to_coord[0] - from_coord[0] dy = to_coord[1] - from_coord[1] # 只允许四个方向的移动 if dx > 0 and dy == 0: return "0" # 向东 elif dx < 0 and dy == 0: return "180" # 向西 elif dx == 0 and dy > 0: return "90" # 向南 elif dx == 0 and dy < 0: return "270" # 向北 else: # 不是标准四方向移动,返回默认方向 return "90" def _normalize_direction(self, direction: str) -> str: """ 标准化方向值,确保只有0、90、180、270 Args: direction: 原始方向值 Returns: str: 标准化后的方向值 """ try: angle = int(direction) % 360 # 将角度映射到最近的四方向 if angle <= 45 or angle > 315: return "0" elif 45 < angle <= 135: return "90" elif 135 < angle <= 225: return "180" else: return "270" except: return "90" # 默认方向