""" 路径规划算法 """ import heapq import random import math import time import logging from typing import Dict, List, Tuple, Optional, Set, Any from collections import defaultdict, deque from common.data_models import PlannedPath, PathCode, AGVStatus, BackpackData from common.utils import get_coordinate_from_path_id, calculate_distance, calculate_manhattan_distance, generate_segment_id, generate_navigation_code from algorithm_system.algorithms.collision_detection import CollisionDetector, CollisionResolver class ExecutingTaskExtractor: """执行中任务提取器""" def __init__(self, path_mapping: Dict[str, Dict[str, int]]): """ 初始化任务提取器 Args: path_mapping: 路径点映射字典 """ self.path_mapping = path_mapping self.logger = logging.getLogger(__name__) # 预计算坐标映射 self.coord_to_code = {(data['x'], data['y']): code for code, data in path_mapping.items()} # 预分类位置类型 self.pickup_positions = [pos for pos in path_mapping.keys() if pos.startswith("1")] self.charging_positions = [pos for pos in path_mapping.keys() if pos.startswith("2")] self.delivery_positions = [pos for pos in path_mapping.keys() if pos.startswith("3")] self.standby_positions = [pos for pos in path_mapping.keys() if pos.startswith("4")] def extract_optimized_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]: """智能提取AGV执行任务""" executing_tasks = [] for agv_status in agv_status_list: agv_id = agv_status.agvId current_position = agv_status.position if not agv_status.backpack: continue optimized_task = self._analyze_all_backpack_tasks( agv_id, current_position, agv_status.backpack, agv_status ) if optimized_task: executing_tasks.append(optimized_task) self.logger.info(f"智能分析 {len(executing_tasks)} 个AGV的背篓任务并生成最优路径") return executing_tasks def _analyze_all_backpack_tasks(self, agv_id: str, current_position: str, backpack: List[BackpackData], agv_status: AGVStatus) -> Optional[Dict]: """ 分析所有背篓任务,确定最优执行策略 """ # 1. 提取并分类所有有效任务 loaded_tasks = [] # 已装载的任务 unloaded_tasks = [] # 未装载的任务 all_tasks = [] # 所有任务 self.logger.debug(f"[AGV {agv_id}] 开始分析背篓任务,位置: {current_position}") for i, bp in enumerate(backpack): self.logger.debug(f"[AGV {agv_id}] 背篓位置{i+1}: taskId={bp.taskId}, loaded={bp.loaded}, execute={bp.execute}") if not bp.taskId: self.logger.debug(f"[AGV {agv_id}] 背篓位置{i+1}: 跳过(无任务ID)") continue # 尝试解析任务详情 task_info = self._parse_task_details(bp.taskId, current_position) if not task_info: self.logger.warning(f"[AGV {agv_id}] 任务 {bp.taskId} 解析失败,跳过") continue task_data = { 'backpack_item': bp, 'task_info': task_info, 'distance_to_start': task_info.get('distance_to_start', float('inf')), 'distance_to_end': task_info.get('distance_to_end', float('inf')) } all_tasks.append(task_data) if bp.loaded: loaded_tasks.append(task_data) self.logger.debug(f"[AGV {agv_id}] 任务 {bp.taskId} 已装载,距离终点: {task_info.get('distance_to_end', 'N/A')}") else: unloaded_tasks.append(task_data) self.logger.debug(f"[AGV {agv_id}] 任务 {bp.taskId} 未装载,距离起点: {task_info.get('distance_to_start', 'N/A')}") self.logger.debug(f"[AGV {agv_id}] 任务分析结果: 总任务={len(all_tasks)}, 已装载={len(loaded_tasks)}, 未装载={len(unloaded_tasks)}") # 2. 如果没有任何任务,返回None if not all_tasks: self.logger.debug(f"[AGV {agv_id}] 没有有效任务,返回None") return None # 3. 决策最优路径策略 optimal_strategy = self._determine_next_best_action( loaded_tasks, unloaded_tasks, current_position ) if not optimal_strategy: self.logger.debug(f"[AGV {agv_id}] 决策返回None,没有最优策略") return None self.logger.debug(f"[AGV {agv_id}] 决策完成: {optimal_strategy['strategy']} - {optimal_strategy['action']}") self.logger.debug(f"[AGV {agv_id}] 目标位置: {optimal_strategy['target_position']}") self.logger.debug(f"[AGV {agv_id}] 决策原因: {optimal_strategy.get('reason', 'N/A')}") # 4. 构建任务信息 result = { 'agvId': agv_id, 'currentPosition': current_position, 'strategy': optimal_strategy['strategy'], 'next_action': optimal_strategy['action'], 'target_position': optimal_strategy['target_position'], 'primary_task': optimal_strategy['selected_task'], 'all_loaded_tasks': loaded_tasks, 'all_unloaded_tasks': unloaded_tasks, 'total_tasks_count': len(all_tasks), 'estimated_efficiency': optimal_strategy.get('efficiency_score', 0), 'decision_reason': optimal_strategy.get('reason', ''), 'agvDirection': agv_status.direction, 'agvVoltage': agv_status.vol, 'agvError': agv_status.error } self.logger.debug(f"[AGV {agv_id}] 最终任务信息构建完成,返回结果") return result def _determine_next_best_action(self, loaded_tasks: List[Dict], unloaded_tasks: List[Dict], current_position: str) -> Optional[Dict]: """ 确定AGV下一步最优行动 Args: loaded_tasks: 已装载的任务 unloaded_tasks: 未装载的任务 current_position: 当前位置 Returns: Optional[Dict]: 最优行动策略 """ # 默认策略:综合优化决策 # 1. 如果有已装载任务,优先送货 if loaded_tasks: nearest_delivery = min(loaded_tasks, key=lambda t: t['distance_to_end']) # 2. 如果也有未装载任务,比较距离决策 if unloaded_tasks: nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start']) # 综合优化策略:如果取货点显著更近,优先取货;否则优先送货 if nearest_pickup['distance_to_start'] * 1.5 < nearest_delivery['distance_to_end']: return { 'strategy': 'comprehensive_optimization', 'action': 'pickup', 'selected_task': nearest_pickup, 'target_position': nearest_pickup['task_info']['start_position'], 'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position), 'reason': f"综合优化:取货点({nearest_pickup['distance_to_start']})比送货点({nearest_delivery['distance_to_end']})显著更近" } else: return { 'strategy': 'comprehensive_optimization', 'action': 'deliver', 'selected_task': nearest_delivery, 'target_position': nearest_delivery['task_info']['end_position'], 'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position), 'reason': f"综合优化:优先送达已装载货物,送货距离{nearest_delivery['distance_to_end']}" } else: # 只有已装载任务,直接送货 return { 'strategy': 'comprehensive_optimization', 'action': 'deliver', 'selected_task': nearest_delivery, 'target_position': nearest_delivery['task_info']['end_position'], 'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position), 'reason': f"综合优化:送达唯一已装载货物,距离{nearest_delivery['distance_to_end']}" } # 3. 如果只有未装载任务,取货 elif unloaded_tasks: nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start']) return { 'strategy': 'comprehensive_optimization', 'action': 'pickup', 'selected_task': nearest_pickup, 'target_position': nearest_pickup['task_info']['start_position'], 'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position), 'reason': f"综合优化:取货唯一未装载任务,距离{nearest_pickup['distance_to_start']}" } # 4. 没有任务 return None def _parse_task_details(self, task_id: str, current_position: str) -> Optional[Dict]: """ 解析任务详细信息,包括起终点和距离 Args: task_id: 任务ID current_position: 当前位置 Returns: Optional[Dict]: 任务详细信息 """ self.logger.debug(f"开始解析任务 {task_id},当前位置: {current_position}") start_pos, end_pos = self._fast_parse_task_start_end(task_id) if not start_pos or not end_pos: self.logger.warning(f"任务 {task_id} 解析失败: start_pos={start_pos}, end_pos={end_pos}") return None self.logger.debug(f"任务 {task_id} 解析成功: start={start_pos}, end={end_pos}") # 计算距离 current_coord = get_coordinate_from_path_id(current_position, self.path_mapping) start_coord = get_coordinate_from_path_id(start_pos, self.path_mapping) end_coord = get_coordinate_from_path_id(end_pos, self.path_mapping) if not all([current_coord, start_coord, end_coord]): return None distance_to_start = calculate_manhattan_distance(current_coord, start_coord) distance_to_end = calculate_manhattan_distance(current_coord, end_coord) return { 'task_id': task_id, 'start_position': start_pos, 'end_position': end_pos, 'distance_to_start': distance_to_start, 'distance_to_end': distance_to_end, 'start_coord': start_coord, 'end_coord': end_coord } def _calculate_delivery_efficiency(self, task_data: Dict, current_position: str) -> float: """ 计算送货效率分数 Args: task_data: 任务数据 current_position: 当前位置 Returns: float: 效率分数 (0-10) """ distance = task_data['distance_to_end'] # 距离越近效率越高 distance_score = max(0, 10 - distance / 5) # 已装载的任务有额外奖励 loaded_bonus = 3.0 if task_data['backpack_item'].loaded else 0 return distance_score + loaded_bonus def _calculate_pickup_efficiency(self, task_data: Dict, current_position: str) -> float: """ 计算取货效率分数 Args: task_data: 任务数据 current_position: 当前位置 Returns: float: 效率分数 (0-10) """ distance_to_start = task_data['distance_to_start'] # 距离越近效率越高 distance_score = max(0, 10 - distance_to_start / 3) # 很近的取货点有高奖励 if distance_to_start <= 3: distance_score += 5.0 elif distance_to_start <= 5: distance_score += 2.0 return distance_score def _fast_parse_task_start_end(self, task_id: str) -> Tuple[Optional[str], Optional[str]]: """ 解析任务起点和终点 支持两种格式: 1. 标准格式:xxx_start_end 2. 简单格式:数字ID(通过hash分配位置) Args: task_id: 任务ID Returns: Tuple[Optional[str], Optional[str]]: (起点, 终点) """ self.logger.debug(f"解析任务ID: {task_id}") # 方法1:尝试标准格式解析(包含下划线) if "_" in task_id: parts = task_id.split("_") self.logger.debug(f"任务ID包含下划线,分割结果: {parts}") if len(parts) >= 3: potential_start = parts[-2] potential_end = parts[-1] start_valid = potential_start in self.path_mapping end_valid = potential_end in self.path_mapping self.logger.debug(f"标准格式解析结果: start={potential_start}(valid={start_valid}), end={potential_end}(valid={end_valid})") if start_valid and end_valid: return potential_start, potential_end self.logger.debug(f"任务ID分割后部分数量不足: {len(parts)} < 3") # 方法2:简单数字ID的备选机制(通过hash分配位置) self.logger.debug(f"尝试简单数字ID备选解析机制") # 获取所有可用位置 all_positions = list(self.path_mapping.keys()) if len(all_positions) < 2: self.logger.warning(f"路径映射位置不足,无法为任务分配起终点") return None, None # 使用任务ID的hash值来分配起点和终点 try: task_hash = abs(hash(task_id)) # 分配起点(使用hash的前一半) start_index = task_hash % len(all_positions) start_pos = all_positions[start_index] # 分配终点(使用hash的后一半,确保与起点不同) end_index = (task_hash // len(all_positions)) % len(all_positions) if end_index == start_index: end_index = (end_index + 1) % len(all_positions) end_pos = all_positions[end_index] self.logger.debug(f"简单ID备选解析成功: 任务{task_id} -> start={start_pos}, end={end_pos}") return start_pos, end_pos except Exception as e: self.logger.error(f"简单ID备选解析失败: {e}") self.logger.warning(f"任务ID {task_id} 所有解析方法都失败,返回 None, None") return None, None def _fast_parse_target(self, task_id: str, current_position: str, loaded: bool) -> Optional[str]: """ 解析任务目标位置 Args: task_id: 任务ID current_position: 当前位置 loaded: 是否已装载 Returns: Optional[str]: 目标位置码 """ _, task_end = self._fast_parse_task_start_end(task_id) if task_end: return task_end # 基于hash值和装载状态快速选择 hash_val = hash(task_id) % 1000 if loaded: # 已装载,选择出库区域 target_positions = [pos for pos in self.delivery_positions if pos != current_position] if not target_positions: target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position] else: # 未装载,选择仓库区域 target_positions = [pos for pos in self.pickup_positions if pos != current_position] if not target_positions: target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position] if target_positions: return target_positions[hash_val % len(target_positions)] return None def extract_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]: """提取所有AGV正在执行的任务""" self.logger.debug(f"开始转换任务为标准格式,AGV数量: {len(agv_status_list)}") optimized_tasks = self.extract_optimized_executing_tasks(agv_status_list) self.logger.debug(f"提取结果数量: {len(optimized_tasks)}") legacy_tasks = [] for i, task in enumerate(optimized_tasks): try: self.logger.debug(f"转换任务 #{i+1}: AGV {task['agvId']}") primary_task = task['primary_task'] task_info = primary_task['task_info'] backpack_item = primary_task['backpack_item'] self.logger.debug(f" - primary_task存在: {primary_task is not None}") self.logger.debug(f" - task_info存在: {task_info is not None}") self.logger.debug(f" - backpack_item存在: {backpack_item is not None}") # 根据action确定状态 is_pickup_action = task['next_action'] == 'pickup' self.logger.debug(f" - next_action: {task['next_action']}") self.logger.debug(f" - is_pickup_action: {is_pickup_action}") self.logger.debug(f" - target_position: {task['target_position']}") # 保持原有的标准JSON格式 legacy_task = { 'agvId': task['agvId'], 'taskId': task_info['task_id'], 'currentPosition': task['currentPosition'], 'backpackIndex': backpack_item.index, 'status': 'executing', 'agvDirection': task['agvDirection'], 'agvVoltage': task['agvVoltage'], 'agvError': task['agvError'], 'loaded': backpack_item.loaded, 'needPickup': is_pickup_action, 'targetPosition': task['target_position'], 'taskStart': task_info['start_position'], 'taskEnd': task_info['end_position'] } self.logger.debug(f" - 任务构建成功: {legacy_task['agvId']} -> {legacy_task['targetPosition']}") legacy_tasks.append(legacy_task) except Exception as e: self.logger.error(f"转换任务 #{i+1} 失败: {e}") self.logger.error(f"任务内容: {task}") continue self.logger.info(f"使用综合优化策略,为 {len(legacy_tasks)} 个AGV生成标准格式路径") return legacy_tasks class BatchPathPlanner: """批量路径规划器""" def __init__(self, path_mapping: Dict[str, Dict[str, int]], algorithm_type: str = "A_STAR", agv_manager=None): """ 初始化批量路径规划器 Args: path_mapping: 路径点映射字典 algorithm_type: 基础路径规划算法类型 agv_manager: 用于获取AGV状态信息 """ self.path_mapping = path_mapping self.algorithm_type = algorithm_type self.agv_manager = agv_manager self.logger = logging.getLogger(__name__) # 初始化优化组件 self.task_extractor = ExecutingTaskExtractor(path_mapping) self.collision_detector = FastCollisionDetector(min_distance=3.0) self.collision_resolver = FastCollisionResolver(path_mapping, self.collision_detector, agv_manager) self.base_planner = PathPlanningFactory.create_path_planner(algorithm_type, path_mapping) def plan_all_agv_paths(self, agv_status_list: List[AGVStatus], include_idle_agv: bool = False, constraints: List[Tuple[int, int, float]] = None) -> Dict: """ 为所有AGV规划路径(每个AGV只生成一个路径) Args: agv_status_list: AGV状态列表 include_idle_agv: 是否包含空闲AGV constraints: 路径约束条件 Returns: Dict: 包含所有路径信息的结果 """ start_time = time.time() # 1. 提取执行中的任务(使用向后兼容的方法) executing_tasks = self.task_extractor.extract_executing_tasks(agv_status_list) # 2. 路径规划 planned_paths = [] total_planning_time = 0 planned_agv_ids = set() for task in executing_tasks: agv_id = task['agvId'] current_pos = task['currentPosition'] target_pos = task['targetPosition'] task_id = task['taskId'] self.logger.debug(f"开始路径规划: AGV {agv_id}, 任务 {task_id}") self.logger.debug(f" - current_pos: {current_pos} ({type(current_pos)})") self.logger.debug(f" - target_pos: {target_pos} ({type(target_pos)})") # 避免重复规划 if agv_id in planned_agv_ids: self.logger.debug(f" - 跳过AGV {agv_id}:已规划") continue # 验证位置格式 if not current_pos or not target_pos: self.logger.error(f" - AGV {agv_id} 位置信息无效: current={current_pos}, target={target_pos}") continue # 路径格式标准化 from common.utils import normalize_path_id normalized_current = normalize_path_id(current_pos) normalized_target = normalize_path_id(target_pos) self.logger.debug(f" - 格式标准化: {current_pos} -> {normalized_current}, {target_pos} -> {normalized_target}") # 路径规划 path_start_time = time.time() self.logger.debug(f" - 调用base_planner.plan_path({normalized_current}, {normalized_target})") planned_path = self.base_planner.plan_path(normalized_current, normalized_target, constraints) path_end_time = time.time() planning_time = (path_end_time - path_start_time) * 1000 total_planning_time += planning_time self.logger.debug(f" - 路径规划结果: {planned_path is not None}") if planned_path: self.logger.debug(f" - 路径长度: {len(planned_path.codeList) if hasattr(planned_path, 'codeList') else 'N/A'}") else: self.logger.error(f" - AGV {agv_id} 路径规划失败: {current_pos} -> {target_pos}") if planned_path: # 生成路径字典 seg_id = generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2") path_dict = { 'agvId': agv_id, 'segId': seg_id, 'codeList': [] } # 转换路径点格式 for i, path_code in enumerate(planned_path.codeList): if isinstance(path_code, PathCode): action_type = "2" # 任务类型 pos_type = None backpack_level = task.get('backpackIndex', 0) is_target_point = False # 确定位置类型:最后一个点的动作类型 if i == len(planned_path.codeList) - 1: # 最后一个路径点 is_target_point = True if task.get('loaded', False): # 已装载任务:当前路径是去终点放货 pos_type = "2" # 放货 self.logger.debug(f"AGV {agv_id} 在终点 {path_code.code} 设置放货动作") else: # 未装载任务:当前路径是去起点取货 pos_type = "1" # 取货 self.logger.debug(f"AGV {agv_id} 在起点 {path_code.code} 设置取货动作") enhanced_code = generate_navigation_code( code=path_code.code, direction=path_code.direction, action_type=action_type, task_id=task_id, pos_type=pos_type, backpack_level=backpack_level, is_target_point=is_target_point ) path_dict['codeList'].append(enhanced_code) else: enhanced_code = generate_navigation_code( code=path_code.get('code', ''), direction=path_code.get('direction', '90'), action_type="2", task_id=task_id ) path_dict['codeList'].append(enhanced_code) planned_paths.append(path_dict) planned_agv_ids.add(agv_id) self.logger.debug(f"AGV {agv_id} 快速规划: {current_pos} -> {target_pos}") # 3. 空闲AGV处理 if include_idle_agv: idle_paths = self._fast_plan_idle_agv_paths(agv_status_list, executing_tasks, constraints, planned_agv_ids) planned_paths.extend(idle_paths) # 4. 去重检查 final_paths = [] final_agv_ids = set() for path in planned_paths: agv_id = path['agvId'] if agv_id not in final_agv_ids: final_paths.append(path) final_agv_ids.add(agv_id) planned_paths = final_paths # 5. 碰撞检测和解决 conflicts = self.collision_detector.detect_conflicts(planned_paths) if conflicts: self.logger.info(f"发现 {len(conflicts)} 个碰撞,快速解决中...") planned_paths = self.collision_resolver.resolve_conflicts(planned_paths, conflicts, executing_tasks) remaining_conflicts = self.collision_detector.detect_conflicts(planned_paths) else: remaining_conflicts = [] end_time = time.time() total_time = (end_time - start_time) * 1000 # 6. 构建优化结果 result = { 'totalAgvs': len(agv_status_list), 'executingTasksCount': len(executing_tasks), 'plannedPathsCount': len(planned_paths), 'totalPlanningTime': round(total_time, 2), 'pathPlanningTime': round(total_planning_time, 2), 'conflictsDetected': len(conflicts), 'remainingConflicts': len(remaining_conflicts), 'algorithm': f"Optimized_{self.algorithm_type}", 'fourDirectionCompliant': True, 'collisionFree': len(remaining_conflicts) == 0, 'plannedPaths': planned_paths, 'executingTasks': executing_tasks } self.logger.info(f"优化批量路径规划完成 - 总AGV: {result['totalAgvs']}, " f"执行中任务: {result['executingTasksCount']}, " f"规划路径: {result['plannedPathsCount']}, " f"总耗时: {result['totalPlanningTime']:.2f}ms, " f"无碰撞: {result['collisionFree']}") return result def _fast_plan_idle_agv_paths(self, agv_status_list: List[AGVStatus], executing_tasks: List[Dict], constraints: List[Tuple[int, int, float]] = None, planned_agv_ids: Set[str] = None) -> List[Dict]: """ 为空闲AGV规划路径 """ idle_paths = [] executing_agv_ids = {task['agvId'] for task in executing_tasks} if planned_agv_ids is None: planned_agv_ids = executing_agv_ids.copy() for agv_status in agv_status_list: agv_id = agv_status.agvId if agv_id in planned_agv_ids: continue if self._is_agv_idle_fast(agv_status): current_pos = agv_status.position target_pos = self._get_idle_agv_target_fast(agv_id, current_pos) if target_pos and target_pos != current_pos: planned_path = self.base_planner.plan_path(current_pos, target_pos, constraints) if planned_path: action_type = "3" if self._is_charging_path(target_pos) else "4" seg_id = generate_segment_id(agv_id=agv_id, target_position=target_pos, action_type=action_type) path_dict = { 'agvId': agv_id, 'segId': seg_id, 'codeList': [] } for i, path_code in enumerate(planned_path.codeList): if isinstance(path_code, PathCode): # 空闲AGV不需要taskId、posType和lev字段 enhanced_code = generate_navigation_code( code=path_code.code, direction=path_code.direction, action_type=action_type, is_target_point=False # 空闲AGV路径无目标点概念 ) path_dict['codeList'].append(enhanced_code) idle_paths.append(path_dict) planned_agv_ids.add(agv_id) return idle_paths def _is_agv_idle_fast(self, agv_status: AGVStatus) -> bool: """判断AGV是否空闲""" if agv_status.status not in [0, 1, 2] or agv_status.error != 0: return False # 检查背篓 if agv_status.backpack: for item in agv_status.backpack[:2]: # 只检查前两个 if item.loaded and item.execute: return False return True def _get_idle_agv_target_fast(self, agv_id: str, current_position: str) -> Optional[str]: """获取空闲AGV目标位置""" hash_val = hash(agv_id) % 1000 # 选择逻辑 charging_positions = [pos for pos in self.task_extractor.charging_positions if pos != current_position] if charging_positions: return charging_positions[hash_val % len(charging_positions)] standby_positions = [pos for pos in self.task_extractor.standby_positions if pos != current_position] if standby_positions: return standby_positions[hash_val % len(standby_positions)] all_positions = [pos for pos in self.path_mapping.keys() if pos != current_position] if all_positions: return all_positions[hash_val % len(all_positions)] return None def _is_charging_path(self, target_position: str) -> bool: """ 判断目标位置是否为充电站(后期修改) """ if not target_position: return False # 充电站识别 return target_position.startswith("2") or "charge" in target_position.lower() class PathPlanner: """路径规划器基类""" def __init__(self, path_mapping: Dict[str, Dict[str, int]]): """ 初始化路径规划器 Args: path_mapping: 路径点映射字典 """ self.path_mapping = path_mapping self.logger = logging.getLogger(__name__) def plan_path(self, start_code: str, end_code: str, constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]: """ 规划从起点到终点的路径 Args: start_code: 起始路径点ID end_code: 目标路径点ID constraints: 约束条件列表 Returns: Optional[PlannedPath]: 规划的路径,如果无法规划则返回None """ raise NotImplementedError("子类必须实现此方法") def is_valid_path_point(self, code: str) -> bool: """ 检查路径点ID是否有效 Args: code: 路径点ID Returns: bool: 是否有效 """ return code in self.path_mapping class AStarPathPlanner(PathPlanner): """A*路径规划器""" def __init__(self, path_mapping: Dict[str, Dict[str, int]], turn_cost: float = 0.5): """ 初始化A*路径规划器 Args: path_mapping: 路径点映射字典 turn_cost: 转向代价 """ super().__init__(path_mapping) self.turn_cost = turn_cost self.logger = logging.getLogger(__name__) # 四个方向:上、右、下、左 (严格四方向移动) self.directions = [(0, -1), (1, 0), (0, 1), (-1, 0)] self.direction_angles = ["270", "0", "90", "180"] # 预计算坐标映射(必须在 _build_adjacency_list 之前初始化) self.coord_to_code = {} for code, data in path_mapping.items(): if isinstance(data, dict) and 'x' in data and 'y' in data: self.coord_to_code[(data['x'], data['y'])] = code # 预计算邻接表 self.adjacency = self._build_adjacency_list() # 性能监控 self.planning_count = 0 self.total_planning_time = 0.0 self.logger.debug(f"A*规划器初始化完成,邻接表: {len(self.adjacency)} 个节点") def _build_adjacency_list(self) -> Dict[str, List[Tuple[str, str]]]: """ 预计算邻接表 Returns: Dict[str, List[Tuple[str, str]]]: {节点: [(邻居节点, 移动方向), ...]} """ adjacency = defaultdict(list) for code, coord_data in self.path_mapping.items(): x, y = coord_data['x'], coord_data['y'] # 检查四个方向的邻居 for i, (dx, dy) in enumerate(self.directions): next_x, next_y = x + dx, y + dy # 查找邻居代码 neighbor_coord = (next_x, next_y) if neighbor_coord in self.coord_to_code: neighbor_code = self.coord_to_code[neighbor_coord] direction = self.direction_angles[i] adjacency[code].append((neighbor_code, direction)) return adjacency def plan_path(self, start_code: str, end_code: str, constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]: """ Args: start_code: 起始路径点ID end_code: 目标路径点ID constraints: 约束条件列表 Returns: Optional[PlannedPath]: 规划的路径 """ # 验证 if not self.is_valid_path_point(start_code) or not self.is_valid_path_point(end_code): return None if start_code == end_code: return PlannedPath(agvId="", codeList=[PathCode(code=start_code, direction="90")]) # 获取坐标(使用预计算映射) start_coord = (self.path_mapping[start_code]['x'], self.path_mapping[start_code]['y']) end_coord = (self.path_mapping[end_code]['x'], self.path_mapping[end_code]['y']) # A*实现 # 使用元组堆,减少对象创建开销 open_set = [(0, start_code, None, [])] # (f_score, code, parent, path) closed_set = set() g_scores = {start_code: 0} # 预计算约束检查函数 constraint_check = self._build_constraint_checker(constraints) if constraints else None while open_set: f_score, current_code, parent_code, path = heapq.heappop(open_set) if current_code in closed_set: continue closed_set.add(current_code) current_path = path + [current_code] if current_code == end_code: # 重建路径 return self._build_path_result(current_path) # 使用预计算的邻接表 for neighbor_code, direction in self.adjacency[current_code]: if neighbor_code in closed_set: continue # 约束检查 if constraint_check and constraint_check(neighbor_code): continue # 代价计算 g_cost = g_scores[current_code] + 1 if neighbor_code not in g_scores or g_cost < g_scores[neighbor_code]: g_scores[neighbor_code] = g_cost # 启发式计算 neighbor_coord = (self.path_mapping[neighbor_code]['x'], self.path_mapping[neighbor_code]['y']) h_cost = abs(neighbor_coord[0] - end_coord[0]) + abs(neighbor_coord[1] - end_coord[1]) f_cost = g_cost + h_cost heapq.heappush(open_set, (f_cost, neighbor_code, current_code, current_path)) return None def _build_constraint_checker(self, constraints: List[Tuple[int, int, float]]): """构建约束检查函数""" def check_constraints(code: str) -> bool: coord = (self.path_mapping[code]['x'], self.path_mapping[code]['y']) for cx, cy, radius in constraints: if (coord[0] - cx) ** 2 + (coord[1] - cy) ** 2 <= radius ** 2: return True return False return check_constraints def _build_path_result(self, path_codes: List[str]) -> PlannedPath: """构建路径结果""" result_codes = [] for i, code in enumerate(path_codes): direction = "90" # 默认方向 if i < len(path_codes) - 1: # 使用预计算的邻接表查找方向 next_code = path_codes[i + 1] for neighbor_code, neighbor_direction in self.adjacency[code]: if neighbor_code == next_code: direction = neighbor_direction break result_codes.append(PathCode(code=code, direction=direction)) return PlannedPath(agvId="", codeList=result_codes) # 碰撞检测器 class FastCollisionDetector: """碰撞检测器""" def __init__(self, min_distance: float = 3.0): self.min_distance = min_distance self.logger = logging.getLogger(__name__) def detect_conflicts(self, planned_paths: List[Dict]) -> List[Dict]: """ 碰撞检测 - 基于时间步的优化检测 Args: planned_paths: 规划路径列表 Returns: List[Dict]: 冲突列表 """ conflicts = [] if len(planned_paths) < 2: return conflicts # 构建时间-位置映射 time_positions = defaultdict(list) for path_data in planned_paths: agv_id = path_data['agvId'] code_list = path_data.get('codeList', []) for time_step, path_code in enumerate(code_list): # 快速提取位置码 if isinstance(path_code, dict): position = path_code.get('code', '') elif hasattr(path_code, 'code'): position = path_code.code else: position = str(path_code) if position: time_positions[time_step].append((agv_id, position)) # 快速冲突检测 for time_step, agv_positions in time_positions.items(): if len(agv_positions) < 2: continue # 位置分组检查 position_groups = defaultdict(list) for agv_id, position in agv_positions: position_groups[position].append(agv_id) # 发现冲突 for position, agv_list in position_groups.items(): if len(agv_list) > 1: conflicts.append({ 'type': 'position_conflict', 'time_step': time_step, 'position': position, 'agv_ids': agv_list, 'severity': 'high' }) self.logger.debug(f"快速检测到 {len(conflicts)} 个冲突") return conflicts # 碰撞解决器 class FastCollisionResolver: """碰撞解决器""" def __init__(self, path_mapping: Dict[str, Dict[str, int]], collision_detector: FastCollisionDetector, agv_manager=None): self.path_mapping = path_mapping self.collision_detector = collision_detector self.agv_manager = agv_manager self.logger = logging.getLogger(__name__) def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Dict], executing_tasks: List[Dict] = None) -> List[Dict]: """ 解决冲突 Args: planned_paths: 规划路径列表 conflicts: 冲突列表 executing_tasks: 执行任务列表 Returns: List[Dict]: 解决冲突后的路径列表 """ if not conflicts: return planned_paths # 构建路径字典便于快速访问 paths_dict = {path['agvId']: path for path in planned_paths} # 按时间步排序处理冲突 sorted_conflicts = sorted(conflicts, key=lambda x: x.get('time_step', 0)) for conflict in sorted_conflicts: if conflict['type'] == 'position_conflict': agv_ids = conflict['agv_ids'] time_step = conflict['time_step'] # 简单策略:保留第一个AGV,其他延迟 sorted_agv_ids = sorted(agv_ids) for i, agv_id in enumerate(sorted_agv_ids[1:], 1): if agv_id in paths_dict: self._add_delay_to_path(paths_dict[agv_id], delay_steps=i) self.logger.debug(f"为AGV {agv_id} 添加 {i} 步延迟") return list(paths_dict.values()) def _add_delay_to_path(self, path_data: Dict, delay_steps: int): """为路径添加延迟步骤""" if delay_steps <= 0 or not path_data.get('codeList'): return # 在路径开始处重复第一个位置 first_code = path_data['codeList'][0] # 创建延迟代码 if isinstance(first_code, dict): delay_code = first_code.copy() else: delay_code = { 'code': first_code.code if hasattr(first_code, 'code') else str(first_code), 'direction': first_code.direction if hasattr(first_code, 'direction') else "90" } # 添加延迟步骤 for _ in range(delay_steps): path_data['codeList'].insert(0, delay_code) def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]: """验证四向移动约束""" return planned_paths class PathPlanningFactory: """路径规划器工厂类""" @staticmethod def create_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]]) -> PathPlanner: """ 创建路径规划器实例 Args: algorithm_type: 算法类型 path_mapping: 路径点映射字典 Returns: PathPlanner: 路径规划器实例 """ algorithm_upper = algorithm_type.upper() if algorithm_upper in ["A_STAR", "ASTAR", "A*", "DIJKSTRA", "GREEDY"]: return AStarPathPlanner(path_mapping) else: # 默认A*算法 logging.getLogger(__name__).warning(f"未知算法类型 {algorithm_type},使用默认A*算法") return AStarPathPlanner(path_mapping) @staticmethod def create_batch_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]], agv_manager=None) -> BatchPathPlanner: """ 创建批量路径规划器实例 Args: algorithm_type: 基础算法类型 path_mapping: 路径点映射字典 agv_manager: AGV管理器,用于获取AGV状态信息 Returns: BatchPathPlanner: 批量路径规划器实例 """ return BatchPathPlanner(path_mapping, algorithm_type, agv_manager) # 导出核心类和函数 __all__ = [ 'ExecutingTaskExtractor', 'BatchPathPlanner', 'PathPlanner', 'AStarPathPlanner', 'PathPlanningFactory', 'FastCollisionDetector', 'FastCollisionResolver' ]