From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001 From: zhang <zc857179121@qq.com> Date: 星期三, 02 七月 2025 13:12:26 +0800 Subject: [PATCH] 初版提交 --- algorithm_system/algorithms/path_planning.py | 1131 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 1,131 insertions(+), 0 deletions(-) diff --git a/algorithm_system/algorithms/path_planning.py b/algorithm_system/algorithms/path_planning.py new file mode 100644 index 0000000..8820f7a --- /dev/null +++ b/algorithm_system/algorithms/path_planning.py @@ -0,0 +1,1131 @@ +""" +璺緞瑙勫垝绠楁硶 +""" +import heapq +import random +import math +import time +import logging +from typing import Dict, List, Tuple, Optional, Set, Any +from collections import defaultdict, deque + +from common.data_models import PlannedPath, PathCode, AGVStatus, BackpackData +from common.utils import get_coordinate_from_path_id, calculate_distance, calculate_manhattan_distance, generate_segment_id, generate_navigation_code +from algorithm_system.algorithms.collision_detection import CollisionDetector, CollisionResolver + + +class ExecutingTaskExtractor: + """鎵ц涓换鍔℃彁鍙栧櫒""" + + def __init__(self, path_mapping: Dict[str, Dict[str, int]]): + """ + 鍒濆鍖栦换鍔℃彁鍙栧櫒 + + Args: + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + """ + self.path_mapping = path_mapping + self.logger = logging.getLogger(__name__) + + # 棰勮绠楀潗鏍囨槧灏� + self.coord_to_code = {(data['x'], data['y']): code for code, data in path_mapping.items()} + + # 棰勫垎绫讳綅缃被鍨� + self.pickup_positions = [pos for pos in path_mapping.keys() if pos.startswith("1")] + self.charging_positions = [pos for pos in path_mapping.keys() if pos.startswith("2")] + self.delivery_positions = [pos for pos in path_mapping.keys() if pos.startswith("3")] + self.standby_positions = [pos for pos in path_mapping.keys() if pos.startswith("4")] + + def extract_optimized_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]: + """鏅鸿兘鎻愬彇AGV鎵ц浠诲姟""" + executing_tasks = [] + + for agv_status in agv_status_list: + agv_id = agv_status.agvId + current_position = agv_status.position + + if not agv_status.backpack: + continue + + optimized_task = self._analyze_all_backpack_tasks( + agv_id, current_position, agv_status.backpack, agv_status + ) + + if optimized_task: + executing_tasks.append(optimized_task) + + self.logger.info(f"鏅鸿兘鍒嗘瀽 {len(executing_tasks)} 涓狝GV鐨勮儗绡撲换鍔″苟鐢熸垚鏈�浼樿矾寰�") + return executing_tasks + + def _analyze_all_backpack_tasks(self, agv_id: str, current_position: str, + backpack: List[BackpackData], agv_status: AGVStatus) -> Optional[Dict]: + """ + 鍒嗘瀽鎵�鏈夎儗绡撲换鍔★紝纭畾鏈�浼樻墽琛岀瓥鐣� + """ + # 1. 鎻愬彇骞跺垎绫绘墍鏈夋湁鏁堜换鍔� + loaded_tasks = [] # 宸茶杞界殑浠诲姟 + unloaded_tasks = [] # 鏈杞界殑浠诲姟 + all_tasks = [] # 鎵�鏈変换鍔� + + self.logger.debug(f"[AGV {agv_id}] 寮�濮嬪垎鏋愯儗绡撲换鍔★紝浣嶇疆: {current_position}") + + for i, bp in enumerate(backpack): + self.logger.debug(f"[AGV {agv_id}] 鑳岀瘬浣嶇疆{i+1}: taskId={bp.taskId}, loaded={bp.loaded}, execute={bp.execute}") + + if not bp.taskId: + self.logger.debug(f"[AGV {agv_id}] 鑳岀瘬浣嶇疆{i+1}: 璺宠繃锛堟棤浠诲姟ID锛�") + continue + + # 灏濊瘯瑙f瀽浠诲姟璇︽儏 + task_info = self._parse_task_details(bp.taskId, current_position) + if not task_info: + self.logger.warning(f"[AGV {agv_id}] 浠诲姟 {bp.taskId} 瑙f瀽澶辫触锛岃烦杩�") + continue + + task_data = { + 'backpack_item': bp, + 'task_info': task_info, + 'distance_to_start': task_info.get('distance_to_start', float('inf')), + 'distance_to_end': task_info.get('distance_to_end', float('inf')) + } + + all_tasks.append(task_data) + + if bp.loaded: + loaded_tasks.append(task_data) + self.logger.debug(f"[AGV {agv_id}] 浠诲姟 {bp.taskId} 宸茶杞斤紝璺濈缁堢偣: {task_info.get('distance_to_end', 'N/A')}") + else: + unloaded_tasks.append(task_data) + self.logger.debug(f"[AGV {agv_id}] 浠诲姟 {bp.taskId} 鏈杞斤紝璺濈璧风偣: {task_info.get('distance_to_start', 'N/A')}") + + self.logger.debug(f"[AGV {agv_id}] 浠诲姟鍒嗘瀽缁撴灉: 鎬讳换鍔�={len(all_tasks)}, 宸茶杞�={len(loaded_tasks)}, 鏈杞�={len(unloaded_tasks)}") + + # 2. 濡傛灉娌℃湁浠讳綍浠诲姟锛岃繑鍥濶one + if not all_tasks: + self.logger.debug(f"[AGV {agv_id}] 娌℃湁鏈夋晥浠诲姟锛岃繑鍥濶one") + return None + + # 3. 鍐崇瓥鏈�浼樿矾寰勭瓥鐣� + optimal_strategy = self._determine_next_best_action( + loaded_tasks, unloaded_tasks, current_position + ) + + if not optimal_strategy: + self.logger.debug(f"[AGV {agv_id}] 鍐崇瓥杩斿洖None锛屾病鏈夋渶浼樼瓥鐣�") + return None + + self.logger.debug(f"[AGV {agv_id}] 鍐崇瓥瀹屾垚: {optimal_strategy['strategy']} - {optimal_strategy['action']}") + self.logger.debug(f"[AGV {agv_id}] 鐩爣浣嶇疆: {optimal_strategy['target_position']}") + self.logger.debug(f"[AGV {agv_id}] 鍐崇瓥鍘熷洜: {optimal_strategy.get('reason', 'N/A')}") + + # 4. 鏋勫缓浠诲姟淇℃伅 + result = { + 'agvId': agv_id, + 'currentPosition': current_position, + 'strategy': optimal_strategy['strategy'], + 'next_action': optimal_strategy['action'], + 'target_position': optimal_strategy['target_position'], + 'primary_task': optimal_strategy['selected_task'], + 'all_loaded_tasks': loaded_tasks, + 'all_unloaded_tasks': unloaded_tasks, + 'total_tasks_count': len(all_tasks), + 'estimated_efficiency': optimal_strategy.get('efficiency_score', 0), + 'decision_reason': optimal_strategy.get('reason', ''), + 'agvDirection': agv_status.direction, + 'agvVoltage': agv_status.vol, + 'agvError': agv_status.error + } + + self.logger.debug(f"[AGV {agv_id}] 鏈�缁堜换鍔′俊鎭瀯寤哄畬鎴愶紝杩斿洖缁撴灉") + return result + + def _determine_next_best_action(self, loaded_tasks: List[Dict], + unloaded_tasks: List[Dict], current_position: str) -> Optional[Dict]: + """ + 纭畾AGV涓嬩竴姝ユ渶浼樿鍔� + + Args: + loaded_tasks: 宸茶杞界殑浠诲姟 + unloaded_tasks: 鏈杞界殑浠诲姟 + current_position: 褰撳墠浣嶇疆 + + Returns: + Optional[Dict]: 鏈�浼樿鍔ㄧ瓥鐣� + """ + # 榛樿绛栫暐锛氱患鍚堜紭鍖栧喅绛� + # 1. 濡傛灉鏈夊凡瑁呰浇浠诲姟锛屼紭鍏堥�佽揣 + if loaded_tasks: + nearest_delivery = min(loaded_tasks, key=lambda t: t['distance_to_end']) + + # 2. 濡傛灉涔熸湁鏈杞戒换鍔★紝姣旇緝璺濈鍐崇瓥 + if unloaded_tasks: + nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start']) + + # 缁煎悎浼樺寲绛栫暐锛氬鏋滃彇璐х偣鏄捐憲鏇磋繎锛屼紭鍏堝彇璐э紱鍚﹀垯浼樺厛閫佽揣 + if nearest_pickup['distance_to_start'] * 1.5 < nearest_delivery['distance_to_end']: + return { + 'strategy': 'comprehensive_optimization', + 'action': 'pickup', + 'selected_task': nearest_pickup, + 'target_position': nearest_pickup['task_info']['start_position'], + 'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position), + 'reason': f"缁煎悎浼樺寲锛氬彇璐х偣({nearest_pickup['distance_to_start']})姣旈�佽揣鐐�({nearest_delivery['distance_to_end']})鏄捐憲鏇磋繎" + } + else: + return { + 'strategy': 'comprehensive_optimization', + 'action': 'deliver', + 'selected_task': nearest_delivery, + 'target_position': nearest_delivery['task_info']['end_position'], + 'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position), + 'reason': f"缁煎悎浼樺寲锛氫紭鍏堥�佽揪宸茶杞借揣鐗╋紝閫佽揣璺濈{nearest_delivery['distance_to_end']}" + } + else: + # 鍙湁宸茶杞戒换鍔★紝鐩存帴閫佽揣 + return { + 'strategy': 'comprehensive_optimization', + 'action': 'deliver', + 'selected_task': nearest_delivery, + 'target_position': nearest_delivery['task_info']['end_position'], + 'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position), + 'reason': f"缁煎悎浼樺寲锛氶�佽揪鍞竴宸茶杞借揣鐗╋紝璺濈{nearest_delivery['distance_to_end']}" + } + + # 3. 濡傛灉鍙湁鏈杞戒换鍔★紝鍙栬揣 + elif unloaded_tasks: + nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start']) + return { + 'strategy': 'comprehensive_optimization', + 'action': 'pickup', + 'selected_task': nearest_pickup, + 'target_position': nearest_pickup['task_info']['start_position'], + 'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position), + 'reason': f"缁煎悎浼樺寲锛氬彇璐у敮涓�鏈杞戒换鍔★紝璺濈{nearest_pickup['distance_to_start']}" + } + + # 4. 娌℃湁浠诲姟 + return None + + def _parse_task_details(self, task_id: str, current_position: str) -> Optional[Dict]: + """ + 瑙f瀽浠诲姟璇︾粏淇℃伅锛屽寘鎷捣缁堢偣鍜岃窛绂� + + Args: + task_id: 浠诲姟ID + current_position: 褰撳墠浣嶇疆 + + Returns: + Optional[Dict]: 浠诲姟璇︾粏淇℃伅 + """ + self.logger.debug(f"寮�濮嬭В鏋愪换鍔� {task_id}锛屽綋鍓嶄綅缃�: {current_position}") + + start_pos, end_pos = self._fast_parse_task_start_end(task_id) + + if not start_pos or not end_pos: + self.logger.warning(f"浠诲姟 {task_id} 瑙f瀽澶辫触: start_pos={start_pos}, end_pos={end_pos}") + return None + + self.logger.debug(f"浠诲姟 {task_id} 瑙f瀽鎴愬姛: start={start_pos}, end={end_pos}") + + # 璁$畻璺濈 + current_coord = get_coordinate_from_path_id(current_position, self.path_mapping) + start_coord = get_coordinate_from_path_id(start_pos, self.path_mapping) + end_coord = get_coordinate_from_path_id(end_pos, self.path_mapping) + + if not all([current_coord, start_coord, end_coord]): + return None + + distance_to_start = calculate_manhattan_distance(current_coord, start_coord) + distance_to_end = calculate_manhattan_distance(current_coord, end_coord) + + return { + 'task_id': task_id, + 'start_position': start_pos, + 'end_position': end_pos, + 'distance_to_start': distance_to_start, + 'distance_to_end': distance_to_end, + 'start_coord': start_coord, + 'end_coord': end_coord + } + + def _calculate_delivery_efficiency(self, task_data: Dict, current_position: str) -> float: + """ + 璁$畻閫佽揣鏁堢巼鍒嗘暟 + + Args: + task_data: 浠诲姟鏁版嵁 + current_position: 褰撳墠浣嶇疆 + + Returns: + float: 鏁堢巼鍒嗘暟 (0-10) + """ + distance = task_data['distance_to_end'] + + # 璺濈瓒婅繎鏁堢巼瓒婇珮 + distance_score = max(0, 10 - distance / 5) + + # 宸茶杞界殑浠诲姟鏈夐澶栧鍔� + loaded_bonus = 3.0 if task_data['backpack_item'].loaded else 0 + + return distance_score + loaded_bonus + + def _calculate_pickup_efficiency(self, task_data: Dict, current_position: str) -> float: + """ + 璁$畻鍙栬揣鏁堢巼鍒嗘暟 + + Args: + task_data: 浠诲姟鏁版嵁 + current_position: 褰撳墠浣嶇疆 + + Returns: + float: 鏁堢巼鍒嗘暟 (0-10) + """ + distance_to_start = task_data['distance_to_start'] + + # 璺濈瓒婅繎鏁堢巼瓒婇珮 + distance_score = max(0, 10 - distance_to_start / 3) + + # 寰堣繎鐨勫彇璐х偣鏈夐珮濂栧姳 + if distance_to_start <= 3: + distance_score += 5.0 + elif distance_to_start <= 5: + distance_score += 2.0 + + return distance_score + + def _fast_parse_task_start_end(self, task_id: str) -> Tuple[Optional[str], Optional[str]]: + """ + 瑙f瀽浠诲姟璧风偣鍜岀粓鐐� + 鏀寔涓ょ鏍煎紡锛� + 1. 鏍囧噯鏍煎紡锛歺xx_start_end + 2. 绠�鍗曟牸寮忥細鏁板瓧ID锛堥�氳繃hash鍒嗛厤浣嶇疆锛� + + Args: + task_id: 浠诲姟ID + + Returns: + Tuple[Optional[str], Optional[str]]: (璧风偣, 缁堢偣) + """ + self.logger.debug(f"瑙f瀽浠诲姟ID: {task_id}") + + # 鏂规硶1锛氬皾璇曟爣鍑嗘牸寮忚В鏋愶紙鍖呭惈涓嬪垝绾匡級 + if "_" in task_id: + parts = task_id.split("_") + self.logger.debug(f"浠诲姟ID鍖呭惈涓嬪垝绾匡紝鍒嗗壊缁撴灉: {parts}") + + if len(parts) >= 3: + potential_start = parts[-2] + potential_end = parts[-1] + + start_valid = potential_start in self.path_mapping + end_valid = potential_end in self.path_mapping + + self.logger.debug(f"鏍囧噯鏍煎紡瑙f瀽缁撴灉: start={potential_start}(valid={start_valid}), end={potential_end}(valid={end_valid})") + + if start_valid and end_valid: + return potential_start, potential_end + + self.logger.debug(f"浠诲姟ID鍒嗗壊鍚庨儴鍒嗘暟閲忎笉瓒�: {len(parts)} < 3") + + # 鏂规硶2锛氱畝鍗曟暟瀛桰D鐨勫閫夋満鍒讹紙閫氳繃hash鍒嗛厤浣嶇疆锛� + self.logger.debug(f"灏濊瘯绠�鍗曟暟瀛桰D澶囬�夎В鏋愭満鍒�") + + # 鑾峰彇鎵�鏈夊彲鐢ㄤ綅缃� + all_positions = list(self.path_mapping.keys()) + if len(all_positions) < 2: + self.logger.warning(f"璺緞鏄犲皠浣嶇疆涓嶈冻锛屾棤娉曚负浠诲姟鍒嗛厤璧风粓鐐�") + return None, None + + # 浣跨敤浠诲姟ID鐨刪ash鍊兼潵鍒嗛厤璧风偣鍜岀粓鐐� + try: + task_hash = abs(hash(task_id)) + + # 鍒嗛厤璧风偣锛堜娇鐢╤ash鐨勫墠涓�鍗婏級 + start_index = task_hash % len(all_positions) + start_pos = all_positions[start_index] + + # 鍒嗛厤缁堢偣锛堜娇鐢╤ash鐨勫悗涓�鍗婏紝纭繚涓庤捣鐐逛笉鍚岋級 + end_index = (task_hash // len(all_positions)) % len(all_positions) + if end_index == start_index: + end_index = (end_index + 1) % len(all_positions) + end_pos = all_positions[end_index] + + self.logger.debug(f"绠�鍗旾D澶囬�夎В鏋愭垚鍔�: 浠诲姟{task_id} -> start={start_pos}, end={end_pos}") + return start_pos, end_pos + + except Exception as e: + self.logger.error(f"绠�鍗旾D澶囬�夎В鏋愬け璐�: {e}") + + self.logger.warning(f"浠诲姟ID {task_id} 鎵�鏈夎В鏋愭柟娉曢兘澶辫触锛岃繑鍥� None, None") + return None, None + + def _fast_parse_target(self, task_id: str, current_position: str, loaded: bool) -> Optional[str]: + """ + 瑙f瀽浠诲姟鐩爣浣嶇疆 + + Args: + task_id: 浠诲姟ID + current_position: 褰撳墠浣嶇疆 + loaded: 鏄惁宸茶杞� + + Returns: + Optional[str]: 鐩爣浣嶇疆鐮� + """ + _, task_end = self._fast_parse_task_start_end(task_id) + if task_end: + return task_end + + # 鍩轰簬hash鍊煎拰瑁呰浇鐘舵�佸揩閫熼�夋嫨 + hash_val = hash(task_id) % 1000 + + if loaded: + # 宸茶杞斤紝閫夋嫨鍑哄簱鍖哄煙 + target_positions = [pos for pos in self.delivery_positions if pos != current_position] + if not target_positions: + target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position] + else: + # 鏈杞斤紝閫夋嫨浠撳簱鍖哄煙 + target_positions = [pos for pos in self.pickup_positions if pos != current_position] + if not target_positions: + target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position] + + if target_positions: + return target_positions[hash_val % len(target_positions)] + + return None + + def extract_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]: + """鎻愬彇鎵�鏈堿GV姝e湪鎵ц鐨勪换鍔�""" + self.logger.debug(f"寮�濮嬭浆鎹换鍔′负鏍囧噯鏍煎紡锛孉GV鏁伴噺: {len(agv_status_list)}") + + optimized_tasks = self.extract_optimized_executing_tasks(agv_status_list) + + self.logger.debug(f"鎻愬彇缁撴灉鏁伴噺: {len(optimized_tasks)}") + + legacy_tasks = [] + for i, task in enumerate(optimized_tasks): + try: + self.logger.debug(f"杞崲浠诲姟 #{i+1}: AGV {task['agvId']}") + + primary_task = task['primary_task'] + task_info = primary_task['task_info'] + backpack_item = primary_task['backpack_item'] + + self.logger.debug(f" - primary_task瀛樺湪: {primary_task is not None}") + self.logger.debug(f" - task_info瀛樺湪: {task_info is not None}") + self.logger.debug(f" - backpack_item瀛樺湪: {backpack_item is not None}") + + # 鏍规嵁action纭畾鐘舵�� + is_pickup_action = task['next_action'] == 'pickup' + + self.logger.debug(f" - next_action: {task['next_action']}") + self.logger.debug(f" - is_pickup_action: {is_pickup_action}") + self.logger.debug(f" - target_position: {task['target_position']}") + + # 淇濇寔鍘熸湁鐨勬爣鍑咼SON鏍煎紡 + legacy_task = { + 'agvId': task['agvId'], + 'taskId': task_info['task_id'], + 'currentPosition': task['currentPosition'], + 'backpackIndex': backpack_item.index, + 'status': 'executing', + 'agvDirection': task['agvDirection'], + 'agvVoltage': task['agvVoltage'], + 'agvError': task['agvError'], + 'loaded': backpack_item.loaded, + 'needPickup': is_pickup_action, + 'targetPosition': task['target_position'], + 'taskStart': task_info['start_position'], + 'taskEnd': task_info['end_position'] + } + + self.logger.debug(f" - 浠诲姟鏋勫缓鎴愬姛: {legacy_task['agvId']} -> {legacy_task['targetPosition']}") + legacy_tasks.append(legacy_task) + + except Exception as e: + self.logger.error(f"杞崲浠诲姟 #{i+1} 澶辫触: {e}") + self.logger.error(f"浠诲姟鍐呭: {task}") + continue + + self.logger.info(f"浣跨敤缁煎悎浼樺寲绛栫暐锛屼负 {len(legacy_tasks)} 涓狝GV鐢熸垚鏍囧噯鏍煎紡璺緞") + return legacy_tasks + + +class BatchPathPlanner: + """鎵归噺璺緞瑙勫垝鍣�""" + + def __init__(self, path_mapping: Dict[str, Dict[str, int]], + algorithm_type: str = "A_STAR", agv_manager=None): + """ + 鍒濆鍖栨壒閲忚矾寰勮鍒掑櫒 + + Args: + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + algorithm_type: 鍩虹璺緞瑙勫垝绠楁硶绫诲瀷 + agv_manager: 鐢ㄤ簬鑾峰彇AGV鐘舵�佷俊鎭� + """ + self.path_mapping = path_mapping + self.algorithm_type = algorithm_type + self.agv_manager = agv_manager + self.logger = logging.getLogger(__name__) + + # 鍒濆鍖栦紭鍖栫粍浠� + self.task_extractor = ExecutingTaskExtractor(path_mapping) + self.collision_detector = FastCollisionDetector(min_distance=3.0) + self.collision_resolver = FastCollisionResolver(path_mapping, self.collision_detector, agv_manager) + self.base_planner = PathPlanningFactory.create_path_planner(algorithm_type, path_mapping) + + def plan_all_agv_paths(self, agv_status_list: List[AGVStatus], + include_idle_agv: bool = False, + constraints: List[Tuple[int, int, float]] = None) -> Dict: + """ + 涓烘墍鏈堿GV瑙勫垝璺緞锛堟瘡涓狝GV鍙敓鎴愪竴涓矾寰勶級 + + Args: + agv_status_list: AGV鐘舵�佸垪琛� + include_idle_agv: 鏄惁鍖呭惈绌洪棽AGV + constraints: 璺緞绾︽潫鏉′欢 + + Returns: + Dict: 鍖呭惈鎵�鏈夎矾寰勪俊鎭殑缁撴灉 + """ + start_time = time.time() + + # 1. 鎻愬彇鎵ц涓殑浠诲姟锛堜娇鐢ㄥ悜鍚庡吋瀹圭殑鏂规硶锛� + executing_tasks = self.task_extractor.extract_executing_tasks(agv_status_list) + + # 2. 璺緞瑙勫垝 + planned_paths = [] + total_planning_time = 0 + planned_agv_ids = set() + + for task in executing_tasks: + agv_id = task['agvId'] + current_pos = task['currentPosition'] + target_pos = task['targetPosition'] + task_id = task['taskId'] + + self.logger.debug(f"寮�濮嬭矾寰勮鍒�: AGV {agv_id}, 浠诲姟 {task_id}") + self.logger.debug(f" - current_pos: {current_pos} ({type(current_pos)})") + self.logger.debug(f" - target_pos: {target_pos} ({type(target_pos)})") + + # 閬垮厤閲嶅瑙勫垝 + if agv_id in planned_agv_ids: + self.logger.debug(f" - 璺宠繃AGV {agv_id}锛氬凡瑙勫垝") + continue + + # 楠岃瘉浣嶇疆鏍煎紡 + if not current_pos or not target_pos: + self.logger.error(f" - AGV {agv_id} 浣嶇疆淇℃伅鏃犳晥: current={current_pos}, target={target_pos}") + continue + + # 璺緞鏍煎紡鏍囧噯鍖� + from common.utils import normalize_path_id + normalized_current = normalize_path_id(current_pos) + normalized_target = normalize_path_id(target_pos) + + self.logger.debug(f" - 鏍煎紡鏍囧噯鍖�: {current_pos} -> {normalized_current}, {target_pos} -> {normalized_target}") + + # 璺緞瑙勫垝 + path_start_time = time.time() + self.logger.debug(f" - 璋冪敤base_planner.plan_path({normalized_current}, {normalized_target})") + planned_path = self.base_planner.plan_path(normalized_current, normalized_target, constraints) + path_end_time = time.time() + + planning_time = (path_end_time - path_start_time) * 1000 + total_planning_time += planning_time + + self.logger.debug(f" - 璺緞瑙勫垝缁撴灉: {planned_path is not None}") + if planned_path: + self.logger.debug(f" - 璺緞闀垮害: {len(planned_path.codeList) if hasattr(planned_path, 'codeList') else 'N/A'}") + else: + self.logger.error(f" - AGV {agv_id} 璺緞瑙勫垝澶辫触: {current_pos} -> {target_pos}") + + if planned_path: + # 鐢熸垚璺緞瀛楀吀 + seg_id = generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2") + + path_dict = { + 'agvId': agv_id, + 'segId': seg_id, + 'codeList': [] + } + + # 杞崲璺緞鐐规牸寮� + for i, path_code in enumerate(planned_path.codeList): + if isinstance(path_code, PathCode): + action_type = "2" # 浠诲姟绫诲瀷 + pos_type = None + backpack_level = task.get('backpackIndex', 0) + is_target_point = False + + # 纭畾浣嶇疆绫诲瀷锛氭渶鍚庝竴涓偣鐨勫姩浣滅被鍨� + if i == len(planned_path.codeList) - 1: # 鏈�鍚庝竴涓矾寰勭偣 + is_target_point = True + if task.get('loaded', False): + # 宸茶杞戒换鍔★細褰撳墠璺緞鏄幓缁堢偣鏀捐揣 + pos_type = "2" # 鏀捐揣 + self.logger.debug(f"AGV {agv_id} 鍦ㄧ粓鐐� {path_code.code} 璁剧疆鏀捐揣鍔ㄤ綔") + else: + # 鏈杞戒换鍔★細褰撳墠璺緞鏄幓璧风偣鍙栬揣 + pos_type = "1" # 鍙栬揣 + self.logger.debug(f"AGV {agv_id} 鍦ㄨ捣鐐� {path_code.code} 璁剧疆鍙栬揣鍔ㄤ綔") + + enhanced_code = generate_navigation_code( + code=path_code.code, + direction=path_code.direction, + action_type=action_type, + task_id=task_id, + pos_type=pos_type, + backpack_level=backpack_level, + is_target_point=is_target_point + ) + path_dict['codeList'].append(enhanced_code) + else: + enhanced_code = generate_navigation_code( + code=path_code.get('code', ''), + direction=path_code.get('direction', '90'), + action_type="2", + task_id=task_id + ) + path_dict['codeList'].append(enhanced_code) + + planned_paths.append(path_dict) + planned_agv_ids.add(agv_id) + self.logger.debug(f"AGV {agv_id} 蹇�熻鍒�: {current_pos} -> {target_pos}") + + # 3. 绌洪棽AGV澶勭悊 + if include_idle_agv: + idle_paths = self._fast_plan_idle_agv_paths(agv_status_list, executing_tasks, constraints, planned_agv_ids) + planned_paths.extend(idle_paths) + + # 4. 鍘婚噸妫�鏌� + final_paths = [] + final_agv_ids = set() + for path in planned_paths: + agv_id = path['agvId'] + if agv_id not in final_agv_ids: + final_paths.append(path) + final_agv_ids.add(agv_id) + planned_paths = final_paths + + # 5. 纰版挒妫�娴嬪拰瑙e喅 + conflicts = self.collision_detector.detect_conflicts(planned_paths) + + if conflicts: + self.logger.info(f"鍙戠幇 {len(conflicts)} 涓鎾烇紝蹇�熻В鍐充腑...") + planned_paths = self.collision_resolver.resolve_conflicts(planned_paths, conflicts, executing_tasks) + remaining_conflicts = self.collision_detector.detect_conflicts(planned_paths) + else: + remaining_conflicts = [] + + end_time = time.time() + total_time = (end_time - start_time) * 1000 + + # 6. 鏋勫缓浼樺寲缁撴灉 + result = { + 'totalAgvs': len(agv_status_list), + 'executingTasksCount': len(executing_tasks), + 'plannedPathsCount': len(planned_paths), + 'totalPlanningTime': round(total_time, 2), + 'pathPlanningTime': round(total_planning_time, 2), + 'conflictsDetected': len(conflicts), + 'remainingConflicts': len(remaining_conflicts), + 'algorithm': f"Optimized_{self.algorithm_type}", + 'fourDirectionCompliant': True, + 'collisionFree': len(remaining_conflicts) == 0, + 'plannedPaths': planned_paths, + 'executingTasks': executing_tasks + } + + self.logger.info(f"浼樺寲鎵归噺璺緞瑙勫垝瀹屾垚 - 鎬籄GV: {result['totalAgvs']}, " + f"鎵ц涓换鍔�: {result['executingTasksCount']}, " + f"瑙勫垝璺緞: {result['plannedPathsCount']}, " + f"鎬昏�楁椂: {result['totalPlanningTime']:.2f}ms, " + f"鏃犵鎾�: {result['collisionFree']}") + + return result + + def _fast_plan_idle_agv_paths(self, agv_status_list: List[AGVStatus], + executing_tasks: List[Dict], + constraints: List[Tuple[int, int, float]] = None, + planned_agv_ids: Set[str] = None) -> List[Dict]: + """ + 涓虹┖闂睞GV瑙勫垝璺緞 + """ + idle_paths = [] + executing_agv_ids = {task['agvId'] for task in executing_tasks} + + if planned_agv_ids is None: + planned_agv_ids = executing_agv_ids.copy() + + for agv_status in agv_status_list: + agv_id = agv_status.agvId + + if agv_id in planned_agv_ids: + continue + + if self._is_agv_idle_fast(agv_status): + current_pos = agv_status.position + target_pos = self._get_idle_agv_target_fast(agv_id, current_pos) + + if target_pos and target_pos != current_pos: + planned_path = self.base_planner.plan_path(current_pos, target_pos, constraints) + + if planned_path: + action_type = "3" if self._is_charging_path(target_pos) else "4" + seg_id = generate_segment_id(agv_id=agv_id, target_position=target_pos, action_type=action_type) + + path_dict = { + 'agvId': agv_id, + 'segId': seg_id, + 'codeList': [] + } + + for i, path_code in enumerate(planned_path.codeList): + if isinstance(path_code, PathCode): + # 绌洪棽AGV涓嶉渶瑕乼askId銆乸osType鍜宭ev瀛楁 + enhanced_code = generate_navigation_code( + code=path_code.code, + direction=path_code.direction, + action_type=action_type, + is_target_point=False # 绌洪棽AGV璺緞鏃犵洰鏍囩偣姒傚康 + ) + path_dict['codeList'].append(enhanced_code) + + idle_paths.append(path_dict) + planned_agv_ids.add(agv_id) + + return idle_paths + + def _is_agv_idle_fast(self, agv_status: AGVStatus) -> bool: + """鍒ゆ柇AGV鏄惁绌洪棽""" + if agv_status.status not in [0, 1, 2] or agv_status.error != 0: + return False + + # 妫�鏌ヨ儗绡� + if agv_status.backpack: + for item in agv_status.backpack[:2]: # 鍙鏌ュ墠涓や釜 + if item.loaded and item.execute: + return False + + return True + + def _get_idle_agv_target_fast(self, agv_id: str, current_position: str) -> Optional[str]: + """鑾峰彇绌洪棽AGV鐩爣浣嶇疆""" + hash_val = hash(agv_id) % 1000 + + # 閫夋嫨閫昏緫 + charging_positions = [pos for pos in self.task_extractor.charging_positions if pos != current_position] + if charging_positions: + return charging_positions[hash_val % len(charging_positions)] + + standby_positions = [pos for pos in self.task_extractor.standby_positions if pos != current_position] + if standby_positions: + return standby_positions[hash_val % len(standby_positions)] + + all_positions = [pos for pos in self.path_mapping.keys() if pos != current_position] + if all_positions: + return all_positions[hash_val % len(all_positions)] + + return None + + def _is_charging_path(self, target_position: str) -> bool: + """ + 鍒ゆ柇鐩爣浣嶇疆鏄惁涓哄厖鐢电珯锛堝悗鏈熶慨鏀癸級 + """ + if not target_position: + return False + + # 鍏呯數绔欒瘑鍒� + return target_position.startswith("2") or "charge" in target_position.lower() + + +class PathPlanner: + """璺緞瑙勫垝鍣ㄥ熀绫�""" + + def __init__(self, path_mapping: Dict[str, Dict[str, int]]): + """ + 鍒濆鍖栬矾寰勮鍒掑櫒 + + Args: + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + """ + self.path_mapping = path_mapping + self.logger = logging.getLogger(__name__) + + def plan_path(self, start_code: str, end_code: str, + constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]: + """ + 瑙勫垝浠庤捣鐐瑰埌缁堢偣鐨勮矾寰� + + Args: + start_code: 璧峰璺緞鐐笽D + end_code: 鐩爣璺緞鐐笽D + constraints: 绾︽潫鏉′欢鍒楄〃 + + Returns: + Optional[PlannedPath]: 瑙勫垝鐨勮矾寰勶紝濡傛灉鏃犳硶瑙勫垝鍒欒繑鍥濶one + """ + raise NotImplementedError("瀛愮被蹇呴』瀹炵幇姝ゆ柟娉�") + + def is_valid_path_point(self, code: str) -> bool: + """ + 妫�鏌ヨ矾寰勭偣ID鏄惁鏈夋晥 + + Args: + code: 璺緞鐐笽D + + Returns: + bool: 鏄惁鏈夋晥 + """ + return code in self.path_mapping + + +class AStarPathPlanner(PathPlanner): + """A*璺緞瑙勫垝鍣�""" + + def __init__(self, path_mapping: Dict[str, Dict[str, int]], turn_cost: float = 0.5): + """ + 鍒濆鍖朅*璺緞瑙勫垝鍣� + + Args: + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + turn_cost: 杞悜浠d环 + """ + super().__init__(path_mapping) + self.turn_cost = turn_cost + self.logger = logging.getLogger(__name__) + + # 鍥涗釜鏂瑰悜锛氫笂銆佸彸銆佷笅銆佸乏 (涓ユ牸鍥涙柟鍚戠Щ鍔�) + self.directions = [(0, -1), (1, 0), (0, 1), (-1, 0)] + self.direction_angles = ["270", "0", "90", "180"] + + # 棰勮绠楀潗鏍囨槧灏勶紙蹇呴』鍦� _build_adjacency_list 涔嬪墠鍒濆鍖栵級 + self.coord_to_code = {} + for code, data in path_mapping.items(): + if isinstance(data, dict) and 'x' in data and 'y' in data: + self.coord_to_code[(data['x'], data['y'])] = code + + # 棰勮绠楅偦鎺ヨ〃 + self.adjacency = self._build_adjacency_list() + + # 鎬ц兘鐩戞帶 + self.planning_count = 0 + self.total_planning_time = 0.0 + + self.logger.debug(f"A*瑙勫垝鍣ㄥ垵濮嬪寲瀹屾垚锛岄偦鎺ヨ〃: {len(self.adjacency)} 涓妭鐐�") + + def _build_adjacency_list(self) -> Dict[str, List[Tuple[str, str]]]: + """ + 棰勮绠楅偦鎺ヨ〃 + + Returns: + Dict[str, List[Tuple[str, str]]]: {鑺傜偣: [(閭诲眳鑺傜偣, 绉诲姩鏂瑰悜), ...]} + """ + adjacency = defaultdict(list) + + for code, coord_data in self.path_mapping.items(): + x, y = coord_data['x'], coord_data['y'] + + # 妫�鏌ュ洓涓柟鍚戠殑閭诲眳 + for i, (dx, dy) in enumerate(self.directions): + next_x, next_y = x + dx, y + dy + + # 鏌ユ壘閭诲眳浠g爜 + neighbor_coord = (next_x, next_y) + if neighbor_coord in self.coord_to_code: + neighbor_code = self.coord_to_code[neighbor_coord] + direction = self.direction_angles[i] + adjacency[code].append((neighbor_code, direction)) + + return adjacency + + def plan_path(self, start_code: str, end_code: str, + constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]: + """ + Args: + start_code: 璧峰璺緞鐐笽D + end_code: 鐩爣璺緞鐐笽D + constraints: 绾︽潫鏉′欢鍒楄〃 + + Returns: + Optional[PlannedPath]: 瑙勫垝鐨勮矾寰� + """ + # 楠岃瘉 + if not self.is_valid_path_point(start_code) or not self.is_valid_path_point(end_code): + return None + + if start_code == end_code: + return PlannedPath(agvId="", codeList=[PathCode(code=start_code, direction="90")]) + + # 鑾峰彇鍧愭爣锛堜娇鐢ㄩ璁$畻鏄犲皠锛� + start_coord = (self.path_mapping[start_code]['x'], self.path_mapping[start_code]['y']) + end_coord = (self.path_mapping[end_code]['x'], self.path_mapping[end_code]['y']) + + # A*瀹炵幇 + # 浣跨敤鍏冪粍鍫嗭紝鍑忓皯瀵硅薄鍒涘缓寮�閿� + open_set = [(0, start_code, None, [])] # (f_score, code, parent, path) + closed_set = set() + g_scores = {start_code: 0} + + # 棰勮绠楃害鏉熸鏌ュ嚱鏁� + constraint_check = self._build_constraint_checker(constraints) if constraints else None + + while open_set: + f_score, current_code, parent_code, path = heapq.heappop(open_set) + + if current_code in closed_set: + continue + + closed_set.add(current_code) + current_path = path + [current_code] + + if current_code == end_code: + # 閲嶅缓璺緞 + return self._build_path_result(current_path) + + # 浣跨敤棰勮绠楃殑閭绘帴琛� + for neighbor_code, direction in self.adjacency[current_code]: + if neighbor_code in closed_set: + continue + + # 绾︽潫妫�鏌� + if constraint_check and constraint_check(neighbor_code): + continue + + # 浠d环璁$畻 + g_cost = g_scores[current_code] + 1 + + if neighbor_code not in g_scores or g_cost < g_scores[neighbor_code]: + g_scores[neighbor_code] = g_cost + + # 鍚彂寮忚绠� + neighbor_coord = (self.path_mapping[neighbor_code]['x'], + self.path_mapping[neighbor_code]['y']) + h_cost = abs(neighbor_coord[0] - end_coord[0]) + abs(neighbor_coord[1] - end_coord[1]) + f_cost = g_cost + h_cost + + heapq.heappush(open_set, (f_cost, neighbor_code, current_code, current_path)) + + return None + + def _build_constraint_checker(self, constraints: List[Tuple[int, int, float]]): + """鏋勫缓绾︽潫妫�鏌ュ嚱鏁�""" + def check_constraints(code: str) -> bool: + coord = (self.path_mapping[code]['x'], self.path_mapping[code]['y']) + for cx, cy, radius in constraints: + if (coord[0] - cx) ** 2 + (coord[1] - cy) ** 2 <= radius ** 2: + return True + return False + return check_constraints + + def _build_path_result(self, path_codes: List[str]) -> PlannedPath: + """鏋勫缓璺緞缁撴灉""" + result_codes = [] + + for i, code in enumerate(path_codes): + direction = "90" # 榛樿鏂瑰悜 + + if i < len(path_codes) - 1: + # 浣跨敤棰勮绠楃殑閭绘帴琛ㄦ煡鎵炬柟鍚� + next_code = path_codes[i + 1] + for neighbor_code, neighbor_direction in self.adjacency[code]: + if neighbor_code == next_code: + direction = neighbor_direction + break + + result_codes.append(PathCode(code=code, direction=direction)) + + return PlannedPath(agvId="", codeList=result_codes) + + +# 纰版挒妫�娴嬪櫒 +class FastCollisionDetector: + """纰版挒妫�娴嬪櫒""" + + def __init__(self, min_distance: float = 3.0): + self.min_distance = min_distance + self.logger = logging.getLogger(__name__) + + def detect_conflicts(self, planned_paths: List[Dict]) -> List[Dict]: + """ + 纰版挒妫�娴� - 鍩轰簬鏃堕棿姝ョ殑浼樺寲妫�娴� + + Args: + planned_paths: 瑙勫垝璺緞鍒楄〃 + + Returns: + List[Dict]: 鍐茬獊鍒楄〃 + """ + conflicts = [] + if len(planned_paths) < 2: + return conflicts + + # 鏋勫缓鏃堕棿-浣嶇疆鏄犲皠 + time_positions = defaultdict(list) + + for path_data in planned_paths: + agv_id = path_data['agvId'] + code_list = path_data.get('codeList', []) + + for time_step, path_code in enumerate(code_list): + # 蹇�熸彁鍙栦綅缃爜 + if isinstance(path_code, dict): + position = path_code.get('code', '') + elif hasattr(path_code, 'code'): + position = path_code.code + else: + position = str(path_code) + + if position: + time_positions[time_step].append((agv_id, position)) + + # 蹇�熷啿绐佹娴� + for time_step, agv_positions in time_positions.items(): + if len(agv_positions) < 2: + continue + + # 浣嶇疆鍒嗙粍妫�鏌� + position_groups = defaultdict(list) + for agv_id, position in agv_positions: + position_groups[position].append(agv_id) + + # 鍙戠幇鍐茬獊 + for position, agv_list in position_groups.items(): + if len(agv_list) > 1: + conflicts.append({ + 'type': 'position_conflict', + 'time_step': time_step, + 'position': position, + 'agv_ids': agv_list, + 'severity': 'high' + }) + + self.logger.debug(f"蹇�熸娴嬪埌 {len(conflicts)} 涓啿绐�") + return conflicts + + +# 纰版挒瑙e喅鍣� +class FastCollisionResolver: + """纰版挒瑙e喅鍣�""" + + def __init__(self, path_mapping: Dict[str, Dict[str, int]], + collision_detector: FastCollisionDetector, agv_manager=None): + self.path_mapping = path_mapping + self.collision_detector = collision_detector + self.agv_manager = agv_manager + self.logger = logging.getLogger(__name__) + + def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Dict], executing_tasks: List[Dict] = None) -> List[Dict]: + """ + 瑙e喅鍐茬獊 + + Args: + planned_paths: 瑙勫垝璺緞鍒楄〃 + conflicts: 鍐茬獊鍒楄〃 + executing_tasks: 鎵ц浠诲姟鍒楄〃 + + Returns: + List[Dict]: 瑙e喅鍐茬獊鍚庣殑璺緞鍒楄〃 + """ + if not conflicts: + return planned_paths + + # 鏋勫缓璺緞瀛楀吀渚夸簬蹇�熻闂� + paths_dict = {path['agvId']: path for path in planned_paths} + + # 鎸夋椂闂存鎺掑簭澶勭悊鍐茬獊 + sorted_conflicts = sorted(conflicts, key=lambda x: x.get('time_step', 0)) + + for conflict in sorted_conflicts: + if conflict['type'] == 'position_conflict': + agv_ids = conflict['agv_ids'] + time_step = conflict['time_step'] + + # 绠�鍗曠瓥鐣ワ細淇濈暀绗竴涓狝GV锛屽叾浠栧欢杩� + sorted_agv_ids = sorted(agv_ids) + + for i, agv_id in enumerate(sorted_agv_ids[1:], 1): + if agv_id in paths_dict: + self._add_delay_to_path(paths_dict[agv_id], delay_steps=i) + self.logger.debug(f"涓篈GV {agv_id} 娣诲姞 {i} 姝ュ欢杩�") + + return list(paths_dict.values()) + + def _add_delay_to_path(self, path_data: Dict, delay_steps: int): + """涓鸿矾寰勬坊鍔犲欢杩熸楠�""" + if delay_steps <= 0 or not path_data.get('codeList'): + return + + # 鍦ㄨ矾寰勫紑濮嬪閲嶅绗竴涓綅缃� + first_code = path_data['codeList'][0] + + # 鍒涘缓寤惰繜浠g爜 + if isinstance(first_code, dict): + delay_code = first_code.copy() + else: + delay_code = { + 'code': first_code.code if hasattr(first_code, 'code') else str(first_code), + 'direction': first_code.direction if hasattr(first_code, 'direction') else "90" + } + + # 娣诲姞寤惰繜姝ラ + for _ in range(delay_steps): + path_data['codeList'].insert(0, delay_code) + + def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]: + """楠岃瘉鍥涘悜绉诲姩绾︽潫""" + return planned_paths + + +class PathPlanningFactory: + """璺緞瑙勫垝鍣ㄥ伐鍘傜被""" + + @staticmethod + def create_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]]) -> PathPlanner: + """ + 鍒涘缓璺緞瑙勫垝鍣ㄥ疄渚� + + Args: + algorithm_type: 绠楁硶绫诲瀷 + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + + Returns: + PathPlanner: 璺緞瑙勫垝鍣ㄥ疄渚� + """ + algorithm_upper = algorithm_type.upper() + + if algorithm_upper in ["A_STAR", "ASTAR", "A*", "DIJKSTRA", "GREEDY"]: + return AStarPathPlanner(path_mapping) + else: + # 榛樿A*绠楁硶 + logging.getLogger(__name__).warning(f"鏈煡绠楁硶绫诲瀷 {algorithm_type}锛屼娇鐢ㄩ粯璁*绠楁硶") + return AStarPathPlanner(path_mapping) + + @staticmethod + def create_batch_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]], + agv_manager=None) -> BatchPathPlanner: + """ + 鍒涘缓鎵归噺璺緞瑙勫垝鍣ㄥ疄渚� + + Args: + algorithm_type: 鍩虹绠楁硶绫诲瀷 + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + agv_manager: AGV绠$悊鍣紝鐢ㄤ簬鑾峰彇AGV鐘舵�佷俊鎭� + + Returns: + BatchPathPlanner: 鎵归噺璺緞瑙勫垝鍣ㄥ疄渚� + """ + return BatchPathPlanner(path_mapping, algorithm_type, agv_manager) + +# 瀵煎嚭鏍稿績绫诲拰鍑芥暟 +__all__ = [ + 'ExecutingTaskExtractor', + 'BatchPathPlanner', + 'PathPlanner', + 'AStarPathPlanner', + 'PathPlanningFactory', + 'FastCollisionDetector', + 'FastCollisionResolver' +] \ No newline at end of file -- Gitblit v1.9.1