From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001
From: zhang <zc857179121@qq.com>
Date: 星期三, 02 七月 2025 13:12:26 +0800
Subject: [PATCH] 初版提交

---
 algorithm_system/algorithms/collision_detection.py |  701 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 701 insertions(+), 0 deletions(-)

diff --git a/algorithm_system/algorithms/collision_detection.py b/algorithm_system/algorithms/collision_detection.py
new file mode 100644
index 0000000..5b099bf
--- /dev/null
+++ b/algorithm_system/algorithms/collision_detection.py
@@ -0,0 +1,701 @@
+"""
+AGV璺緞纰版挒妫�娴嬪拰瑙e喅妯″潡
+"""
+import math
+import logging
+from typing import Dict, List, Tuple, Optional, Set
+from dataclasses import dataclass
+from collections import defaultdict
+
+from common.data_models import PlannedPath, PathCode, AGVActionTypeEnum
+from common.utils import get_coordinate_from_path_id
+
+
+@dataclass
+class SpaceTimeNode:
+    """鏃剁┖鑺傜偣 - 琛ㄧずAGV鍦ㄧ壒瀹氭椂闂村拰浣嶇疆鐨勭姸鎬�"""
+    agv_id: str
+    position: str  # 浣嶇疆鐮�
+    coordinates: Tuple[int, int]  # 鍧愭爣
+    time_step: int  # 鏃堕棿姝�
+    direction: str  # 鏂瑰悜
+
+
+@dataclass
+class Conflict:
+    """鍐茬獊鎻忚堪"""
+    type: str  # 鍐茬獊绫诲瀷: "vertex", "edge", "follow"
+    agv1: str
+    agv2: str
+    time_step: int
+    position1: str
+    position2: str
+    description: str
+
+
+@dataclass
+class AGVPriority:
+    """AGV浼樺厛绾ц瘎浼扮粨鏋�"""
+    agv_id: str
+    priority_score: float  # 浼樺厛绾у垎鏁帮紝瓒婇珮瓒婁紭鍏�
+    task_status: str  # 浠诲姟鐘舵��
+    action_type: str  # 鍔ㄤ綔绫诲瀷
+    task_priority: int  # 浠诲姟浼樺厛绾�
+    voltage: int  # 鐢甸噺
+    explanation: str  # 浼樺厛绾ц鏄�
+
+
+class CollisionDetector:
+    """AGV璺緞纰版挒妫�娴嬪櫒"""
+    
+    def __init__(self, path_mapping: Dict[str, Dict[str, int]], 
+                 min_distance: float = 3.0, time_buffer: int = 1):
+        """
+        鍒濆鍖栫鎾炴娴嬪櫒
+        
+        Args:
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+            min_distance: AGV涔嬮棿鐨勬渶灏忓畨鍏ㄨ窛绂�
+            time_buffer: 鏃堕棿缂撳啿鍖猴紙鏃堕棿姝ユ暟锛�
+        """
+        self.path_mapping = path_mapping
+        self.min_distance = min_distance
+        self.time_buffer = time_buffer
+        self.logger = logging.getLogger(__name__)
+    
+    def detect_conflicts(self, planned_paths: List[Dict]) -> List[Conflict]:
+        """
+        妫�娴嬫墍鏈堿GV璺緞涔嬮棿鐨勫啿绐�
+        
+        Args:
+            planned_paths: 瑙勫垝璺緞鍒楄〃
+            
+        Returns:
+            List[Conflict]: 鍐茬獊鍒楄〃
+        """
+        conflicts = []
+        
+        # 鏋勫缓鏃剁┖琛�
+        space_time_table = self._build_space_time_table(planned_paths)
+        
+        # 妫�娴嬮《鐐瑰啿绐侊紙鍚屼竴鏃堕棿鍚屼竴浣嶇疆锛�
+        conflicts.extend(self._detect_vertex_conflicts(space_time_table))
+        
+        # 妫�娴嬭竟鍐茬獊锛圓GV浜ゆ崲浣嶇疆锛�
+        conflicts.extend(self._detect_edge_conflicts(space_time_table))
+        
+        # 妫�娴嬭窡闅忓啿绐侊紙AGV璺濈杩囪繎锛�
+        conflicts.extend(self._detect_following_conflicts(space_time_table))
+        
+        self.logger.info(f"妫�娴嬪埌 {len(conflicts)} 涓矾寰勫啿绐�")
+        return conflicts
+    
+    def _build_space_time_table(self, planned_paths: List[Dict]) -> Dict[int, List[SpaceTimeNode]]:
+        """
+        鏋勫缓鏃剁┖琛�
+        
+        Args:
+            planned_paths: 瑙勫垝璺緞鍒楄〃
+            
+        Returns:
+            Dict[int, List[SpaceTimeNode]]: 鏃堕棿姝� -> 鏃剁┖鑺傜偣鍒楄〃
+        """
+        space_time_table = defaultdict(list)
+        
+        for path_data in planned_paths:
+            agv_id = path_data.get('agvId', '')
+            code_list = path_data.get('codeList', [])
+            
+            for time_step, path_code in enumerate(code_list):
+                position = path_code.get('code', '') if isinstance(path_code, dict) else path_code.code
+                direction = path_code.get('direction', '90') if isinstance(path_code, dict) else path_code.direction
+                
+                coordinates = get_coordinate_from_path_id(position, self.path_mapping)
+                if coordinates:
+                    node = SpaceTimeNode(
+                        agv_id=agv_id,
+                        position=position,
+                        coordinates=coordinates,
+                        time_step=time_step,
+                        direction=direction
+                    )
+                    space_time_table[time_step].append(node)
+        
+        return space_time_table
+    
+    def _detect_vertex_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]:
+        """妫�娴嬮《鐐瑰啿绐侊紙鍚屼竴鏃堕棿鍚屼竴浣嶇疆锛�"""
+        conflicts = []
+        
+        for time_step, nodes in space_time_table.items():
+            # 鎸変綅缃垎缁�
+            position_groups = defaultdict(list)
+            for node in nodes:
+                position_groups[node.position].append(node)
+            
+            # 妫�鏌ユ瘡涓綅缃槸鍚︽湁澶氫釜AGV
+            for position, agv_nodes in position_groups.items():
+                if len(agv_nodes) > 1:
+                    # 鍙戠幇鍐茬獊
+                    for i in range(len(agv_nodes)):
+                        for j in range(i + 1, len(agv_nodes)):
+                            conflict = Conflict(
+                                type="vertex",
+                                agv1=agv_nodes[i].agv_id,
+                                agv2=agv_nodes[j].agv_id,
+                                time_step=time_step,
+                                position1=position,
+                                position2=position,
+                                description=f"AGV {agv_nodes[i].agv_id} 鍜� {agv_nodes[j].agv_id} 鍦ㄦ椂闂� {time_step} 鍗犵敤鍚屼竴浣嶇疆 {position}"
+                            )
+                            conflicts.append(conflict)
+        
+        return conflicts
+    
+    def _detect_edge_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]:
+        """妫�娴嬭竟鍐茬獊锛圓GV浜ゆ崲浣嶇疆锛�"""
+        conflicts = []
+        
+        max_time = max(space_time_table.keys()) if space_time_table else 0
+        
+        for time_step in range(max_time):
+            current_nodes = space_time_table.get(time_step, [])
+            next_nodes = space_time_table.get(time_step + 1, [])
+            
+            # 鏋勫缓褰撳墠鍜屼笅涓�鏃跺埢鐨勪綅缃槧灏�
+            current_positions = {node.agv_id: node.position for node in current_nodes}
+            next_positions = {node.agv_id: node.position for node in next_nodes}
+            
+            # 妫�鏌ユ槸鍚︽湁AGV浜ゆ崲浣嶇疆
+            for agv1, pos1_current in current_positions.items():
+                for agv2, pos2_current in current_positions.items():
+                    if agv1 >= agv2:  # 閬垮厤閲嶅妫�鏌�
+                        continue
+                    
+                    pos1_next = next_positions.get(agv1)
+                    pos2_next = next_positions.get(agv2)
+                    
+                    if (pos1_next and pos2_next and 
+                        pos1_current == pos2_next and pos2_current == pos1_next):
+                        # 鍙戠幇杈瑰啿绐�
+                        conflict = Conflict(
+                            type="edge",
+                            agv1=agv1,
+                            agv2=agv2,
+                            time_step=time_step,
+                            position1=pos1_current,
+                            position2=pos2_current,
+                            description=f"AGV {agv1} 鍜� {agv2} 鍦ㄦ椂闂� {time_step}-{time_step+1} 浜ゆ崲浣嶇疆 {pos1_current}<->{pos2_current}"
+                        )
+                        conflicts.append(conflict)
+        
+        return conflicts
+    
+    def _detect_following_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]:
+        """妫�娴嬭窡闅忓啿绐侊紙AGV璺濈杩囪繎锛�"""
+        conflicts = []
+        
+        for time_step, nodes in space_time_table.items():
+            # 妫�鏌ユ墍鏈堿GV瀵逛箣闂寸殑璺濈
+            for i in range(len(nodes)):
+                for j in range(i + 1, len(nodes)):
+                    node1, node2 = nodes[i], nodes[j]
+                    
+                    # 璁$畻娆у嚑閲屽緱璺濈
+                    distance = math.sqrt(
+                        (node1.coordinates[0] - node2.coordinates[0]) ** 2 +
+                        (node1.coordinates[1] - node2.coordinates[1]) ** 2
+                    )
+                    
+                    if distance < self.min_distance and distance > 0:
+                        conflict = Conflict(
+                            type="follow",
+                            agv1=node1.agv_id,
+                            agv2=node2.agv_id,
+                            time_step=time_step,
+                            position1=node1.position,
+                            position2=node2.position,
+                            description=f"AGV {node1.agv_id} 鍜� {node2.agv_id} 鍦ㄦ椂闂� {time_step} 璺濈杩囪繎 ({distance:.2f} < {self.min_distance})"
+                        )
+                        conflicts.append(conflict)
+        
+        return conflicts
+
+
+class CollisionResolver:
+    """AGV璺緞纰版挒瑙e喅鍣�"""
+    
+    def __init__(self, path_mapping: Dict[str, Dict[str, int]], detector: CollisionDetector, agv_manager=None):
+        """
+        鍒濆鍖栫鎾炶В鍐冲櫒
+        
+        Args:
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+            detector: 纰版挒妫�娴嬪櫒
+            agv_manager: AGV绠$悊鍣紝鐢ㄤ簬鑾峰彇AGV鐘舵�佷俊鎭�
+        """
+        self.path_mapping = path_mapping
+        self.detector = detector
+        self.agv_manager = agv_manager
+        self.logger = logging.getLogger(__name__)
+    
+    def evaluate_agv_priority(self, agv_id: str, path: Dict, executing_tasks: List[Dict] = None) -> AGVPriority:
+        """
+        璇勪及AGV鐨勯伩璁╀紭鍏堢骇
+        
+        Args:
+            agv_id: AGV ID
+            path: AGV璺緞淇℃伅
+            executing_tasks: 鎵ц涓殑浠诲姟鍒楄〃
+            
+        Returns:
+            AGVPriority: AGV浼樺厛绾ц瘎浼扮粨鏋�
+        """
+        # 榛樿鍊�
+        task_status = "idle"
+        action_type = "unknown"
+        task_priority = 1
+        voltage = 100
+        priority_score = 0.0
+        explanation_parts = []
+        
+        # 鑾峰彇AGV鐘舵�佷俊鎭�
+        agv_model = None
+        if self.agv_manager:
+            agv_model = self.agv_manager.get_agv_by_id(agv_id)
+            if agv_model:
+                voltage = agv_model.voltage
+                
+        # 浠庤矾寰勪俊鎭腑鑾峰彇鍔ㄤ綔绫诲瀷
+        code_list = path.get('codeList', [])
+        if code_list:
+            first_code = code_list[0]
+            if isinstance(first_code, dict):
+                action_type = first_code.get('type', 'unknown')
+                
+        # 浠庢墽琛屼腑浠诲姟鑾峰彇浠诲姟鐘舵�佸拰浼樺厛绾�
+        if executing_tasks:
+            for task in executing_tasks:
+                if task.get('agvId') == agv_id:
+                    task_status = task.get('status', 'idle')
+                    # 灏濊瘯浠庝换鍔D鑾峰彇浼樺厛绾э紙绠�鍖栧鐞嗭級
+                    task_id = task.get('taskId', '')
+                    if 'HIGH_PRIORITY' in task_id:
+                        task_priority = 10
+                    elif 'PRIORITY' in task_id:
+                        task_priority = 8
+                    else:
+                        task_priority = 5
+                    break
+        
+        # 璁$畻浼樺厛绾у垎鏁帮紙鍒嗘暟瓒婇珮浼樺厛绾ц秺楂橈紝瓒婁笉瀹规槗璁╂锛�
+        priority_score = 0.0
+        
+        # 1. 浠诲姟鐘舵�佷紭鍏堢骇 (40%鏉冮噸)
+        if task_status == "executing":
+            priority_score += 40.0
+            explanation_parts.append("鎵ц浠诲姟涓�(+40)")
+        elif task_status == "assigned":
+            priority_score += 20.0
+            explanation_parts.append("宸插垎閰嶄换鍔�(+20)")
+        else:
+            priority_score += 5.0
+            explanation_parts.append("绌洪棽鐘舵��(+5)")
+        
+        # 2. 鍔ㄤ綔绫诲瀷浼樺厛绾� (30%鏉冮噸)
+        if action_type == AGVActionTypeEnum.TASK.value:
+            priority_score += 30.0
+            explanation_parts.append("浠诲姟琛屼负(+30)")
+        elif action_type == AGVActionTypeEnum.CHARGING.value:
+            priority_score += 25.0
+            explanation_parts.append("鍏呯數琛屼负(+25)")
+        elif action_type == AGVActionTypeEnum.AVOIDANCE.value:
+            priority_score += 5.0
+            explanation_parts.append("閬胯琛屼负(+5)")
+        elif action_type == AGVActionTypeEnum.STANDBY.value:
+            priority_score += 10.0
+            explanation_parts.append("寰呮満琛屼负(+10)")
+        else:
+            priority_score += 15.0
+            explanation_parts.append("鏈煡琛屼负(+15)")
+        
+        # 3. 浠诲姟浼樺厛绾� (20%鏉冮噸)
+        task_priority_score = (task_priority / 10.0) * 20.0
+        priority_score += task_priority_score
+        explanation_parts.append(f"浠诲姟浼樺厛绾task_priority}(+{task_priority_score:.1f})")
+        
+        # 4. 鐢甸噺鐘舵�� (10%鏉冮噸)
+        if voltage <= 10:  # 蹇呴』鍏呯數
+            priority_score += 10.0
+            explanation_parts.append("鐢甸噺鍗遍櫓(+10)")
+        elif voltage <= 20:  # 鍙嚜鍔ㄥ厖鐢�
+            priority_score += 8.0
+            explanation_parts.append("鐢甸噺鍋忎綆(+8)")
+        elif voltage <= 50:
+            priority_score += 5.0
+            explanation_parts.append("鐢甸噺涓�鑸�(+5)")
+        else:
+            priority_score += 2.0
+            explanation_parts.append("鐢甸噺鍏呰冻(+2)")
+        
+        explanation = f"鎬诲垎{priority_score:.1f}: " + ", ".join(explanation_parts)
+        
+        return AGVPriority(
+            agv_id=agv_id,
+            priority_score=priority_score,
+            task_status=task_status,
+            action_type=action_type,
+            task_priority=task_priority,
+            voltage=voltage,
+            explanation=explanation
+        )
+    
+    def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Conflict], executing_tasks: List[Dict] = None) -> List[Dict]:
+        """
+        瑙e喅鍐茬獊
+        
+        Args:
+            planned_paths: 鍘熷瑙勫垝璺緞
+            conflicts: 鍐茬獊鍒楄〃
+            executing_tasks: 鎵ц涓殑浠诲姟鍒楄〃
+            
+        Returns:
+            List[Dict]: 瑙e喅鍐茬獊鍚庣殑璺緞
+        """
+        if not conflicts:
+            return planned_paths
+        
+        resolved_paths = [path.copy() for path in planned_paths]
+        
+        # 鎸夋椂闂存鎺掑簭鍐茬獊锛屼紭鍏堣В鍐虫棭鏈熷啿绐�
+        conflicts_sorted = sorted(conflicts, key=lambda c: c.time_step)
+        
+        for conflict in conflicts_sorted:
+            resolved_paths = self._resolve_single_conflict(resolved_paths, conflict, executing_tasks)
+        
+        self.logger.info(f"瑙e喅浜� {len(conflicts)} 涓矾寰勫啿绐�")
+        return resolved_paths
+    
+    def _resolve_single_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]:
+        """
+        瑙e喅鍗曚釜鍐茬獊
+        
+        Args:
+            paths: 褰撳墠璺緞鍒楄〃
+            conflict: 鍐茬獊
+            executing_tasks: 鎵ц涓殑浠诲姟鍒楄〃
+            
+        Returns:
+            List[Dict]: 鏇存柊鍚庣殑璺緞鍒楄〃
+        """
+        if conflict.type == "vertex":
+            return self._resolve_vertex_conflict(paths, conflict, executing_tasks)
+        elif conflict.type == "edge":
+            return self._resolve_edge_conflict(paths, conflict, executing_tasks)
+        elif conflict.type == "follow":
+            return self._resolve_following_conflict(paths, conflict, executing_tasks)
+        
+        return paths
+    
+    def _resolve_vertex_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]:
+        """瑙e喅椤剁偣鍐茬獊 - 鍩轰簬浼樺厛绾ц瘎浼伴�夋嫨璁╂鐨凙GV"""
+        updated_paths = paths.copy()
+        
+        # 鎵惧埌鍐茬獊鐨凙GV璺緞
+        agv1_path = None
+        agv2_path = None
+        agv1_index = agv2_index = -1
+        
+        for i, path in enumerate(updated_paths):
+            if path.get('agvId') == conflict.agv1:
+                agv1_path = path
+                agv1_index = i
+            elif path.get('agvId') == conflict.agv2:
+                agv2_path = path
+                agv2_index = i
+        
+        if not agv1_path or not agv2_path:
+            return updated_paths
+        
+        # 璇勪及涓や釜AGV鐨勪紭鍏堢骇
+        agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks)
+        agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks)
+        
+        # 閫夋嫨浼樺厛绾ц緝浣庣殑AGV杩涜绛夊緟
+        if agv1_priority.priority_score <= agv2_priority.priority_score:
+            waiting_agv_path = agv1_path
+            waiting_agv_index = agv1_index
+            waiting_agv_id = conflict.agv1
+            higher_priority_agv_id = conflict.agv2
+        else:
+            waiting_agv_path = agv2_path
+            waiting_agv_index = agv2_index
+            waiting_agv_id = conflict.agv2
+            higher_priority_agv_id = conflict.agv1
+        
+        # 璁板綍璇︾粏鐨勯伩璁╁喅绛栦俊鎭�
+        self.logger.info(f"椤剁偣鍐茬獊閬胯鍐崇瓥 - 浣嶇疆: {conflict.position1}, 鏃堕棿: {conflict.time_step}")
+        self.logger.info(f"  AGV {conflict.agv1}: {agv1_priority.explanation}")
+        self.logger.info(f"  AGV {conflict.agv2}: {agv2_priority.explanation}")
+        self.logger.info(f"  鍐崇瓥: AGV {waiting_agv_id} 璁╂缁� AGV {higher_priority_agv_id}")
+        
+        # 鍦ㄥ啿绐佷綅缃箣鍓嶆彃鍏ョ瓑寰呮楠�
+        self._insert_wait_step(waiting_agv_path, conflict.time_step)
+        updated_paths[waiting_agv_index] = waiting_agv_path
+        
+        return updated_paths
+    
+    def _resolve_edge_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]:
+        """瑙e喅杈瑰啿绐� - 鍩轰簬浼樺厛绾ц瘎浼伴�夋嫨璁╂鐨凙GV"""
+        updated_paths = paths.copy()
+        
+        # 鎵惧埌鍐茬獊鐨凙GV璺緞
+        agv1_path = None
+        agv2_path = None
+        agv1_index = agv2_index = -1
+        
+        for i, path in enumerate(updated_paths):
+            if path.get('agvId') == conflict.agv1:
+                agv1_path = path
+                agv1_index = i
+            elif path.get('agvId') == conflict.agv2:
+                agv2_path = path
+                agv2_index = i
+        
+        if not agv1_path or not agv2_path:
+            return updated_paths
+        
+        # 璇勪及涓や釜AGV鐨勪紭鍏堢骇
+        agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks)
+        agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks)
+        
+        # 閫夋嫨浼樺厛绾ц緝浣庣殑AGV杩涜绛夊緟
+        if agv1_priority.priority_score <= agv2_priority.priority_score:
+            waiting_agv_path = agv1_path
+            waiting_agv_index = agv1_index
+            waiting_agv_id = conflict.agv1
+            higher_priority_agv_id = conflict.agv2
+        else:
+            waiting_agv_path = agv2_path
+            waiting_agv_index = agv2_index
+            waiting_agv_id = conflict.agv2
+            higher_priority_agv_id = conflict.agv1
+        
+        # 璁板綍璇︾粏鐨勯伩璁╁喅绛栦俊鎭�
+        self.logger.info(f"杈瑰啿绐侀伩璁╁喅绛� - 浣嶇疆浜ゆ崲: {conflict.position1}<->{conflict.position2}, 鏃堕棿: {conflict.time_step}")
+        self.logger.info(f"  AGV {conflict.agv1}: {agv1_priority.explanation}")
+        self.logger.info(f"  AGV {conflict.agv2}: {agv2_priority.explanation}")
+        self.logger.info(f"  鍐崇瓥: AGV {waiting_agv_id} 璁╂缁� AGV {higher_priority_agv_id}")
+        
+        # 鍦ㄥ啿绐佷綅缃箣鍓嶆彃鍏ョ瓑寰呮楠�
+        self._insert_wait_step(waiting_agv_path, conflict.time_step)
+        updated_paths[waiting_agv_index] = waiting_agv_path
+        
+        return updated_paths
+    
+    def _resolve_following_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]:
+        """瑙e喅璺熼殢鍐茬獊 - 鍩轰簬浼樺厛绾ц瘎浼伴�夋嫨璁╂鐨凙GV"""
+        updated_paths = paths.copy()
+        
+        # 鎵惧埌鍐茬獊鐨凙GV璺緞
+        agv1_path = None
+        agv2_path = None
+        agv1_index = agv2_index = -1
+        
+        for i, path in enumerate(updated_paths):
+            if path.get('agvId') == conflict.agv1:
+                agv1_path = path
+                agv1_index = i
+            elif path.get('agvId') == conflict.agv2:
+                agv2_path = path
+                agv2_index = i
+        
+        if not agv1_path or not agv2_path:
+            return updated_paths
+        
+        # 璇勪及涓や釜AGV鐨勪紭鍏堢骇
+        agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks)
+        agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks)
+        
+        # 閫夋嫨浼樺厛绾ц緝浣庣殑AGV杩涜绛夊緟
+        if agv1_priority.priority_score <= agv2_priority.priority_score:
+            waiting_agv_path = agv1_path
+            waiting_agv_index = agv1_index
+            waiting_agv_id = conflict.agv1
+            higher_priority_agv_id = conflict.agv2
+        else:
+            waiting_agv_path = agv2_path
+            waiting_agv_index = agv2_index
+            waiting_agv_id = conflict.agv2
+            higher_priority_agv_id = conflict.agv1
+        
+        # 璁板綍璇︾粏鐨勯伩璁╁喅绛栦俊鎭�
+        self.logger.info(f"璺熼殢鍐茬獊閬胯鍐崇瓥 - 璺濈杩囪繎, 鏃堕棿: {conflict.time_step}")
+        self.logger.info(f"  AGV {conflict.agv1} (浣嶇疆: {conflict.position1}): {agv1_priority.explanation}")
+        self.logger.info(f"  AGV {conflict.agv2} (浣嶇疆: {conflict.position2}): {agv2_priority.explanation}")
+        self.logger.info(f"  鍐崇瓥: AGV {waiting_agv_id} 璁╂缁� AGV {higher_priority_agv_id}")
+        
+        # 鍦ㄥ啿绐佷綅缃箣鍓嶆彃鍏ョ瓑寰呮楠�
+        self._insert_wait_step(waiting_agv_path, conflict.time_step)
+        updated_paths[waiting_agv_index] = waiting_agv_path
+        
+        return updated_paths
+    
+    def _insert_wait_step(self, path: Dict, time_step: int):
+        """
+        鍦ㄦ寚瀹氭椂闂存鎻掑叆绛夊緟姝ラ
+        
+        Args:
+            path: AGV璺緞
+            time_step: 鎻掑叆浣嶇疆鐨勬椂闂存
+        """
+        code_list = path.get('codeList', [])
+        
+        if time_step < len(code_list) and time_step > 0:
+            # 鍦ㄦ寚瀹氫綅缃彃鍏ョ瓑寰呮楠わ紙閲嶅鍓嶄竴涓綅缃級
+            prev_code = code_list[time_step - 1]
+            
+            # 纭繚澶嶅埗鎵�鏈夊瓧娈�
+            if isinstance(prev_code, dict):
+                wait_code = prev_code.copy()
+            else:
+                wait_code = {
+                    'code': prev_code.code,
+                    'direction': prev_code.direction
+                }
+            
+            code_list.insert(time_step, wait_code)
+            path['codeList'] = code_list
+    
+    def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]:
+        """
+        楠岃瘉骞朵慨姝h矾寰勶紝纭繚AGV鍙兘浠�0銆�90銆�180銆�270搴︾Щ鍔�
+        
+        Args:
+            planned_paths: 瑙勫垝璺緞鍒楄〃
+            
+        Returns:
+            List[Dict]: 淇鍚庣殑璺緞鍒楄〃
+        """
+        validated_paths = []
+        
+        for path_data in planned_paths:
+            validated_path = self._validate_single_path_directions(path_data)
+            validated_paths.append(validated_path)
+        
+        return validated_paths
+    
+    def _validate_single_path_directions(self, path_data: Dict) -> Dict:
+        """
+        楠岃瘉鍗曚釜璺緞鐨勭Щ鍔ㄦ柟鍚�
+        
+        Args:
+            path_data: 鍗曚釜AGV璺緞鏁版嵁
+            
+        Returns:
+            Dict: 淇鍚庣殑璺緞鏁版嵁
+        """
+        validated_path = path_data.copy()
+        code_list = path_data.get('codeList', [])
+        
+        if len(code_list) < 2:
+            return validated_path
+        
+        validated_code_list = []
+        
+        for i in range(len(code_list)):
+            current_code = code_list[i]
+            
+            if isinstance(current_code, dict):
+                position = current_code.get('code', '')
+                direction = current_code.get('direction', '90')
+                
+                # 淇濈暀鍘熸湁鐨勬墍鏈夊瓧娈�
+                validated_code = current_code.copy()
+            else:
+                position = current_code.code
+                direction = current_code.direction
+                
+                # 杞崲涓哄瓧鍏告牸寮忓苟淇濈暀鍩烘湰瀛楁
+                validated_code = {
+                    'code': position,
+                    'direction': direction
+                }
+            
+            # 濡傛灉涓嶆槸绗竴涓偣锛岃绠楁纭殑鏂瑰悜
+            if i > 0:
+                prev_code = code_list[i - 1]
+                prev_position = prev_code.get('code', '') if isinstance(prev_code, dict) else prev_code.code
+                
+                # 璁$畻绉诲姩鏂瑰悜
+                correct_direction = self._calculate_movement_direction(prev_position, position)
+                if correct_direction:
+                    direction = correct_direction
+            
+            # 纭繚鏂瑰悜鏄湁鏁堢殑鍥涙柟鍚戜箣涓�
+            direction = self._normalize_direction(direction)
+            
+            # 鏇存柊鏂瑰悜锛屼繚鐣欏叾浠栨墍鏈夊瓧娈�
+            validated_code['direction'] = direction
+            validated_code_list.append(validated_code)
+        
+        validated_path['codeList'] = validated_code_list
+        return validated_path
+    
+    def _calculate_movement_direction(self, from_position: str, to_position: str) -> Optional[str]:
+        """
+        璁$畻浠庝竴涓綅缃埌鍙︿竴涓綅缃殑绉诲姩鏂瑰悜
+        
+        Args:
+            from_position: 璧峰浣嶇疆鐮�
+            to_position: 鐩爣浣嶇疆鐮�
+            
+        Returns:
+            Optional[str]: 绉诲姩鏂瑰悜锛�0, 90, 180, 270锛�
+        """
+        from_coord = get_coordinate_from_path_id(from_position, self.path_mapping)
+        to_coord = get_coordinate_from_path_id(to_position, self.path_mapping)
+        
+        if not from_coord or not to_coord:
+            return None
+        
+        dx = to_coord[0] - from_coord[0]
+        dy = to_coord[1] - from_coord[1]
+        
+        # 鍙厑璁稿洓涓柟鍚戠殑绉诲姩
+        if dx > 0 and dy == 0:
+            return "0"    # 鍚戜笢
+        elif dx < 0 and dy == 0:
+            return "180"  # 鍚戣タ
+        elif dx == 0 and dy > 0:
+            return "90"   # 鍚戝崡
+        elif dx == 0 and dy < 0:
+            return "270"  # 鍚戝寳
+        else:
+            # 涓嶆槸鏍囧噯鍥涙柟鍚戠Щ鍔紝杩斿洖榛樿鏂瑰悜
+            return "90"
+    
+    def _normalize_direction(self, direction: str) -> str:
+        """
+        鏍囧噯鍖栨柟鍚戝�硷紝纭繚鍙湁0銆�90銆�180銆�270
+        
+        Args:
+            direction: 鍘熷鏂瑰悜鍊�
+            
+        Returns:
+            str: 鏍囧噯鍖栧悗鐨勬柟鍚戝��
+        """
+        try:
+            angle = int(direction) % 360
+            
+            # 灏嗚搴︽槧灏勫埌鏈�杩戠殑鍥涙柟鍚�
+            if angle <= 45 or angle > 315:
+                return "0"
+            elif 45 < angle <= 135:
+                return "90"
+            elif 135 < angle <= 225:
+                return "180"
+            else:
+                return "270"
+        except:
+            return "90"  # 榛樿鏂瑰悜 
\ No newline at end of file

--
Gitblit v1.9.1