From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001 From: zhang <zc857179121@qq.com> Date: 星期三, 02 七月 2025 13:12:26 +0800 Subject: [PATCH] 初版提交 --- algorithm_system/algorithms/collision_detection.py | 701 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 701 insertions(+), 0 deletions(-) diff --git a/algorithm_system/algorithms/collision_detection.py b/algorithm_system/algorithms/collision_detection.py new file mode 100644 index 0000000..5b099bf --- /dev/null +++ b/algorithm_system/algorithms/collision_detection.py @@ -0,0 +1,701 @@ +""" +AGV璺緞纰版挒妫�娴嬪拰瑙e喅妯″潡 +""" +import math +import logging +from typing import Dict, List, Tuple, Optional, Set +from dataclasses import dataclass +from collections import defaultdict + +from common.data_models import PlannedPath, PathCode, AGVActionTypeEnum +from common.utils import get_coordinate_from_path_id + + +@dataclass +class SpaceTimeNode: + """鏃剁┖鑺傜偣 - 琛ㄧずAGV鍦ㄧ壒瀹氭椂闂村拰浣嶇疆鐨勭姸鎬�""" + agv_id: str + position: str # 浣嶇疆鐮� + coordinates: Tuple[int, int] # 鍧愭爣 + time_step: int # 鏃堕棿姝� + direction: str # 鏂瑰悜 + + +@dataclass +class Conflict: + """鍐茬獊鎻忚堪""" + type: str # 鍐茬獊绫诲瀷: "vertex", "edge", "follow" + agv1: str + agv2: str + time_step: int + position1: str + position2: str + description: str + + +@dataclass +class AGVPriority: + """AGV浼樺厛绾ц瘎浼扮粨鏋�""" + agv_id: str + priority_score: float # 浼樺厛绾у垎鏁帮紝瓒婇珮瓒婁紭鍏� + task_status: str # 浠诲姟鐘舵�� + action_type: str # 鍔ㄤ綔绫诲瀷 + task_priority: int # 浠诲姟浼樺厛绾� + voltage: int # 鐢甸噺 + explanation: str # 浼樺厛绾ц鏄� + + +class CollisionDetector: + """AGV璺緞纰版挒妫�娴嬪櫒""" + + def __init__(self, path_mapping: Dict[str, Dict[str, int]], + min_distance: float = 3.0, time_buffer: int = 1): + """ + 鍒濆鍖栫鎾炴娴嬪櫒 + + Args: + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + min_distance: AGV涔嬮棿鐨勬渶灏忓畨鍏ㄨ窛绂� + time_buffer: 鏃堕棿缂撳啿鍖猴紙鏃堕棿姝ユ暟锛� + """ + self.path_mapping = path_mapping + self.min_distance = min_distance + self.time_buffer = time_buffer + self.logger = logging.getLogger(__name__) + + def detect_conflicts(self, planned_paths: List[Dict]) -> List[Conflict]: + """ + 妫�娴嬫墍鏈堿GV璺緞涔嬮棿鐨勫啿绐� + + Args: + planned_paths: 瑙勫垝璺緞鍒楄〃 + + Returns: + List[Conflict]: 鍐茬獊鍒楄〃 + """ + conflicts = [] + + # 鏋勫缓鏃剁┖琛� + space_time_table = self._build_space_time_table(planned_paths) + + # 妫�娴嬮《鐐瑰啿绐侊紙鍚屼竴鏃堕棿鍚屼竴浣嶇疆锛� + conflicts.extend(self._detect_vertex_conflicts(space_time_table)) + + # 妫�娴嬭竟鍐茬獊锛圓GV浜ゆ崲浣嶇疆锛� + conflicts.extend(self._detect_edge_conflicts(space_time_table)) + + # 妫�娴嬭窡闅忓啿绐侊紙AGV璺濈杩囪繎锛� + conflicts.extend(self._detect_following_conflicts(space_time_table)) + + self.logger.info(f"妫�娴嬪埌 {len(conflicts)} 涓矾寰勫啿绐�") + return conflicts + + def _build_space_time_table(self, planned_paths: List[Dict]) -> Dict[int, List[SpaceTimeNode]]: + """ + 鏋勫缓鏃剁┖琛� + + Args: + planned_paths: 瑙勫垝璺緞鍒楄〃 + + Returns: + Dict[int, List[SpaceTimeNode]]: 鏃堕棿姝� -> 鏃剁┖鑺傜偣鍒楄〃 + """ + space_time_table = defaultdict(list) + + for path_data in planned_paths: + agv_id = path_data.get('agvId', '') + code_list = path_data.get('codeList', []) + + for time_step, path_code in enumerate(code_list): + position = path_code.get('code', '') if isinstance(path_code, dict) else path_code.code + direction = path_code.get('direction', '90') if isinstance(path_code, dict) else path_code.direction + + coordinates = get_coordinate_from_path_id(position, self.path_mapping) + if coordinates: + node = SpaceTimeNode( + agv_id=agv_id, + position=position, + coordinates=coordinates, + time_step=time_step, + direction=direction + ) + space_time_table[time_step].append(node) + + return space_time_table + + def _detect_vertex_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]: + """妫�娴嬮《鐐瑰啿绐侊紙鍚屼竴鏃堕棿鍚屼竴浣嶇疆锛�""" + conflicts = [] + + for time_step, nodes in space_time_table.items(): + # 鎸変綅缃垎缁� + position_groups = defaultdict(list) + for node in nodes: + position_groups[node.position].append(node) + + # 妫�鏌ユ瘡涓綅缃槸鍚︽湁澶氫釜AGV + for position, agv_nodes in position_groups.items(): + if len(agv_nodes) > 1: + # 鍙戠幇鍐茬獊 + for i in range(len(agv_nodes)): + for j in range(i + 1, len(agv_nodes)): + conflict = Conflict( + type="vertex", + agv1=agv_nodes[i].agv_id, + agv2=agv_nodes[j].agv_id, + time_step=time_step, + position1=position, + position2=position, + description=f"AGV {agv_nodes[i].agv_id} 鍜� {agv_nodes[j].agv_id} 鍦ㄦ椂闂� {time_step} 鍗犵敤鍚屼竴浣嶇疆 {position}" + ) + conflicts.append(conflict) + + return conflicts + + def _detect_edge_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]: + """妫�娴嬭竟鍐茬獊锛圓GV浜ゆ崲浣嶇疆锛�""" + conflicts = [] + + max_time = max(space_time_table.keys()) if space_time_table else 0 + + for time_step in range(max_time): + current_nodes = space_time_table.get(time_step, []) + next_nodes = space_time_table.get(time_step + 1, []) + + # 鏋勫缓褰撳墠鍜屼笅涓�鏃跺埢鐨勪綅缃槧灏� + current_positions = {node.agv_id: node.position for node in current_nodes} + next_positions = {node.agv_id: node.position for node in next_nodes} + + # 妫�鏌ユ槸鍚︽湁AGV浜ゆ崲浣嶇疆 + for agv1, pos1_current in current_positions.items(): + for agv2, pos2_current in current_positions.items(): + if agv1 >= agv2: # 閬垮厤閲嶅妫�鏌� + continue + + pos1_next = next_positions.get(agv1) + pos2_next = next_positions.get(agv2) + + if (pos1_next and pos2_next and + pos1_current == pos2_next and pos2_current == pos1_next): + # 鍙戠幇杈瑰啿绐� + conflict = Conflict( + type="edge", + agv1=agv1, + agv2=agv2, + time_step=time_step, + position1=pos1_current, + position2=pos2_current, + description=f"AGV {agv1} 鍜� {agv2} 鍦ㄦ椂闂� {time_step}-{time_step+1} 浜ゆ崲浣嶇疆 {pos1_current}<->{pos2_current}" + ) + conflicts.append(conflict) + + return conflicts + + def _detect_following_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]: + """妫�娴嬭窡闅忓啿绐侊紙AGV璺濈杩囪繎锛�""" + conflicts = [] + + for time_step, nodes in space_time_table.items(): + # 妫�鏌ユ墍鏈堿GV瀵逛箣闂寸殑璺濈 + for i in range(len(nodes)): + for j in range(i + 1, len(nodes)): + node1, node2 = nodes[i], nodes[j] + + # 璁$畻娆у嚑閲屽緱璺濈 + distance = math.sqrt( + (node1.coordinates[0] - node2.coordinates[0]) ** 2 + + (node1.coordinates[1] - node2.coordinates[1]) ** 2 + ) + + if distance < self.min_distance and distance > 0: + conflict = Conflict( + type="follow", + agv1=node1.agv_id, + agv2=node2.agv_id, + time_step=time_step, + position1=node1.position, + position2=node2.position, + description=f"AGV {node1.agv_id} 鍜� {node2.agv_id} 鍦ㄦ椂闂� {time_step} 璺濈杩囪繎 ({distance:.2f} < {self.min_distance})" + ) + conflicts.append(conflict) + + return conflicts + + +class CollisionResolver: + """AGV璺緞纰版挒瑙e喅鍣�""" + + def __init__(self, path_mapping: Dict[str, Dict[str, int]], detector: CollisionDetector, agv_manager=None): + """ + 鍒濆鍖栫鎾炶В鍐冲櫒 + + Args: + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + detector: 纰版挒妫�娴嬪櫒 + agv_manager: AGV绠$悊鍣紝鐢ㄤ簬鑾峰彇AGV鐘舵�佷俊鎭� + """ + self.path_mapping = path_mapping + self.detector = detector + self.agv_manager = agv_manager + self.logger = logging.getLogger(__name__) + + def evaluate_agv_priority(self, agv_id: str, path: Dict, executing_tasks: List[Dict] = None) -> AGVPriority: + """ + 璇勪及AGV鐨勯伩璁╀紭鍏堢骇 + + Args: + agv_id: AGV ID + path: AGV璺緞淇℃伅 + executing_tasks: 鎵ц涓殑浠诲姟鍒楄〃 + + Returns: + AGVPriority: AGV浼樺厛绾ц瘎浼扮粨鏋� + """ + # 榛樿鍊� + task_status = "idle" + action_type = "unknown" + task_priority = 1 + voltage = 100 + priority_score = 0.0 + explanation_parts = [] + + # 鑾峰彇AGV鐘舵�佷俊鎭� + agv_model = None + if self.agv_manager: + agv_model = self.agv_manager.get_agv_by_id(agv_id) + if agv_model: + voltage = agv_model.voltage + + # 浠庤矾寰勪俊鎭腑鑾峰彇鍔ㄤ綔绫诲瀷 + code_list = path.get('codeList', []) + if code_list: + first_code = code_list[0] + if isinstance(first_code, dict): + action_type = first_code.get('type', 'unknown') + + # 浠庢墽琛屼腑浠诲姟鑾峰彇浠诲姟鐘舵�佸拰浼樺厛绾� + if executing_tasks: + for task in executing_tasks: + if task.get('agvId') == agv_id: + task_status = task.get('status', 'idle') + # 灏濊瘯浠庝换鍔D鑾峰彇浼樺厛绾э紙绠�鍖栧鐞嗭級 + task_id = task.get('taskId', '') + if 'HIGH_PRIORITY' in task_id: + task_priority = 10 + elif 'PRIORITY' in task_id: + task_priority = 8 + else: + task_priority = 5 + break + + # 璁$畻浼樺厛绾у垎鏁帮紙鍒嗘暟瓒婇珮浼樺厛绾ц秺楂橈紝瓒婁笉瀹规槗璁╂锛� + priority_score = 0.0 + + # 1. 浠诲姟鐘舵�佷紭鍏堢骇 (40%鏉冮噸) + if task_status == "executing": + priority_score += 40.0 + explanation_parts.append("鎵ц浠诲姟涓�(+40)") + elif task_status == "assigned": + priority_score += 20.0 + explanation_parts.append("宸插垎閰嶄换鍔�(+20)") + else: + priority_score += 5.0 + explanation_parts.append("绌洪棽鐘舵��(+5)") + + # 2. 鍔ㄤ綔绫诲瀷浼樺厛绾� (30%鏉冮噸) + if action_type == AGVActionTypeEnum.TASK.value: + priority_score += 30.0 + explanation_parts.append("浠诲姟琛屼负(+30)") + elif action_type == AGVActionTypeEnum.CHARGING.value: + priority_score += 25.0 + explanation_parts.append("鍏呯數琛屼负(+25)") + elif action_type == AGVActionTypeEnum.AVOIDANCE.value: + priority_score += 5.0 + explanation_parts.append("閬胯琛屼负(+5)") + elif action_type == AGVActionTypeEnum.STANDBY.value: + priority_score += 10.0 + explanation_parts.append("寰呮満琛屼负(+10)") + else: + priority_score += 15.0 + explanation_parts.append("鏈煡琛屼负(+15)") + + # 3. 浠诲姟浼樺厛绾� (20%鏉冮噸) + task_priority_score = (task_priority / 10.0) * 20.0 + priority_score += task_priority_score + explanation_parts.append(f"浠诲姟浼樺厛绾task_priority}(+{task_priority_score:.1f})") + + # 4. 鐢甸噺鐘舵�� (10%鏉冮噸) + if voltage <= 10: # 蹇呴』鍏呯數 + priority_score += 10.0 + explanation_parts.append("鐢甸噺鍗遍櫓(+10)") + elif voltage <= 20: # 鍙嚜鍔ㄥ厖鐢� + priority_score += 8.0 + explanation_parts.append("鐢甸噺鍋忎綆(+8)") + elif voltage <= 50: + priority_score += 5.0 + explanation_parts.append("鐢甸噺涓�鑸�(+5)") + else: + priority_score += 2.0 + explanation_parts.append("鐢甸噺鍏呰冻(+2)") + + explanation = f"鎬诲垎{priority_score:.1f}: " + ", ".join(explanation_parts) + + return AGVPriority( + agv_id=agv_id, + priority_score=priority_score, + task_status=task_status, + action_type=action_type, + task_priority=task_priority, + voltage=voltage, + explanation=explanation + ) + + def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Conflict], executing_tasks: List[Dict] = None) -> List[Dict]: + """ + 瑙e喅鍐茬獊 + + Args: + planned_paths: 鍘熷瑙勫垝璺緞 + conflicts: 鍐茬獊鍒楄〃 + executing_tasks: 鎵ц涓殑浠诲姟鍒楄〃 + + Returns: + List[Dict]: 瑙e喅鍐茬獊鍚庣殑璺緞 + """ + if not conflicts: + return planned_paths + + resolved_paths = [path.copy() for path in planned_paths] + + # 鎸夋椂闂存鎺掑簭鍐茬獊锛屼紭鍏堣В鍐虫棭鏈熷啿绐� + conflicts_sorted = sorted(conflicts, key=lambda c: c.time_step) + + for conflict in conflicts_sorted: + resolved_paths = self._resolve_single_conflict(resolved_paths, conflict, executing_tasks) + + self.logger.info(f"瑙e喅浜� {len(conflicts)} 涓矾寰勫啿绐�") + return resolved_paths + + def _resolve_single_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]: + """ + 瑙e喅鍗曚釜鍐茬獊 + + Args: + paths: 褰撳墠璺緞鍒楄〃 + conflict: 鍐茬獊 + executing_tasks: 鎵ц涓殑浠诲姟鍒楄〃 + + Returns: + List[Dict]: 鏇存柊鍚庣殑璺緞鍒楄〃 + """ + if conflict.type == "vertex": + return self._resolve_vertex_conflict(paths, conflict, executing_tasks) + elif conflict.type == "edge": + return self._resolve_edge_conflict(paths, conflict, executing_tasks) + elif conflict.type == "follow": + return self._resolve_following_conflict(paths, conflict, executing_tasks) + + return paths + + def _resolve_vertex_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]: + """瑙e喅椤剁偣鍐茬獊 - 鍩轰簬浼樺厛绾ц瘎浼伴�夋嫨璁╂鐨凙GV""" + updated_paths = paths.copy() + + # 鎵惧埌鍐茬獊鐨凙GV璺緞 + agv1_path = None + agv2_path = None + agv1_index = agv2_index = -1 + + for i, path in enumerate(updated_paths): + if path.get('agvId') == conflict.agv1: + agv1_path = path + agv1_index = i + elif path.get('agvId') == conflict.agv2: + agv2_path = path + agv2_index = i + + if not agv1_path or not agv2_path: + return updated_paths + + # 璇勪及涓や釜AGV鐨勪紭鍏堢骇 + agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks) + agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks) + + # 閫夋嫨浼樺厛绾ц緝浣庣殑AGV杩涜绛夊緟 + if agv1_priority.priority_score <= agv2_priority.priority_score: + waiting_agv_path = agv1_path + waiting_agv_index = agv1_index + waiting_agv_id = conflict.agv1 + higher_priority_agv_id = conflict.agv2 + else: + waiting_agv_path = agv2_path + waiting_agv_index = agv2_index + waiting_agv_id = conflict.agv2 + higher_priority_agv_id = conflict.agv1 + + # 璁板綍璇︾粏鐨勯伩璁╁喅绛栦俊鎭� + self.logger.info(f"椤剁偣鍐茬獊閬胯鍐崇瓥 - 浣嶇疆: {conflict.position1}, 鏃堕棿: {conflict.time_step}") + self.logger.info(f" AGV {conflict.agv1}: {agv1_priority.explanation}") + self.logger.info(f" AGV {conflict.agv2}: {agv2_priority.explanation}") + self.logger.info(f" 鍐崇瓥: AGV {waiting_agv_id} 璁╂缁� AGV {higher_priority_agv_id}") + + # 鍦ㄥ啿绐佷綅缃箣鍓嶆彃鍏ョ瓑寰呮楠� + self._insert_wait_step(waiting_agv_path, conflict.time_step) + updated_paths[waiting_agv_index] = waiting_agv_path + + return updated_paths + + def _resolve_edge_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]: + """瑙e喅杈瑰啿绐� - 鍩轰簬浼樺厛绾ц瘎浼伴�夋嫨璁╂鐨凙GV""" + updated_paths = paths.copy() + + # 鎵惧埌鍐茬獊鐨凙GV璺緞 + agv1_path = None + agv2_path = None + agv1_index = agv2_index = -1 + + for i, path in enumerate(updated_paths): + if path.get('agvId') == conflict.agv1: + agv1_path = path + agv1_index = i + elif path.get('agvId') == conflict.agv2: + agv2_path = path + agv2_index = i + + if not agv1_path or not agv2_path: + return updated_paths + + # 璇勪及涓や釜AGV鐨勪紭鍏堢骇 + agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks) + agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks) + + # 閫夋嫨浼樺厛绾ц緝浣庣殑AGV杩涜绛夊緟 + if agv1_priority.priority_score <= agv2_priority.priority_score: + waiting_agv_path = agv1_path + waiting_agv_index = agv1_index + waiting_agv_id = conflict.agv1 + higher_priority_agv_id = conflict.agv2 + else: + waiting_agv_path = agv2_path + waiting_agv_index = agv2_index + waiting_agv_id = conflict.agv2 + higher_priority_agv_id = conflict.agv1 + + # 璁板綍璇︾粏鐨勯伩璁╁喅绛栦俊鎭� + self.logger.info(f"杈瑰啿绐侀伩璁╁喅绛� - 浣嶇疆浜ゆ崲: {conflict.position1}<->{conflict.position2}, 鏃堕棿: {conflict.time_step}") + self.logger.info(f" AGV {conflict.agv1}: {agv1_priority.explanation}") + self.logger.info(f" AGV {conflict.agv2}: {agv2_priority.explanation}") + self.logger.info(f" 鍐崇瓥: AGV {waiting_agv_id} 璁╂缁� AGV {higher_priority_agv_id}") + + # 鍦ㄥ啿绐佷綅缃箣鍓嶆彃鍏ョ瓑寰呮楠� + self._insert_wait_step(waiting_agv_path, conflict.time_step) + updated_paths[waiting_agv_index] = waiting_agv_path + + return updated_paths + + def _resolve_following_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]: + """瑙e喅璺熼殢鍐茬獊 - 鍩轰簬浼樺厛绾ц瘎浼伴�夋嫨璁╂鐨凙GV""" + updated_paths = paths.copy() + + # 鎵惧埌鍐茬獊鐨凙GV璺緞 + agv1_path = None + agv2_path = None + agv1_index = agv2_index = -1 + + for i, path in enumerate(updated_paths): + if path.get('agvId') == conflict.agv1: + agv1_path = path + agv1_index = i + elif path.get('agvId') == conflict.agv2: + agv2_path = path + agv2_index = i + + if not agv1_path or not agv2_path: + return updated_paths + + # 璇勪及涓や釜AGV鐨勪紭鍏堢骇 + agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks) + agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks) + + # 閫夋嫨浼樺厛绾ц緝浣庣殑AGV杩涜绛夊緟 + if agv1_priority.priority_score <= agv2_priority.priority_score: + waiting_agv_path = agv1_path + waiting_agv_index = agv1_index + waiting_agv_id = conflict.agv1 + higher_priority_agv_id = conflict.agv2 + else: + waiting_agv_path = agv2_path + waiting_agv_index = agv2_index + waiting_agv_id = conflict.agv2 + higher_priority_agv_id = conflict.agv1 + + # 璁板綍璇︾粏鐨勯伩璁╁喅绛栦俊鎭� + self.logger.info(f"璺熼殢鍐茬獊閬胯鍐崇瓥 - 璺濈杩囪繎, 鏃堕棿: {conflict.time_step}") + self.logger.info(f" AGV {conflict.agv1} (浣嶇疆: {conflict.position1}): {agv1_priority.explanation}") + self.logger.info(f" AGV {conflict.agv2} (浣嶇疆: {conflict.position2}): {agv2_priority.explanation}") + self.logger.info(f" 鍐崇瓥: AGV {waiting_agv_id} 璁╂缁� AGV {higher_priority_agv_id}") + + # 鍦ㄥ啿绐佷綅缃箣鍓嶆彃鍏ョ瓑寰呮楠� + self._insert_wait_step(waiting_agv_path, conflict.time_step) + updated_paths[waiting_agv_index] = waiting_agv_path + + return updated_paths + + def _insert_wait_step(self, path: Dict, time_step: int): + """ + 鍦ㄦ寚瀹氭椂闂存鎻掑叆绛夊緟姝ラ + + Args: + path: AGV璺緞 + time_step: 鎻掑叆浣嶇疆鐨勬椂闂存 + """ + code_list = path.get('codeList', []) + + if time_step < len(code_list) and time_step > 0: + # 鍦ㄦ寚瀹氫綅缃彃鍏ョ瓑寰呮楠わ紙閲嶅鍓嶄竴涓綅缃級 + prev_code = code_list[time_step - 1] + + # 纭繚澶嶅埗鎵�鏈夊瓧娈� + if isinstance(prev_code, dict): + wait_code = prev_code.copy() + else: + wait_code = { + 'code': prev_code.code, + 'direction': prev_code.direction + } + + code_list.insert(time_step, wait_code) + path['codeList'] = code_list + + def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]: + """ + 楠岃瘉骞朵慨姝h矾寰勶紝纭繚AGV鍙兘浠�0銆�90銆�180銆�270搴︾Щ鍔� + + Args: + planned_paths: 瑙勫垝璺緞鍒楄〃 + + Returns: + List[Dict]: 淇鍚庣殑璺緞鍒楄〃 + """ + validated_paths = [] + + for path_data in planned_paths: + validated_path = self._validate_single_path_directions(path_data) + validated_paths.append(validated_path) + + return validated_paths + + def _validate_single_path_directions(self, path_data: Dict) -> Dict: + """ + 楠岃瘉鍗曚釜璺緞鐨勭Щ鍔ㄦ柟鍚� + + Args: + path_data: 鍗曚釜AGV璺緞鏁版嵁 + + Returns: + Dict: 淇鍚庣殑璺緞鏁版嵁 + """ + validated_path = path_data.copy() + code_list = path_data.get('codeList', []) + + if len(code_list) < 2: + return validated_path + + validated_code_list = [] + + for i in range(len(code_list)): + current_code = code_list[i] + + if isinstance(current_code, dict): + position = current_code.get('code', '') + direction = current_code.get('direction', '90') + + # 淇濈暀鍘熸湁鐨勬墍鏈夊瓧娈� + validated_code = current_code.copy() + else: + position = current_code.code + direction = current_code.direction + + # 杞崲涓哄瓧鍏告牸寮忓苟淇濈暀鍩烘湰瀛楁 + validated_code = { + 'code': position, + 'direction': direction + } + + # 濡傛灉涓嶆槸绗竴涓偣锛岃绠楁纭殑鏂瑰悜 + if i > 0: + prev_code = code_list[i - 1] + prev_position = prev_code.get('code', '') if isinstance(prev_code, dict) else prev_code.code + + # 璁$畻绉诲姩鏂瑰悜 + correct_direction = self._calculate_movement_direction(prev_position, position) + if correct_direction: + direction = correct_direction + + # 纭繚鏂瑰悜鏄湁鏁堢殑鍥涙柟鍚戜箣涓� + direction = self._normalize_direction(direction) + + # 鏇存柊鏂瑰悜锛屼繚鐣欏叾浠栨墍鏈夊瓧娈� + validated_code['direction'] = direction + validated_code_list.append(validated_code) + + validated_path['codeList'] = validated_code_list + return validated_path + + def _calculate_movement_direction(self, from_position: str, to_position: str) -> Optional[str]: + """ + 璁$畻浠庝竴涓綅缃埌鍙︿竴涓綅缃殑绉诲姩鏂瑰悜 + + Args: + from_position: 璧峰浣嶇疆鐮� + to_position: 鐩爣浣嶇疆鐮� + + Returns: + Optional[str]: 绉诲姩鏂瑰悜锛�0, 90, 180, 270锛� + """ + from_coord = get_coordinate_from_path_id(from_position, self.path_mapping) + to_coord = get_coordinate_from_path_id(to_position, self.path_mapping) + + if not from_coord or not to_coord: + return None + + dx = to_coord[0] - from_coord[0] + dy = to_coord[1] - from_coord[1] + + # 鍙厑璁稿洓涓柟鍚戠殑绉诲姩 + if dx > 0 and dy == 0: + return "0" # 鍚戜笢 + elif dx < 0 and dy == 0: + return "180" # 鍚戣タ + elif dx == 0 and dy > 0: + return "90" # 鍚戝崡 + elif dx == 0 and dy < 0: + return "270" # 鍚戝寳 + else: + # 涓嶆槸鏍囧噯鍥涙柟鍚戠Щ鍔紝杩斿洖榛樿鏂瑰悜 + return "90" + + def _normalize_direction(self, direction: str) -> str: + """ + 鏍囧噯鍖栨柟鍚戝�硷紝纭繚鍙湁0銆�90銆�180銆�270 + + Args: + direction: 鍘熷鏂瑰悜鍊� + + Returns: + str: 鏍囧噯鍖栧悗鐨勬柟鍚戝�� + """ + try: + angle = int(direction) % 360 + + # 灏嗚搴︽槧灏勫埌鏈�杩戠殑鍥涙柟鍚� + if angle <= 45 or angle > 315: + return "0" + elif 45 < angle <= 135: + return "90" + elif 135 < angle <= 225: + return "180" + else: + return "270" + except: + return "90" # 榛樿鏂瑰悜 \ No newline at end of file -- Gitblit v1.9.1