From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001
From: zhang <zc857179121@qq.com>
Date: 星期三, 02 七月 2025 13:12:26 +0800
Subject: [PATCH] 初版提交

---
 algorithm_system/algorithms/path_planning.py | 1131 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 1,131 insertions(+), 0 deletions(-)

diff --git a/algorithm_system/algorithms/path_planning.py b/algorithm_system/algorithms/path_planning.py
new file mode 100644
index 0000000..8820f7a
--- /dev/null
+++ b/algorithm_system/algorithms/path_planning.py
@@ -0,0 +1,1131 @@
+"""
+璺緞瑙勫垝绠楁硶
+"""
+import heapq
+import random
+import math
+import time
+import logging
+from typing import Dict, List, Tuple, Optional, Set, Any
+from collections import defaultdict, deque
+
+from common.data_models import PlannedPath, PathCode, AGVStatus, BackpackData
+from common.utils import get_coordinate_from_path_id, calculate_distance, calculate_manhattan_distance, generate_segment_id, generate_navigation_code
+from algorithm_system.algorithms.collision_detection import CollisionDetector, CollisionResolver
+
+
+class ExecutingTaskExtractor:
+    """鎵ц涓换鍔℃彁鍙栧櫒"""
+    
+    def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
+        """
+        鍒濆鍖栦换鍔℃彁鍙栧櫒
+        
+        Args:
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+        """
+        self.path_mapping = path_mapping
+        self.logger = logging.getLogger(__name__)
+        
+        # 棰勮绠楀潗鏍囨槧灏�
+        self.coord_to_code = {(data['x'], data['y']): code for code, data in path_mapping.items()}
+        
+        # 棰勫垎绫讳綅缃被鍨�
+        self.pickup_positions = [pos for pos in path_mapping.keys() if pos.startswith("1")]
+        self.charging_positions = [pos for pos in path_mapping.keys() if pos.startswith("2")]
+        self.delivery_positions = [pos for pos in path_mapping.keys() if pos.startswith("3")]
+        self.standby_positions = [pos for pos in path_mapping.keys() if pos.startswith("4")]
+    
+    def extract_optimized_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
+        """鏅鸿兘鎻愬彇AGV鎵ц浠诲姟"""
+        executing_tasks = []
+        
+        for agv_status in agv_status_list:
+            agv_id = agv_status.agvId
+            current_position = agv_status.position
+            
+            if not agv_status.backpack:
+                continue
+                
+            optimized_task = self._analyze_all_backpack_tasks(
+                agv_id, current_position, agv_status.backpack, agv_status
+            )
+            
+            if optimized_task:
+                executing_tasks.append(optimized_task)
+                
+        self.logger.info(f"鏅鸿兘鍒嗘瀽 {len(executing_tasks)} 涓狝GV鐨勮儗绡撲换鍔″苟鐢熸垚鏈�浼樿矾寰�")
+        return executing_tasks
+    
+    def _analyze_all_backpack_tasks(self, agv_id: str, current_position: str, 
+                                   backpack: List[BackpackData], agv_status: AGVStatus) -> Optional[Dict]:
+        """
+        鍒嗘瀽鎵�鏈夎儗绡撲换鍔★紝纭畾鏈�浼樻墽琛岀瓥鐣�
+        """
+        # 1. 鎻愬彇骞跺垎绫绘墍鏈夋湁鏁堜换鍔�
+        loaded_tasks = []      # 宸茶杞界殑浠诲姟
+        unloaded_tasks = []    # 鏈杞界殑浠诲姟
+        all_tasks = []         # 鎵�鏈変换鍔�
+        
+        self.logger.debug(f"[AGV {agv_id}] 寮�濮嬪垎鏋愯儗绡撲换鍔★紝浣嶇疆: {current_position}")
+        
+        for i, bp in enumerate(backpack):
+            self.logger.debug(f"[AGV {agv_id}] 鑳岀瘬浣嶇疆{i+1}: taskId={bp.taskId}, loaded={bp.loaded}, execute={bp.execute}")
+            
+            if not bp.taskId:
+                self.logger.debug(f"[AGV {agv_id}] 鑳岀瘬浣嶇疆{i+1}: 璺宠繃锛堟棤浠诲姟ID锛�")
+                continue
+                
+            # 灏濊瘯瑙f瀽浠诲姟璇︽儏
+            task_info = self._parse_task_details(bp.taskId, current_position)
+            if not task_info:
+                self.logger.warning(f"[AGV {agv_id}] 浠诲姟 {bp.taskId} 瑙f瀽澶辫触锛岃烦杩�")
+                continue
+                
+            task_data = {
+                'backpack_item': bp,
+                'task_info': task_info,
+                'distance_to_start': task_info.get('distance_to_start', float('inf')),
+                'distance_to_end': task_info.get('distance_to_end', float('inf'))
+            }
+            
+            all_tasks.append(task_data)
+            
+            if bp.loaded:
+                loaded_tasks.append(task_data)
+                self.logger.debug(f"[AGV {agv_id}] 浠诲姟 {bp.taskId} 宸茶杞斤紝璺濈缁堢偣: {task_info.get('distance_to_end', 'N/A')}")
+            else:
+                unloaded_tasks.append(task_data)
+                self.logger.debug(f"[AGV {agv_id}] 浠诲姟 {bp.taskId} 鏈杞斤紝璺濈璧风偣: {task_info.get('distance_to_start', 'N/A')}")
+        
+        self.logger.debug(f"[AGV {agv_id}] 浠诲姟鍒嗘瀽缁撴灉: 鎬讳换鍔�={len(all_tasks)}, 宸茶杞�={len(loaded_tasks)}, 鏈杞�={len(unloaded_tasks)}")
+        
+        # 2. 濡傛灉娌℃湁浠讳綍浠诲姟锛岃繑鍥濶one
+        if not all_tasks:
+            self.logger.debug(f"[AGV {agv_id}] 娌℃湁鏈夋晥浠诲姟锛岃繑鍥濶one")
+            return None
+            
+        # 3. 鍐崇瓥鏈�浼樿矾寰勭瓥鐣�
+        optimal_strategy = self._determine_next_best_action(
+            loaded_tasks, unloaded_tasks, current_position
+        )
+        
+        if not optimal_strategy:
+            self.logger.debug(f"[AGV {agv_id}] 鍐崇瓥杩斿洖None锛屾病鏈夋渶浼樼瓥鐣�")
+            return None
+            
+        self.logger.debug(f"[AGV {agv_id}] 鍐崇瓥瀹屾垚: {optimal_strategy['strategy']} - {optimal_strategy['action']}")
+        self.logger.debug(f"[AGV {agv_id}] 鐩爣浣嶇疆: {optimal_strategy['target_position']}")
+        self.logger.debug(f"[AGV {agv_id}] 鍐崇瓥鍘熷洜: {optimal_strategy.get('reason', 'N/A')}")
+            
+        # 4. 鏋勫缓浠诲姟淇℃伅
+        result = {
+            'agvId': agv_id,
+            'currentPosition': current_position,
+            'strategy': optimal_strategy['strategy'],
+            'next_action': optimal_strategy['action'],
+            'target_position': optimal_strategy['target_position'],
+            'primary_task': optimal_strategy['selected_task'],
+            'all_loaded_tasks': loaded_tasks,
+            'all_unloaded_tasks': unloaded_tasks,
+            'total_tasks_count': len(all_tasks),
+            'estimated_efficiency': optimal_strategy.get('efficiency_score', 0),
+            'decision_reason': optimal_strategy.get('reason', ''),
+            'agvDirection': agv_status.direction,
+            'agvVoltage': agv_status.vol,
+            'agvError': agv_status.error
+        }
+        
+        self.logger.debug(f"[AGV {agv_id}] 鏈�缁堜换鍔′俊鎭瀯寤哄畬鎴愶紝杩斿洖缁撴灉")
+        return result
+    
+    def _determine_next_best_action(self, loaded_tasks: List[Dict], 
+                                   unloaded_tasks: List[Dict], current_position: str) -> Optional[Dict]:
+        """
+        纭畾AGV涓嬩竴姝ユ渶浼樿鍔�
+        
+        Args:
+            loaded_tasks: 宸茶杞界殑浠诲姟
+            unloaded_tasks: 鏈杞界殑浠诲姟
+            current_position: 褰撳墠浣嶇疆
+            
+        Returns:
+            Optional[Dict]: 鏈�浼樿鍔ㄧ瓥鐣�
+        """
+        # 榛樿绛栫暐锛氱患鍚堜紭鍖栧喅绛�
+        # 1. 濡傛灉鏈夊凡瑁呰浇浠诲姟锛屼紭鍏堥�佽揣
+        if loaded_tasks:
+            nearest_delivery = min(loaded_tasks, key=lambda t: t['distance_to_end'])
+            
+            # 2. 濡傛灉涔熸湁鏈杞戒换鍔★紝姣旇緝璺濈鍐崇瓥
+            if unloaded_tasks:
+                nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start'])
+                
+                # 缁煎悎浼樺寲绛栫暐锛氬鏋滃彇璐х偣鏄捐憲鏇磋繎锛屼紭鍏堝彇璐э紱鍚﹀垯浼樺厛閫佽揣
+                if nearest_pickup['distance_to_start'] * 1.5 < nearest_delivery['distance_to_end']:
+                    return {
+                        'strategy': 'comprehensive_optimization',
+                        'action': 'pickup',
+                        'selected_task': nearest_pickup,
+                        'target_position': nearest_pickup['task_info']['start_position'],
+                        'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position),
+                        'reason': f"缁煎悎浼樺寲锛氬彇璐х偣({nearest_pickup['distance_to_start']})姣旈�佽揣鐐�({nearest_delivery['distance_to_end']})鏄捐憲鏇磋繎"
+                    }
+                else:
+                    return {
+                        'strategy': 'comprehensive_optimization',
+                        'action': 'deliver',
+                        'selected_task': nearest_delivery,
+                        'target_position': nearest_delivery['task_info']['end_position'],
+                        'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position),
+                        'reason': f"缁煎悎浼樺寲锛氫紭鍏堥�佽揪宸茶杞借揣鐗╋紝閫佽揣璺濈{nearest_delivery['distance_to_end']}"
+                    }
+            else:
+                # 鍙湁宸茶杞戒换鍔★紝鐩存帴閫佽揣
+                return {
+                    'strategy': 'comprehensive_optimization',
+                    'action': 'deliver',
+                    'selected_task': nearest_delivery,
+                    'target_position': nearest_delivery['task_info']['end_position'],
+                    'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position),
+                    'reason': f"缁煎悎浼樺寲锛氶�佽揪鍞竴宸茶杞借揣鐗╋紝璺濈{nearest_delivery['distance_to_end']}"
+                }
+        
+        # 3. 濡傛灉鍙湁鏈杞戒换鍔★紝鍙栬揣
+        elif unloaded_tasks:
+            nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start'])
+            return {
+                'strategy': 'comprehensive_optimization',
+                'action': 'pickup',
+                'selected_task': nearest_pickup,
+                'target_position': nearest_pickup['task_info']['start_position'],
+                'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position),
+                'reason': f"缁煎悎浼樺寲锛氬彇璐у敮涓�鏈杞戒换鍔★紝璺濈{nearest_pickup['distance_to_start']}"
+            }
+        
+        # 4. 娌℃湁浠诲姟
+        return None
+    
+    def _parse_task_details(self, task_id: str, current_position: str) -> Optional[Dict]:
+        """
+        瑙f瀽浠诲姟璇︾粏淇℃伅锛屽寘鎷捣缁堢偣鍜岃窛绂�
+        
+        Args:
+            task_id: 浠诲姟ID
+            current_position: 褰撳墠浣嶇疆
+            
+        Returns:
+            Optional[Dict]: 浠诲姟璇︾粏淇℃伅
+        """
+        self.logger.debug(f"寮�濮嬭В鏋愪换鍔� {task_id}锛屽綋鍓嶄綅缃�: {current_position}")
+        
+        start_pos, end_pos = self._fast_parse_task_start_end(task_id)
+        
+        if not start_pos or not end_pos:
+            self.logger.warning(f"浠诲姟 {task_id} 瑙f瀽澶辫触: start_pos={start_pos}, end_pos={end_pos}")
+            return None
+            
+        self.logger.debug(f"浠诲姟 {task_id} 瑙f瀽鎴愬姛: start={start_pos}, end={end_pos}")
+        
+        # 璁$畻璺濈
+        current_coord = get_coordinate_from_path_id(current_position, self.path_mapping)
+        start_coord = get_coordinate_from_path_id(start_pos, self.path_mapping) 
+        end_coord = get_coordinate_from_path_id(end_pos, self.path_mapping)
+        
+        if not all([current_coord, start_coord, end_coord]):
+            return None
+            
+        distance_to_start = calculate_manhattan_distance(current_coord, start_coord)
+        distance_to_end = calculate_manhattan_distance(current_coord, end_coord)
+        
+        return {
+            'task_id': task_id,
+            'start_position': start_pos,
+            'end_position': end_pos,
+            'distance_to_start': distance_to_start,
+            'distance_to_end': distance_to_end,
+            'start_coord': start_coord,
+            'end_coord': end_coord
+        }
+    
+    def _calculate_delivery_efficiency(self, task_data: Dict, current_position: str) -> float:
+        """
+        璁$畻閫佽揣鏁堢巼鍒嗘暟
+        
+        Args:
+            task_data: 浠诲姟鏁版嵁
+            current_position: 褰撳墠浣嶇疆
+            
+        Returns:
+            float: 鏁堢巼鍒嗘暟 (0-10)
+        """
+        distance = task_data['distance_to_end']
+        
+        # 璺濈瓒婅繎鏁堢巼瓒婇珮
+        distance_score = max(0, 10 - distance / 5)
+        
+        # 宸茶杞界殑浠诲姟鏈夐澶栧鍔�
+        loaded_bonus = 3.0 if task_data['backpack_item'].loaded else 0
+        
+        return distance_score + loaded_bonus
+    
+    def _calculate_pickup_efficiency(self, task_data: Dict, current_position: str) -> float:
+        """
+        璁$畻鍙栬揣鏁堢巼鍒嗘暟
+        
+        Args:
+            task_data: 浠诲姟鏁版嵁
+            current_position: 褰撳墠浣嶇疆
+            
+        Returns:
+            float: 鏁堢巼鍒嗘暟 (0-10)
+        """
+        distance_to_start = task_data['distance_to_start']
+        
+        # 璺濈瓒婅繎鏁堢巼瓒婇珮
+        distance_score = max(0, 10 - distance_to_start / 3)
+        
+        # 寰堣繎鐨勫彇璐х偣鏈夐珮濂栧姳
+        if distance_to_start <= 3:
+            distance_score += 5.0
+        elif distance_to_start <= 5:
+            distance_score += 2.0
+            
+        return distance_score
+    
+    def _fast_parse_task_start_end(self, task_id: str) -> Tuple[Optional[str], Optional[str]]:
+        """
+        瑙f瀽浠诲姟璧风偣鍜岀粓鐐�
+        鏀寔涓ょ鏍煎紡锛�
+        1. 鏍囧噯鏍煎紡锛歺xx_start_end
+        2. 绠�鍗曟牸寮忥細鏁板瓧ID锛堥�氳繃hash鍒嗛厤浣嶇疆锛�
+        
+        Args:
+            task_id: 浠诲姟ID
+            
+        Returns:
+            Tuple[Optional[str], Optional[str]]: (璧风偣, 缁堢偣)
+        """
+        self.logger.debug(f"瑙f瀽浠诲姟ID: {task_id}")
+        
+        # 鏂规硶1锛氬皾璇曟爣鍑嗘牸寮忚В鏋愶紙鍖呭惈涓嬪垝绾匡級
+        if "_" in task_id:
+            parts = task_id.split("_")
+            self.logger.debug(f"浠诲姟ID鍖呭惈涓嬪垝绾匡紝鍒嗗壊缁撴灉: {parts}")
+            
+            if len(parts) >= 3:
+                potential_start = parts[-2]
+                potential_end = parts[-1]
+                
+                start_valid = potential_start in self.path_mapping
+                end_valid = potential_end in self.path_mapping
+                
+                self.logger.debug(f"鏍囧噯鏍煎紡瑙f瀽缁撴灉: start={potential_start}(valid={start_valid}), end={potential_end}(valid={end_valid})")
+                
+                if start_valid and end_valid:
+                    return potential_start, potential_end
+                    
+            self.logger.debug(f"浠诲姟ID鍒嗗壊鍚庨儴鍒嗘暟閲忎笉瓒�: {len(parts)} < 3")
+        
+        # 鏂规硶2锛氱畝鍗曟暟瀛桰D鐨勫閫夋満鍒讹紙閫氳繃hash鍒嗛厤浣嶇疆锛�
+        self.logger.debug(f"灏濊瘯绠�鍗曟暟瀛桰D澶囬�夎В鏋愭満鍒�")
+        
+        # 鑾峰彇鎵�鏈夊彲鐢ㄤ綅缃�
+        all_positions = list(self.path_mapping.keys())
+        if len(all_positions) < 2:
+            self.logger.warning(f"璺緞鏄犲皠浣嶇疆涓嶈冻锛屾棤娉曚负浠诲姟鍒嗛厤璧风粓鐐�")
+            return None, None
+        
+        # 浣跨敤浠诲姟ID鐨刪ash鍊兼潵鍒嗛厤璧风偣鍜岀粓鐐�
+        try:
+            task_hash = abs(hash(task_id))
+            
+            # 鍒嗛厤璧风偣锛堜娇鐢╤ash鐨勫墠涓�鍗婏級
+            start_index = task_hash % len(all_positions)
+            start_pos = all_positions[start_index]
+            
+            # 鍒嗛厤缁堢偣锛堜娇鐢╤ash鐨勫悗涓�鍗婏紝纭繚涓庤捣鐐逛笉鍚岋級
+            end_index = (task_hash // len(all_positions)) % len(all_positions)
+            if end_index == start_index:
+                end_index = (end_index + 1) % len(all_positions)
+            end_pos = all_positions[end_index]
+            
+            self.logger.debug(f"绠�鍗旾D澶囬�夎В鏋愭垚鍔�: 浠诲姟{task_id} -> start={start_pos}, end={end_pos}")
+            return start_pos, end_pos
+            
+        except Exception as e:
+            self.logger.error(f"绠�鍗旾D澶囬�夎В鏋愬け璐�: {e}")
+        
+        self.logger.warning(f"浠诲姟ID {task_id} 鎵�鏈夎В鏋愭柟娉曢兘澶辫触锛岃繑鍥� None, None")
+        return None, None
+    
+    def _fast_parse_target(self, task_id: str, current_position: str, loaded: bool) -> Optional[str]:
+        """
+        瑙f瀽浠诲姟鐩爣浣嶇疆
+        
+        Args:
+            task_id: 浠诲姟ID
+            current_position: 褰撳墠浣嶇疆
+            loaded: 鏄惁宸茶杞�
+            
+        Returns:
+            Optional[str]: 鐩爣浣嶇疆鐮�
+        """
+        _, task_end = self._fast_parse_task_start_end(task_id)
+        if task_end:
+            return task_end
+        
+        # 鍩轰簬hash鍊煎拰瑁呰浇鐘舵�佸揩閫熼�夋嫨
+        hash_val = hash(task_id) % 1000
+        
+        if loaded:
+            # 宸茶杞斤紝閫夋嫨鍑哄簱鍖哄煙
+            target_positions = [pos for pos in self.delivery_positions if pos != current_position]
+            if not target_positions:
+                target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position]
+        else:
+            # 鏈杞斤紝閫夋嫨浠撳簱鍖哄煙
+            target_positions = [pos for pos in self.pickup_positions if pos != current_position]
+            if not target_positions:
+                target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position]
+        
+        if target_positions:
+            return target_positions[hash_val % len(target_positions)]
+        
+        return None
+
+    def extract_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
+        """鎻愬彇鎵�鏈堿GV姝e湪鎵ц鐨勪换鍔�"""
+        self.logger.debug(f"寮�濮嬭浆鎹换鍔′负鏍囧噯鏍煎紡锛孉GV鏁伴噺: {len(agv_status_list)}")
+        
+        optimized_tasks = self.extract_optimized_executing_tasks(agv_status_list)
+        
+        self.logger.debug(f"鎻愬彇缁撴灉鏁伴噺: {len(optimized_tasks)}")
+        
+        legacy_tasks = []
+        for i, task in enumerate(optimized_tasks):
+            try:
+                self.logger.debug(f"杞崲浠诲姟 #{i+1}: AGV {task['agvId']}")
+                
+                primary_task = task['primary_task']
+                task_info = primary_task['task_info']
+                backpack_item = primary_task['backpack_item']
+                
+                self.logger.debug(f"  - primary_task瀛樺湪: {primary_task is not None}")
+                self.logger.debug(f"  - task_info瀛樺湪: {task_info is not None}")
+                self.logger.debug(f"  - backpack_item瀛樺湪: {backpack_item is not None}")
+                
+                # 鏍规嵁action纭畾鐘舵��
+                is_pickup_action = task['next_action'] == 'pickup'
+                
+                self.logger.debug(f"  - next_action: {task['next_action']}")
+                self.logger.debug(f"  - is_pickup_action: {is_pickup_action}")
+                self.logger.debug(f"  - target_position: {task['target_position']}")
+                
+                # 淇濇寔鍘熸湁鐨勬爣鍑咼SON鏍煎紡
+                legacy_task = {
+                    'agvId': task['agvId'],
+                    'taskId': task_info['task_id'],
+                    'currentPosition': task['currentPosition'],
+                    'backpackIndex': backpack_item.index,
+                    'status': 'executing',
+                    'agvDirection': task['agvDirection'],
+                    'agvVoltage': task['agvVoltage'],
+                    'agvError': task['agvError'],
+                    'loaded': backpack_item.loaded,
+                    'needPickup': is_pickup_action,
+                    'targetPosition': task['target_position'],
+                    'taskStart': task_info['start_position'],
+                    'taskEnd': task_info['end_position']
+                }
+                
+                self.logger.debug(f"  - 浠诲姟鏋勫缓鎴愬姛: {legacy_task['agvId']} -> {legacy_task['targetPosition']}")
+                legacy_tasks.append(legacy_task)
+                
+            except Exception as e:
+                self.logger.error(f"杞崲浠诲姟 #{i+1} 澶辫触: {e}")
+                self.logger.error(f"浠诲姟鍐呭: {task}")
+                continue
+            
+        self.logger.info(f"浣跨敤缁煎悎浼樺寲绛栫暐锛屼负 {len(legacy_tasks)} 涓狝GV鐢熸垚鏍囧噯鏍煎紡璺緞")
+        return legacy_tasks
+
+
+class BatchPathPlanner:
+    """鎵归噺璺緞瑙勫垝鍣�"""
+    
+    def __init__(self, path_mapping: Dict[str, Dict[str, int]], 
+                 algorithm_type: str = "A_STAR", agv_manager=None):
+        """
+        鍒濆鍖栨壒閲忚矾寰勮鍒掑櫒
+        
+        Args:
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+            algorithm_type: 鍩虹璺緞瑙勫垝绠楁硶绫诲瀷
+            agv_manager: 鐢ㄤ簬鑾峰彇AGV鐘舵�佷俊鎭�
+        """
+        self.path_mapping = path_mapping
+        self.algorithm_type = algorithm_type
+        self.agv_manager = agv_manager
+        self.logger = logging.getLogger(__name__)
+        
+        # 鍒濆鍖栦紭鍖栫粍浠�
+        self.task_extractor = ExecutingTaskExtractor(path_mapping)
+        self.collision_detector = FastCollisionDetector(min_distance=3.0)
+        self.collision_resolver = FastCollisionResolver(path_mapping, self.collision_detector, agv_manager)
+        self.base_planner = PathPlanningFactory.create_path_planner(algorithm_type, path_mapping)
+    
+    def plan_all_agv_paths(self, agv_status_list: List[AGVStatus], 
+                          include_idle_agv: bool = False,
+                          constraints: List[Tuple[int, int, float]] = None) -> Dict:
+        """
+        涓烘墍鏈堿GV瑙勫垝璺緞锛堟瘡涓狝GV鍙敓鎴愪竴涓矾寰勶級
+        
+        Args:
+            agv_status_list: AGV鐘舵�佸垪琛�
+            include_idle_agv: 鏄惁鍖呭惈绌洪棽AGV
+            constraints: 璺緞绾︽潫鏉′欢
+            
+        Returns:
+            Dict: 鍖呭惈鎵�鏈夎矾寰勪俊鎭殑缁撴灉
+        """
+        start_time = time.time()
+        
+        # 1. 鎻愬彇鎵ц涓殑浠诲姟锛堜娇鐢ㄥ悜鍚庡吋瀹圭殑鏂规硶锛�
+        executing_tasks = self.task_extractor.extract_executing_tasks(agv_status_list)
+        
+        # 2. 璺緞瑙勫垝
+        planned_paths = []
+        total_planning_time = 0
+        planned_agv_ids = set()
+        
+        for task in executing_tasks:
+            agv_id = task['agvId']
+            current_pos = task['currentPosition']
+            target_pos = task['targetPosition']
+            task_id = task['taskId']
+            
+            self.logger.debug(f"寮�濮嬭矾寰勮鍒�: AGV {agv_id}, 浠诲姟 {task_id}")
+            self.logger.debug(f"  - current_pos: {current_pos} ({type(current_pos)})")
+            self.logger.debug(f"  - target_pos: {target_pos} ({type(target_pos)})")
+            
+            # 閬垮厤閲嶅瑙勫垝
+            if agv_id in planned_agv_ids:
+                self.logger.debug(f"  - 璺宠繃AGV {agv_id}锛氬凡瑙勫垝")
+                continue
+            
+            # 楠岃瘉浣嶇疆鏍煎紡
+            if not current_pos or not target_pos:
+                self.logger.error(f"  - AGV {agv_id} 浣嶇疆淇℃伅鏃犳晥: current={current_pos}, target={target_pos}")
+                continue
+            
+            # 璺緞鏍煎紡鏍囧噯鍖�
+            from common.utils import normalize_path_id
+            normalized_current = normalize_path_id(current_pos)
+            normalized_target = normalize_path_id(target_pos)
+            
+            self.logger.debug(f"  - 鏍煎紡鏍囧噯鍖�: {current_pos} -> {normalized_current}, {target_pos} -> {normalized_target}")
+            
+            # 璺緞瑙勫垝
+            path_start_time = time.time()
+            self.logger.debug(f"  - 璋冪敤base_planner.plan_path({normalized_current}, {normalized_target})")
+            planned_path = self.base_planner.plan_path(normalized_current, normalized_target, constraints)
+            path_end_time = time.time()
+            
+            planning_time = (path_end_time - path_start_time) * 1000
+            total_planning_time += planning_time
+            
+            self.logger.debug(f"  - 璺緞瑙勫垝缁撴灉: {planned_path is not None}")
+            if planned_path:
+                self.logger.debug(f"  - 璺緞闀垮害: {len(planned_path.codeList) if hasattr(planned_path, 'codeList') else 'N/A'}")
+            else:
+                self.logger.error(f"  - AGV {agv_id} 璺緞瑙勫垝澶辫触: {current_pos} -> {target_pos}")
+            
+            if planned_path:
+                # 鐢熸垚璺緞瀛楀吀
+                seg_id = generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2")
+                
+                path_dict = {
+                    'agvId': agv_id,
+                    'segId': seg_id,
+                    'codeList': []
+                }
+                
+                # 杞崲璺緞鐐规牸寮�
+                for i, path_code in enumerate(planned_path.codeList):
+                    if isinstance(path_code, PathCode):
+                        action_type = "2"  # 浠诲姟绫诲瀷
+                        pos_type = None
+                        backpack_level = task.get('backpackIndex', 0)
+                        is_target_point = False
+                        
+                        # 纭畾浣嶇疆绫诲瀷锛氭渶鍚庝竴涓偣鐨勫姩浣滅被鍨�
+                        if i == len(planned_path.codeList) - 1:  # 鏈�鍚庝竴涓矾寰勭偣
+                            is_target_point = True
+                            if task.get('loaded', False):
+                                # 宸茶杞戒换鍔★細褰撳墠璺緞鏄幓缁堢偣鏀捐揣
+                                pos_type = "2"  # 鏀捐揣
+                                self.logger.debug(f"AGV {agv_id} 鍦ㄧ粓鐐� {path_code.code} 璁剧疆鏀捐揣鍔ㄤ綔")
+                            else:
+                                # 鏈杞戒换鍔★細褰撳墠璺緞鏄幓璧风偣鍙栬揣
+                                pos_type = "1"  # 鍙栬揣
+                                self.logger.debug(f"AGV {agv_id} 鍦ㄨ捣鐐� {path_code.code} 璁剧疆鍙栬揣鍔ㄤ綔")
+                        
+                        enhanced_code = generate_navigation_code(
+                            code=path_code.code,
+                            direction=path_code.direction,
+                            action_type=action_type,
+                            task_id=task_id,
+                            pos_type=pos_type,
+                            backpack_level=backpack_level,
+                            is_target_point=is_target_point
+                        )
+                        path_dict['codeList'].append(enhanced_code)
+                    else:
+                        enhanced_code = generate_navigation_code(
+                            code=path_code.get('code', ''),
+                            direction=path_code.get('direction', '90'),
+                            action_type="2",
+                            task_id=task_id
+                        )
+                        path_dict['codeList'].append(enhanced_code)
+                
+                planned_paths.append(path_dict)
+                planned_agv_ids.add(agv_id)
+                self.logger.debug(f"AGV {agv_id} 蹇�熻鍒�: {current_pos} -> {target_pos}")
+        
+        # 3. 绌洪棽AGV澶勭悊
+        if include_idle_agv:
+            idle_paths = self._fast_plan_idle_agv_paths(agv_status_list, executing_tasks, constraints, planned_agv_ids)
+            planned_paths.extend(idle_paths)
+        
+        # 4. 鍘婚噸妫�鏌�
+        final_paths = []
+        final_agv_ids = set()
+        for path in planned_paths:
+            agv_id = path['agvId']
+            if agv_id not in final_agv_ids:
+                final_paths.append(path)
+                final_agv_ids.add(agv_id)
+        planned_paths = final_paths
+        
+        # 5. 纰版挒妫�娴嬪拰瑙e喅
+        conflicts = self.collision_detector.detect_conflicts(planned_paths)
+        
+        if conflicts:
+            self.logger.info(f"鍙戠幇 {len(conflicts)} 涓鎾烇紝蹇�熻В鍐充腑...")
+            planned_paths = self.collision_resolver.resolve_conflicts(planned_paths, conflicts, executing_tasks)
+            remaining_conflicts = self.collision_detector.detect_conflicts(planned_paths)
+        else:
+            remaining_conflicts = []
+        
+        end_time = time.time()
+        total_time = (end_time - start_time) * 1000
+        
+        # 6. 鏋勫缓浼樺寲缁撴灉
+        result = {
+            'totalAgvs': len(agv_status_list),
+            'executingTasksCount': len(executing_tasks),
+            'plannedPathsCount': len(planned_paths),
+            'totalPlanningTime': round(total_time, 2),
+            'pathPlanningTime': round(total_planning_time, 2),
+            'conflictsDetected': len(conflicts),
+            'remainingConflicts': len(remaining_conflicts),
+            'algorithm': f"Optimized_{self.algorithm_type}",
+            'fourDirectionCompliant': True,
+            'collisionFree': len(remaining_conflicts) == 0,
+            'plannedPaths': planned_paths,
+            'executingTasks': executing_tasks
+        }
+        
+        self.logger.info(f"浼樺寲鎵归噺璺緞瑙勫垝瀹屾垚 - 鎬籄GV: {result['totalAgvs']}, "
+                        f"鎵ц涓换鍔�: {result['executingTasksCount']}, "
+                        f"瑙勫垝璺緞: {result['plannedPathsCount']}, "
+                        f"鎬昏�楁椂: {result['totalPlanningTime']:.2f}ms, "
+                        f"鏃犵鎾�: {result['collisionFree']}")
+        
+        return result
+    
+    def _fast_plan_idle_agv_paths(self, agv_status_list: List[AGVStatus], 
+                                 executing_tasks: List[Dict],
+                                 constraints: List[Tuple[int, int, float]] = None,
+                                 planned_agv_ids: Set[str] = None) -> List[Dict]:
+        """
+        涓虹┖闂睞GV瑙勫垝璺緞
+        """
+        idle_paths = []
+        executing_agv_ids = {task['agvId'] for task in executing_tasks}
+        
+        if planned_agv_ids is None:
+            planned_agv_ids = executing_agv_ids.copy()
+        
+        for agv_status in agv_status_list:
+            agv_id = agv_status.agvId
+            
+            if agv_id in planned_agv_ids:
+                continue
+            
+            if self._is_agv_idle_fast(agv_status):
+                current_pos = agv_status.position
+                target_pos = self._get_idle_agv_target_fast(agv_id, current_pos)
+                
+                if target_pos and target_pos != current_pos:
+                    planned_path = self.base_planner.plan_path(current_pos, target_pos, constraints)
+                    
+                    if planned_path:
+                        action_type = "3" if self._is_charging_path(target_pos) else "4"
+                        seg_id = generate_segment_id(agv_id=agv_id, target_position=target_pos, action_type=action_type)
+                        
+                        path_dict = {
+                            'agvId': agv_id,
+                            'segId': seg_id,
+                            'codeList': []
+                        }
+                        
+                        for i, path_code in enumerate(planned_path.codeList):
+                            if isinstance(path_code, PathCode):
+                                # 绌洪棽AGV涓嶉渶瑕乼askId銆乸osType鍜宭ev瀛楁
+                                enhanced_code = generate_navigation_code(
+                                    code=path_code.code,
+                                    direction=path_code.direction,
+                                    action_type=action_type,
+                                    is_target_point=False  # 绌洪棽AGV璺緞鏃犵洰鏍囩偣姒傚康
+                                )
+                                path_dict['codeList'].append(enhanced_code)
+                        
+                        idle_paths.append(path_dict)
+                        planned_agv_ids.add(agv_id)
+        
+        return idle_paths
+    
+    def _is_agv_idle_fast(self, agv_status: AGVStatus) -> bool:
+        """鍒ゆ柇AGV鏄惁绌洪棽"""
+        if agv_status.status not in [0, 1, 2] or agv_status.error != 0:
+            return False
+        
+        # 妫�鏌ヨ儗绡�
+        if agv_status.backpack:
+            for item in agv_status.backpack[:2]:  # 鍙鏌ュ墠涓や釜
+                if item.loaded and item.execute:
+                    return False
+        
+        return True
+    
+    def _get_idle_agv_target_fast(self, agv_id: str, current_position: str) -> Optional[str]:
+        """鑾峰彇绌洪棽AGV鐩爣浣嶇疆"""
+        hash_val = hash(agv_id) % 1000
+        
+        # 閫夋嫨閫昏緫
+        charging_positions = [pos for pos in self.task_extractor.charging_positions if pos != current_position]
+        if charging_positions:
+            return charging_positions[hash_val % len(charging_positions)]
+        
+        standby_positions = [pos for pos in self.task_extractor.standby_positions if pos != current_position]
+        if standby_positions:
+            return standby_positions[hash_val % len(standby_positions)]
+        
+        all_positions = [pos for pos in self.path_mapping.keys() if pos != current_position]
+        if all_positions:
+            return all_positions[hash_val % len(all_positions)]
+        
+        return None
+
+    def _is_charging_path(self, target_position: str) -> bool:
+        """
+        鍒ゆ柇鐩爣浣嶇疆鏄惁涓哄厖鐢电珯锛堝悗鏈熶慨鏀癸級
+        """
+        if not target_position:
+            return False
+        
+        # 鍏呯數绔欒瘑鍒�
+        return target_position.startswith("2") or "charge" in target_position.lower()
+
+
+class PathPlanner:
+    """璺緞瑙勫垝鍣ㄥ熀绫�"""
+    
+    def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
+        """
+        鍒濆鍖栬矾寰勮鍒掑櫒
+        
+        Args:
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+        """
+        self.path_mapping = path_mapping
+        self.logger = logging.getLogger(__name__)
+    
+    def plan_path(self, start_code: str, end_code: str, 
+                 constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]:
+        """
+        瑙勫垝浠庤捣鐐瑰埌缁堢偣鐨勮矾寰�
+        
+        Args:
+            start_code: 璧峰璺緞鐐笽D
+            end_code: 鐩爣璺緞鐐笽D
+            constraints: 绾︽潫鏉′欢鍒楄〃
+            
+        Returns:
+            Optional[PlannedPath]: 瑙勫垝鐨勮矾寰勶紝濡傛灉鏃犳硶瑙勫垝鍒欒繑鍥濶one
+        """
+        raise NotImplementedError("瀛愮被蹇呴』瀹炵幇姝ゆ柟娉�")
+    
+    def is_valid_path_point(self, code: str) -> bool:
+        """
+        妫�鏌ヨ矾寰勭偣ID鏄惁鏈夋晥
+        
+        Args:
+            code: 璺緞鐐笽D
+            
+        Returns:
+            bool: 鏄惁鏈夋晥
+        """
+        return code in self.path_mapping
+
+
+class AStarPathPlanner(PathPlanner):
+    """A*璺緞瑙勫垝鍣�"""
+    
+    def __init__(self, path_mapping: Dict[str, Dict[str, int]], turn_cost: float = 0.5):
+        """
+        鍒濆鍖朅*璺緞瑙勫垝鍣�
+        
+        Args:
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+            turn_cost: 杞悜浠d环
+        """
+        super().__init__(path_mapping)
+        self.turn_cost = turn_cost
+        self.logger = logging.getLogger(__name__)
+        
+        # 鍥涗釜鏂瑰悜锛氫笂銆佸彸銆佷笅銆佸乏 (涓ユ牸鍥涙柟鍚戠Щ鍔�)
+        self.directions = [(0, -1), (1, 0), (0, 1), (-1, 0)]
+        self.direction_angles = ["270", "0", "90", "180"]
+        
+        # 棰勮绠楀潗鏍囨槧灏勶紙蹇呴』鍦� _build_adjacency_list 涔嬪墠鍒濆鍖栵級
+        self.coord_to_code = {}
+        for code, data in path_mapping.items():
+            if isinstance(data, dict) and 'x' in data and 'y' in data:
+                self.coord_to_code[(data['x'], data['y'])] = code
+        
+        # 棰勮绠楅偦鎺ヨ〃
+        self.adjacency = self._build_adjacency_list()
+        
+        # 鎬ц兘鐩戞帶
+        self.planning_count = 0
+        self.total_planning_time = 0.0
+        
+        self.logger.debug(f"A*瑙勫垝鍣ㄥ垵濮嬪寲瀹屾垚锛岄偦鎺ヨ〃: {len(self.adjacency)} 涓妭鐐�")
+    
+    def _build_adjacency_list(self) -> Dict[str, List[Tuple[str, str]]]:
+        """
+        棰勮绠楅偦鎺ヨ〃
+        
+        Returns:
+            Dict[str, List[Tuple[str, str]]]: {鑺傜偣: [(閭诲眳鑺傜偣, 绉诲姩鏂瑰悜), ...]}
+        """
+        adjacency = defaultdict(list)
+        
+        for code, coord_data in self.path_mapping.items():
+            x, y = coord_data['x'], coord_data['y']
+            
+            # 妫�鏌ュ洓涓柟鍚戠殑閭诲眳
+            for i, (dx, dy) in enumerate(self.directions):
+                next_x, next_y = x + dx, y + dy
+                
+                # 鏌ユ壘閭诲眳浠g爜
+                neighbor_coord = (next_x, next_y)
+                if neighbor_coord in self.coord_to_code:
+                    neighbor_code = self.coord_to_code[neighbor_coord]
+                    direction = self.direction_angles[i]
+                    adjacency[code].append((neighbor_code, direction))
+        
+        return adjacency
+    
+    def plan_path(self, start_code: str, end_code: str, 
+                 constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]:
+        """       
+        Args:
+            start_code: 璧峰璺緞鐐笽D
+            end_code: 鐩爣璺緞鐐笽D
+            constraints: 绾︽潫鏉′欢鍒楄〃
+            
+        Returns:
+            Optional[PlannedPath]: 瑙勫垝鐨勮矾寰�
+        """
+        # 楠岃瘉
+        if not self.is_valid_path_point(start_code) or not self.is_valid_path_point(end_code):
+            return None
+        
+        if start_code == end_code:
+            return PlannedPath(agvId="", codeList=[PathCode(code=start_code, direction="90")])
+        
+        # 鑾峰彇鍧愭爣锛堜娇鐢ㄩ璁$畻鏄犲皠锛�
+        start_coord = (self.path_mapping[start_code]['x'], self.path_mapping[start_code]['y'])
+        end_coord = (self.path_mapping[end_code]['x'], self.path_mapping[end_code]['y'])
+        
+        # A*瀹炵幇
+        # 浣跨敤鍏冪粍鍫嗭紝鍑忓皯瀵硅薄鍒涘缓寮�閿�
+        open_set = [(0, start_code, None, [])]  # (f_score, code, parent, path)
+        closed_set = set()
+        g_scores = {start_code: 0}
+        
+        # 棰勮绠楃害鏉熸鏌ュ嚱鏁�
+        constraint_check = self._build_constraint_checker(constraints) if constraints else None
+        
+        while open_set:
+            f_score, current_code, parent_code, path = heapq.heappop(open_set)
+            
+            if current_code in closed_set:
+                continue
+                
+            closed_set.add(current_code)
+            current_path = path + [current_code]
+            
+            if current_code == end_code:
+                # 閲嶅缓璺緞
+                return self._build_path_result(current_path)
+            
+            # 浣跨敤棰勮绠楃殑閭绘帴琛�
+            for neighbor_code, direction in self.adjacency[current_code]:
+                if neighbor_code in closed_set:
+                    continue
+                
+                # 绾︽潫妫�鏌�
+                if constraint_check and constraint_check(neighbor_code):
+                    continue
+                
+                # 浠d环璁$畻
+                g_cost = g_scores[current_code] + 1
+                
+                if neighbor_code not in g_scores or g_cost < g_scores[neighbor_code]:
+                    g_scores[neighbor_code] = g_cost
+                    
+                    # 鍚彂寮忚绠�
+                    neighbor_coord = (self.path_mapping[neighbor_code]['x'], 
+                                    self.path_mapping[neighbor_code]['y'])
+                    h_cost = abs(neighbor_coord[0] - end_coord[0]) + abs(neighbor_coord[1] - end_coord[1])
+                    f_cost = g_cost + h_cost
+                    
+                    heapq.heappush(open_set, (f_cost, neighbor_code, current_code, current_path))
+        
+        return None
+    
+    def _build_constraint_checker(self, constraints: List[Tuple[int, int, float]]):
+        """鏋勫缓绾︽潫妫�鏌ュ嚱鏁�"""
+        def check_constraints(code: str) -> bool:
+            coord = (self.path_mapping[code]['x'], self.path_mapping[code]['y'])
+            for cx, cy, radius in constraints:
+                if (coord[0] - cx) ** 2 + (coord[1] - cy) ** 2 <= radius ** 2:
+                    return True
+            return False
+        return check_constraints
+    
+    def _build_path_result(self, path_codes: List[str]) -> PlannedPath:
+        """鏋勫缓璺緞缁撴灉"""
+        result_codes = []
+        
+        for i, code in enumerate(path_codes):
+            direction = "90"  # 榛樿鏂瑰悜
+            
+            if i < len(path_codes) - 1:
+                # 浣跨敤棰勮绠楃殑閭绘帴琛ㄦ煡鎵炬柟鍚�
+                next_code = path_codes[i + 1]
+                for neighbor_code, neighbor_direction in self.adjacency[code]:
+                    if neighbor_code == next_code:
+                        direction = neighbor_direction
+                        break
+            
+            result_codes.append(PathCode(code=code, direction=direction))
+        
+        return PlannedPath(agvId="", codeList=result_codes)
+
+
+# 纰版挒妫�娴嬪櫒
+class FastCollisionDetector:
+    """纰版挒妫�娴嬪櫒"""
+    
+    def __init__(self, min_distance: float = 3.0):
+        self.min_distance = min_distance
+        self.logger = logging.getLogger(__name__)
+    
+    def detect_conflicts(self, planned_paths: List[Dict]) -> List[Dict]:
+        """
+        纰版挒妫�娴� - 鍩轰簬鏃堕棿姝ョ殑浼樺寲妫�娴�
+        
+        Args:
+            planned_paths: 瑙勫垝璺緞鍒楄〃
+            
+        Returns:
+            List[Dict]: 鍐茬獊鍒楄〃
+        """
+        conflicts = []
+        if len(planned_paths) < 2:
+            return conflicts
+        
+        # 鏋勫缓鏃堕棿-浣嶇疆鏄犲皠
+        time_positions = defaultdict(list)
+        
+        for path_data in planned_paths:
+            agv_id = path_data['agvId']
+            code_list = path_data.get('codeList', [])
+            
+            for time_step, path_code in enumerate(code_list):
+                # 蹇�熸彁鍙栦綅缃爜
+                if isinstance(path_code, dict):
+                    position = path_code.get('code', '')
+                elif hasattr(path_code, 'code'):
+                    position = path_code.code
+                else:
+                    position = str(path_code)
+                
+                if position:
+                    time_positions[time_step].append((agv_id, position))
+        
+        # 蹇�熷啿绐佹娴�
+        for time_step, agv_positions in time_positions.items():
+            if len(agv_positions) < 2:
+                continue
+            
+            # 浣嶇疆鍒嗙粍妫�鏌�
+            position_groups = defaultdict(list)
+            for agv_id, position in agv_positions:
+                position_groups[position].append(agv_id)
+            
+            # 鍙戠幇鍐茬獊
+            for position, agv_list in position_groups.items():
+                if len(agv_list) > 1:
+                    conflicts.append({
+                        'type': 'position_conflict',
+                        'time_step': time_step,
+                        'position': position,
+                        'agv_ids': agv_list,
+                        'severity': 'high'
+                    })
+        
+        self.logger.debug(f"蹇�熸娴嬪埌 {len(conflicts)} 涓啿绐�")
+        return conflicts
+
+
+# 纰版挒瑙e喅鍣�
+class FastCollisionResolver:
+    """纰版挒瑙e喅鍣�"""
+    
+    def __init__(self, path_mapping: Dict[str, Dict[str, int]], 
+                 collision_detector: FastCollisionDetector, agv_manager=None):
+        self.path_mapping = path_mapping
+        self.collision_detector = collision_detector
+        self.agv_manager = agv_manager
+        self.logger = logging.getLogger(__name__)
+    
+    def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Dict], executing_tasks: List[Dict] = None) -> List[Dict]:
+        """
+        瑙e喅鍐茬獊
+        
+        Args:
+            planned_paths: 瑙勫垝璺緞鍒楄〃
+            conflicts: 鍐茬獊鍒楄〃
+            executing_tasks: 鎵ц浠诲姟鍒楄〃
+            
+        Returns:
+            List[Dict]: 瑙e喅鍐茬獊鍚庣殑璺緞鍒楄〃
+        """
+        if not conflicts:
+            return planned_paths
+        
+        # 鏋勫缓璺緞瀛楀吀渚夸簬蹇�熻闂�
+        paths_dict = {path['agvId']: path for path in planned_paths}
+        
+        # 鎸夋椂闂存鎺掑簭澶勭悊鍐茬獊
+        sorted_conflicts = sorted(conflicts, key=lambda x: x.get('time_step', 0))
+        
+        for conflict in sorted_conflicts:
+            if conflict['type'] == 'position_conflict':
+                agv_ids = conflict['agv_ids']
+                time_step = conflict['time_step']
+                
+                # 绠�鍗曠瓥鐣ワ細淇濈暀绗竴涓狝GV锛屽叾浠栧欢杩�
+                sorted_agv_ids = sorted(agv_ids)
+                
+                for i, agv_id in enumerate(sorted_agv_ids[1:], 1):
+                    if agv_id in paths_dict:
+                        self._add_delay_to_path(paths_dict[agv_id], delay_steps=i)
+                        self.logger.debug(f"涓篈GV {agv_id} 娣诲姞 {i} 姝ュ欢杩�")
+        
+        return list(paths_dict.values())
+    
+    def _add_delay_to_path(self, path_data: Dict, delay_steps: int):
+        """涓鸿矾寰勬坊鍔犲欢杩熸楠�"""
+        if delay_steps <= 0 or not path_data.get('codeList'):
+            return
+        
+        # 鍦ㄨ矾寰勫紑濮嬪閲嶅绗竴涓綅缃�
+        first_code = path_data['codeList'][0]
+        
+        # 鍒涘缓寤惰繜浠g爜
+        if isinstance(first_code, dict):
+            delay_code = first_code.copy()
+        else:
+            delay_code = {
+                'code': first_code.code if hasattr(first_code, 'code') else str(first_code),
+                'direction': first_code.direction if hasattr(first_code, 'direction') else "90"
+            }
+        
+        # 娣诲姞寤惰繜姝ラ
+        for _ in range(delay_steps):
+            path_data['codeList'].insert(0, delay_code)
+    
+    def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]:
+        """楠岃瘉鍥涘悜绉诲姩绾︽潫"""
+        return planned_paths
+
+
+class PathPlanningFactory:
+    """璺緞瑙勫垝鍣ㄥ伐鍘傜被"""
+    
+    @staticmethod
+    def create_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]]) -> PathPlanner:
+        """
+        鍒涘缓璺緞瑙勫垝鍣ㄥ疄渚�
+        
+        Args:
+            algorithm_type: 绠楁硶绫诲瀷
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+            
+        Returns:
+            PathPlanner: 璺緞瑙勫垝鍣ㄥ疄渚�
+        """
+        algorithm_upper = algorithm_type.upper()
+        
+        if algorithm_upper in ["A_STAR", "ASTAR", "A*", "DIJKSTRA", "GREEDY"]:
+            return AStarPathPlanner(path_mapping)
+        else:
+            # 榛樿A*绠楁硶
+            logging.getLogger(__name__).warning(f"鏈煡绠楁硶绫诲瀷 {algorithm_type}锛屼娇鐢ㄩ粯璁*绠楁硶")
+            return AStarPathPlanner(path_mapping)
+    
+    @staticmethod
+    def create_batch_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]], 
+                                 agv_manager=None) -> BatchPathPlanner:
+        """
+        鍒涘缓鎵归噺璺緞瑙勫垝鍣ㄥ疄渚�
+        
+        Args:
+            algorithm_type: 鍩虹绠楁硶绫诲瀷
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+            agv_manager: AGV绠$悊鍣紝鐢ㄤ簬鑾峰彇AGV鐘舵�佷俊鎭�
+            
+        Returns:
+            BatchPathPlanner: 鎵归噺璺緞瑙勫垝鍣ㄥ疄渚�
+        """
+        return BatchPathPlanner(path_mapping, algorithm_type, agv_manager)
+
+# 瀵煎嚭鏍稿績绫诲拰鍑芥暟
+__all__ = [
+    'ExecutingTaskExtractor',
+    'BatchPathPlanner',
+    'PathPlanner',
+    'AStarPathPlanner',
+    'PathPlanningFactory',
+    'FastCollisionDetector',
+    'FastCollisionResolver'
+] 
\ No newline at end of file

--
Gitblit v1.9.1