From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001 From: zhang <zc857179121@qq.com> Date: 星期三, 02 七月 2025 13:12:26 +0800 Subject: [PATCH] 初版提交 --- algorithm_system/models/agv_model.py | 572 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 572 insertions(+), 0 deletions(-) diff --git a/algorithm_system/models/agv_model.py b/algorithm_system/models/agv_model.py new file mode 100644 index 0000000..6131862 --- /dev/null +++ b/algorithm_system/models/agv_model.py @@ -0,0 +1,572 @@ +""" +AGV妯″瀷 - 鐢ㄤ簬绠楁硶绯荤粺鐨凙GV鏁版嵁寤烘ā +""" +import time +import logging +from typing import Dict, List, Optional, Tuple, Set +from dataclasses import dataclass, field +from enum import Enum + +from common.data_models import AGVStatus, BackpackData +from common.utils import get_coordinate_from_path_id, calculate_manhattan_distance + + +class AGVTaskStatus(Enum): + """AGV浠诲姟鐘舵��""" + IDLE = "IDLE" # 绌洪棽 + ASSIGNED = "ASSIGNED" # 宸插垎閰嶄换鍔′絾鏈紑濮� + EXECUTING = "EXECUTING" # 鎵ц涓� + COMPLETED = "COMPLETED" # 宸插畬鎴� + + +@dataclass +class TaskAssignment: + """浠诲姟鍒嗛厤淇℃伅""" + task_id: str + assigned_time: float + priority: int + status: AGVTaskStatus = AGVTaskStatus.ASSIGNED + start_code: Optional[str] = None + end_code: Optional[str] = None + estimated_duration: Optional[float] = None + + +class AGVModel: + """AGV妯″瀷绫伙紝鐢ㄤ簬绠楁硶璁$畻""" + + def __init__(self, agv_id: str, path_mapping: Dict[str, Dict[str, int]]): + """ + 鍒濆鍖朅GV妯″瀷 + + Args: + agv_id: AGV ID + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + """ + self.agvId = agv_id + self.path_mapping = path_mapping + self.logger = logging.getLogger(__name__) + + # AGV鐘舵�佷俊鎭� + self.status: str = "0" # AGV鐘舵�佺爜 + self.mapCode: str = "" # 褰撳墠浣嶇疆鐮� + self.coordinates: Optional[Tuple[int, int]] = None # 褰撳墠鍧愭爣 + self.backpack: Optional[List[BackpackData]] = None # 鑳屽寘淇℃伅 + + # 鍏呯數鐩稿叧 + self.voltage: int = 100 # 褰撳墠鐢甸噺鐧惧垎姣� + self.autoCharge: int = 20 # 浣庣數閲忚瀹氶槇鍊� + self.lowVol: int = 10 # 鏈�浣庣數閲忛槇鍊� + + # 浠诲姟鐩稿叧 + self.assigned_tasks: List[TaskAssignment] = [] + self.current_task_count: int = 0 + self.max_capacity: int = 5 # 鏈�澶т换鍔″閲� + + # 鎬ц兘鐩稿叧 + self.efficiency_score: float = 1.0 # 鏁堢巼寰楀垎 + self.last_update_time: float = time.time() + + # 缁熻淇℃伅 + self.total_completed_tasks: int = 0 + self.total_distance_traveled: float = 0.0 + self.average_completion_time: float = 0.0 + + def update_from_agv_status(self, agv_status: AGVStatus): + """浠嶢GV鐘舵�佹洿鏂版ā鍨�""" + self.status = agv_status.status + if hasattr(agv_status, 'position'): + self.mapCode = agv_status.position + elif hasattr(agv_status, 'mapCode'): + self.mapCode = agv_status.mapCode + else: + self.mapCode = "" + + self.backpack = agv_status.backpack + self.last_update_time = time.time() + + raw_voltage = getattr(agv_status, 'vol', 100) + raw_auto_charge = getattr(agv_status, 'autoCharge', 20) + raw_low_vol = getattr(agv_status, 'lowVol', 10) + + # 鐢甸噺鏁版嵁鏍囧噯鍖栧鐞� + # 濡傛灉鐢靛帇鍊煎ぇ浜�100锛屽彲鑳芥槸姣紡鍊硷紝闇�瑕佽浆鎹负鐧惧垎姣� + if raw_voltage > 100: + # 鍋囪姝e父鐢靛帇鑼冨洿鏄�3000-5000mV锛岃浆鎹负0-100% + # 杩欓噷浣跨敤绠�鍗曠殑绾挎�ф槧灏� + normalized_voltage = max(0, min(100, ((raw_voltage - 3000) / 2000) * 100)) + self.logger.debug(f"AGV {self.agvId} 鐢靛帇鏍囧噯鍖�: {raw_voltage}mV -> {normalized_voltage:.1f}%") + self.voltage = int(normalized_voltage) + else: + self.voltage = raw_voltage + + # 闃堝�兼爣鍑嗗寲澶勭悊 + if raw_auto_charge > 100: + # 濡傛灉闃堝�间篃鏄浼忓�硷紝鍚屾牱杞崲 + self.autoCharge = max(0, min(100, ((raw_auto_charge - 3000) / 2000) * 100)) + self.logger.debug(f"AGV {self.agvId} 鑷姩鍏呯數闃堝�兼爣鍑嗗寲: {raw_auto_charge}mV -> {self.autoCharge:.1f}%") + else: + self.autoCharge = raw_auto_charge + + if raw_low_vol > 100: + # 濡傛灉鏈�浣庣數閲忛槇鍊间篃鏄浼忓�硷紝鍚屾牱杞崲 + self.lowVol = max(0, min(100, ((raw_low_vol - 3000) / 2000) * 100)) + self.logger.debug(f"AGV {self.agvId} 鏈�浣庣數閲忛槇鍊兼爣鍑嗗寲: {raw_low_vol}mV -> {self.lowVol:.1f}%") + else: + self.lowVol = raw_low_vol + + # 鏇存柊鍧愭爣 + if self.mapCode: + self.coordinates = get_coordinate_from_path_id(self.mapCode, self.path_mapping) + + # 鏍规嵁AGV鐘舵�佹洿鏂颁换鍔¤鏁� + if self.backpack and self.backpack: + self.current_task_count = len([bp for bp in self.backpack if bp.execute]) + else: + self.current_task_count = 0 + + def assign_task(self, task_id: str, priority: int = 5, + start_code: str = "", end_code: str = "") -> bool: + """ + 鍒嗛厤浠诲姟缁橝GV + + Args: + task_id: 浠诲姟ID + priority: 浠诲姟浼樺厛绾� + start_code: 璧峰浣嶇疆鐮� + end_code: 缁撴潫浣嶇疆鐮� + + Returns: + bool: 鏄惁鍒嗛厤鎴愬姛 + """ + if self.is_overloaded(): + self.logger.warning(f"AGV {self.agvId} 宸叉弧杞斤紝鏃犳硶鍒嗛厤鏇村浠诲姟") + return False + + # 鍒涘缓浠诲姟鍒嗛厤璁板綍 + task_assignment = TaskAssignment( + task_id=task_id, + assigned_time=time.time(), + priority=priority, + start_code=start_code, + end_code=end_code + ) + + self.assigned_tasks.append(task_assignment) + self.current_task_count += 1 + + self.logger.info(f"浠诲姟 {task_id} 宸插垎閰嶇粰AGV {self.agvId}") + return True + + def complete_task(self, task_id: str) -> bool: + """ + 瀹屾垚浠诲姟 + + Args: + task_id: 浠诲姟ID + + Returns: + bool: 鏄惁瀹屾垚鎴愬姛 + """ + for task in self.assigned_tasks: + if task.task_id == task_id: + task.status = AGVTaskStatus.COMPLETED + self.current_task_count = max(0, self.current_task_count - 1) + self.total_completed_tasks += 1 + + self.logger.info(f"AGV {self.agvId} 瀹屾垚浠诲姟 {task_id}") + return True + + return False + + def can_accept_task(self, priority: int) -> bool: + """ + 妫�鏌ユ槸鍚﹀彲浠ユ帴鍙楁柊浠诲姟 + + Args: + priority: 浠诲姟浼樺厛绾� + + Returns: + bool: 鏄惁鍙互鎺ュ彈 + """ + # 妫�鏌ュ閲� + if self.is_overloaded(): + self.logger.debug(f"AGV {self.agvId} 浠诲姟宸叉弧杞�({self.current_task_count}/{self.max_capacity})锛屾嫆缁濇柊浠诲姟") + return False + + # 妫�鏌GV鐘舵�侊紙鍏煎鏁存暟鍜屽瓧绗︿覆鏍煎紡锛� + # 鐘舵�� 0(绌洪棽), 1(蹇欑), 2(鍏呯數) 鍙互鎺ュ彈浠诲姟 + # 鐘舵�� 3(鏁呴殰), 4(缁存姢) 涓嶈兘鎺ュ彈浠诲姟 + status_value = str(self.status) # 缁熶竴杞崲涓哄瓧绗︿覆杩涜姣旇緝 + if status_value not in ["0", "1", "2"]: + self.logger.debug(f"AGV {self.agvId} 鐘舵�佸紓甯�(status={status_value})锛屾嫆缁濇柊浠诲姟") + return False + + # 妫�鏌ュ厖鐢电姸鎬� - 濡傛灉鐢甸噺杩囦綆蹇呴』鍏呯數锛屼笉鑳芥帴鍙楁柊浠诲姟 + if self.need_charging(): + self.logger.debug(f"AGV {self.agvId} 鐢甸噺杩囦綆({self.voltage}% <= {self.lowVol}%)锛屽繀椤诲厖鐢碉紝鎷掔粷鏂颁换鍔�") + return False + + # 濡傛灉鏄珮浼樺厛绾т换鍔★紝鍗充娇浣庣數閲忎篃鍙互鎺ュ彈 + if priority >= 9: # 楂樹紭鍏堢骇浠诲姟闃堝�� + self.logger.debug(f"AGV {self.agvId} 鎺ュ彈楂樹紭鍏堢骇浠诲姟(priority={priority})") + return True + + # 瀵逛簬鏅�氫紭鍏堢骇浠诲姟锛屽鏋滅數閲忎綆浣嗕笉鏄繀椤诲厖鐢碉紝鍙互鏍规嵁浼樺厛绾у喅瀹� + if self.can_auto_charge(): + # 浼樺厛绾ц秺楂橈紝瓒婂�惧悜浜庢帴鍙椾换鍔� + if priority >= 5: # 涓珮浼樺厛绾� + self.logger.debug(f"AGV {self.agvId} 鎺ュ彈涓珮浼樺厛绾т换鍔�(priority={priority}, 鐢甸噺={self.voltage}%)") + return True + elif priority >= 3: # 涓瓑浼樺厛绾э紝75%姒傜巼鎺ュ彈 + import random + accept = random.random() < 0.75 + self.logger.debug(f"AGV {self.agvId} 闅忔満鍐冲畾{'鎺ュ彈' if accept else '鎷掔粷'}涓瓑浼樺厛绾т换鍔�(priority={priority}, 鐢甸噺={self.voltage}%)") + return accept + elif priority >= 1: # 浣庝紭鍏堢骇锛�50%姒傜巼鎺ュ彈锛堝寘鎷紭鍏堢骇1锛� + import random + accept = random.random() < 0.5 + self.logger.debug(f"AGV {self.agvId} 闅忔満鍐冲畾{'鎺ュ彈' if accept else '鎷掔粷'}浣庝紭鍏堢骇浠诲姟(priority={priority}, 鐢甸噺={self.voltage}%)") + return accept + else: # 鏋佷綆浼樺厛绾э紙priority=0锛夛紝鎷掔粷 + self.logger.debug(f"AGV {self.agvId} 鐢甸噺鍋忎綆({self.voltage}%)锛屾嫆缁濇瀬浣庝紭鍏堢骇浠诲姟(priority={priority})") + return False + + # 姝e父鎯呭喌涓嬪彲浠ユ帴鍙椾换鍔� + self.logger.debug(f"AGV {self.agvId} 鐘舵�佽壇濂斤紝鍙互鎺ュ彈浠诲姟(鐢甸噺={self.voltage}%, priority={priority})") + return True + + def is_overloaded(self) -> bool: + """ + 妫�鏌ユ槸鍚﹁繃杞� + + Returns: + bool: 鏄惁杩囪浇 + """ + return self.current_task_count >= self.max_capacity + + def get_workload_ratio(self) -> float: + """ + 鑾峰彇宸ヤ綔璐熻浇姣斾緥 + + Returns: + float: 宸ヤ綔璐熻浇姣斾緥 (0.0 - 1.0) + """ + return min(1.0, self.current_task_count / self.max_capacity) + + def get_task_capacity(self) -> int: + """ + 鑾峰彇鍓╀綑浠诲姟瀹归噺 + + Returns: + int: 鍓╀綑瀹归噺 + """ + return max(0, self.max_capacity - self.current_task_count) + + def need_charging(self) -> bool: + """ + 妫�鏌ユ槸鍚﹂渶瑕佸厖鐢� + + Returns: + bool: 鏄惁闇�瑕佸厖鐢� + """ + return self.voltage <= self.lowVol + + def can_auto_charge(self) -> bool: + """ + 妫�鏌ユ槸鍚﹀彲浠ヨ嚜鍔ㄥ厖鐢碉紙浣庝簬闃堝�间絾涓嶆槸蹇呴』鍏呯數锛� + + Returns: + bool: 鏄惁鍙互鑷姩鍏呯數 + """ + return self.lowVol < self.voltage <= self.autoCharge + + def is_low_power(self) -> bool: + """ + 妫�鏌ユ槸鍚︿负浣庣數閲忕姸鎬� + + Returns: + bool: 鏄惁涓轰綆鐢甸噺鐘舵�� + """ + return self.voltage <= self.autoCharge + + def get_charging_priority(self) -> int: + """ + 鑾峰彇鍏呯數浼樺厛绾э紙鏁板�艰秺澶т紭鍏堢骇瓒婇珮锛� + + Returns: + int: 鍏呯數浼樺厛绾� + """ + if self.need_charging(): + return 100 # 蹇呴』鍏呯數锛屾渶楂樹紭鍏堢骇 + elif self.can_auto_charge(): + return 50 + (self.autoCharge - self.voltage) # 鏍规嵁鐢甸噺宸绠椾紭鍏堢骇 + else: + return 0 # 鏃犻渶鍏呯數 + + def calculate_efficiency_score(self, path_mapping: Dict[str, Dict[str, int]]) -> float: + """ + 璁$畻AGV鏁堢巼寰楀垎 + + Args: + path_mapping: 璺緞鐐规槧灏� + + Returns: + float: 鏁堢巼寰楀垎 (0.0 - 1.0) + """ + # 鍩虹鏁堢巼鍒嗘暟 + base_score = 0.7 + + # 鏍规嵁瀹屾垚浠诲姟鏁伴噺璋冩暣 + if self.total_completed_tasks > 0: + completion_bonus = min(0.2, self.total_completed_tasks * 0.01) + base_score += completion_bonus + + # 鏍规嵁璐熻浇鎯呭喌璋冩暣 + load_ratio = self.get_workload_ratio() + if load_ratio < 0.8: # 璐熻浇涓嶅お楂樻椂鏁堢巼鏇撮珮 + load_bonus = (0.8 - load_ratio) * 0.1 + base_score += load_bonus + + # 鏍规嵁AGV鐘舵�佽皟鏁� + if self.status == "0": # 姝e父鐘舵�� + base_score += 0.1 + elif self.status in ["3", "4"]: # 寮傚父鐘舵�� + base_score -= 0.2 + + # 鏃堕棿鍥犲瓙锛氭渶杩戞洿鏂扮殑AGV寰楀垎鏇撮珮 + time_since_update = time.time() - self.last_update_time + if time_since_update < 60: # 1鍒嗛挓鍐呮洿鏂� + time_bonus = (60 - time_since_update) / 600 # 鏈�澶氬姞0.1 + base_score += time_bonus + + return max(0.0, min(1.0, base_score)) + + def estimate_travel_time(self, target_code: str) -> float: + """ + 浼扮畻鍒扮洰鏍囦綅缃殑琛岄┒鏃堕棿 + + Args: + target_code: 鐩爣浣嶇疆鐮� + + Returns: + float: 浼扮畻鏃堕棿锛堢锛� + """ + if not self.coordinates: + return float('inf') + + target_coord = get_coordinate_from_path_id(target_code, self.path_mapping) + if not target_coord: + return float('inf') + + # 璁$畻鏇煎搱椤胯窛绂� + distance = calculate_manhattan_distance(self.coordinates, target_coord) + + # 鍋囪AGV骞冲潎閫熷害涓�1鍗曚綅/绉� + estimated_time = distance * 1.0 + + # 鑰冭檻褰撳墠璐熻浇瀵归�熷害鐨勫奖鍝� + load_factor = 1.0 + (self.get_workload_ratio() * 0.2) + + return estimated_time * load_factor + + def get_task_by_id(self, task_id: str) -> Optional[TaskAssignment]: + """ + 鏍规嵁浠诲姟ID鑾峰彇浠诲姟鍒嗛厤淇℃伅 + + Args: + task_id: 浠诲姟ID + + Returns: + Optional[TaskAssignment]: 浠诲姟鍒嗛厤淇℃伅 + """ + for task in self.assigned_tasks: + if task.task_id == task_id: + return task + return None + + def get_pending_tasks(self) -> List[TaskAssignment]: + """ + 鑾峰彇寰呮墽琛岀殑浠诲姟 + + Returns: + List[TaskAssignment]: 寰呮墽琛屼换鍔″垪琛� + """ + return [task for task in self.assigned_tasks + if task.status in [AGVTaskStatus.ASSIGNED, AGVTaskStatus.EXECUTING]] + + def clear_completed_tasks(self): + """娓呯悊宸插畬鎴愮殑浠诲姟""" + self.assigned_tasks = [task for task in self.assigned_tasks + if task.status != AGVTaskStatus.COMPLETED] + + def __str__(self) -> str: + """瀛楃涓茶〃绀�""" + return (f"AGV({self.agvId}, status={self.status}, " + f"pos={self.mapCode}, tasks={self.current_task_count}/{self.max_capacity})") + + +class AGVModelManager: + """AGV妯″瀷绠$悊鍣�""" + + def __init__(self, path_mapping: Dict[str, Dict[str, int]]): + """ + 鍒濆鍖朅GV妯″瀷绠$悊鍣� + + Args: + path_mapping: 璺緞鐐规槧灏勫瓧鍏� + """ + self.path_mapping = path_mapping + self.agv_models: Dict[str, AGVModel] = {} + self.agv_status_data: Dict[str, AGVStatus] = {} # 瀛樺偍鍘熷AGV鐘舵�佹暟鎹� + self.logger = logging.getLogger(__name__) + + def update_agv_data(self, agv_status_list: List[AGVStatus]): + """ + 鏇存柊AGV鏁版嵁 + + Args: + agv_status_list: AGV鐘舵�佸垪琛� + """ + # 鑾峰彇褰撳墠鏇存柊鐨凙GV ID鍒楄〃 + current_agv_ids = {agv_status.agvId for agv_status in agv_status_list} + + # 绉婚櫎涓嶅啀瀛樺湪鐨凙GV妯″瀷鍜岀姸鎬佹暟鎹� + removed_agvs = [] + for agv_id in list(self.agv_models.keys()): + if agv_id not in current_agv_ids: + removed_agvs.append(agv_id) + del self.agv_models[agv_id] + if agv_id in self.agv_status_data: + del self.agv_status_data[agv_id] + + if removed_agvs: + self.logger.info(f"绉婚櫎涓嶅瓨鍦ㄧ殑AGV: {removed_agvs}") + + # 鏇存柊鎴栧垱寤篈GV妯″瀷鍜岀姸鎬佹暟鎹� + for agv_status in agv_status_list: + agv_id = agv_status.agvId + + # 瀛樺偍鍘熷AGV鐘舵�佹暟鎹� + self.agv_status_data[agv_id] = agv_status + + # 濡傛灉AGV妯″瀷涓嶅瓨鍦紝鍒涘缓鏂扮殑 + if agv_id not in self.agv_models: + self.agv_models[agv_id] = AGVModel(agv_id, self.path_mapping) + self.logger.info(f"鍒涘缓鏂扮殑AGV妯″瀷: {agv_id}") + + # 鏇存柊AGV妯″瀷 + self.agv_models[agv_id].update_from_agv_status(agv_status) + + self.logger.debug(f"鏇存柊浜� {len(agv_status_list)} 涓狝GV妯″瀷锛屽綋鍓嶆�绘暟: {len(self.agv_models)}") + + def get_all_agv_status(self) -> List[AGVStatus]: + """ + 鑾峰彇鎵�鏈堿GV鐨勫師濮嬬姸鎬佹暟鎹� + + Returns: + List[AGVStatus]: AGV鐘舵�佹暟鎹垪琛� + """ + return list(self.agv_status_data.values()) + + def get_agv_status(self, agv_id: str) -> Optional[AGVStatus]: + """ + 鑾峰彇鎸囧畾AGV鐨勫師濮嬬姸鎬佹暟鎹� + + Args: + agv_id: AGV ID + + Returns: + Optional[AGVStatus]: AGV鐘舵�佹暟鎹� + """ + return self.agv_status_data.get(agv_id) + + def get_agv_model(self, agv_id: str) -> Optional[AGVModel]: + """ + 鑾峰彇AGV妯″瀷 + + Args: + agv_id: AGV ID + + Returns: + Optional[AGVModel]: AGV妯″瀷 + """ + return self.agv_models.get(agv_id) + + def get_all_agvs(self) -> List[AGVModel]: + """ + 鑾峰彇鎵�鏈堿GV妯″瀷 + + Returns: + List[AGVModel]: AGV妯″瀷鍒楄〃 + """ + return list(self.agv_models.values()) + + def get_available_agvs(self) -> List[AGVModel]: + """ + 鑾峰彇鍙敤鐨凙GV妯″瀷锛堟湭婊¤浇涓旂姸鎬佹甯革級 + + Returns: + List[AGVModel]: 鍙敤AGV妯″瀷鍒楄〃 + """ + available_agvs = [] + for agv in self.agv_models.values(): + if not agv.is_overloaded() and agv.can_accept_task(5): + available_agvs.append(agv) + + return available_agvs + + def get_agvs_by_status(self, status: str) -> List[AGVModel]: + """ + 鏍规嵁鐘舵�佽幏鍙朅GV鍒楄〃 + + Args: + status: AGV鐘舵�� + + Returns: + List[AGVModel]: 绗﹀悎鐘舵�佺殑AGV鍒楄〃 + """ + return [agv for agv in self.agv_models.values() if agv.status == status] + + def cleanup_old_agvs(self, max_age_seconds: float = 300): + """ + 娓呯悊闀挎椂闂存湭鏇存柊鐨凙GV + + Args: + max_age_seconds: 鏈�澶у厑璁哥殑鏈洿鏂版椂闂达紙绉掞級 + """ + current_time = time.time() + old_agvs = [] + + for agv_id, agv in self.agv_models.items(): + if current_time - agv.last_update_time > max_age_seconds: + old_agvs.append(agv_id) + + for agv_id in old_agvs: + del self.agv_models[agv_id] + self.logger.info(f"娓呯悊闀挎椂闂存湭鏇存柊鐨凙GV: {agv_id}") + + def get_statistics(self) -> Dict[str, any]: + """ + 鑾峰彇AGV缁熻淇℃伅 + + Returns: + Dict: 缁熻淇℃伅 + """ + total_agvs = len(self.agv_models) + available_agvs = len(self.get_available_agvs()) + total_tasks = sum(agv.current_task_count for agv in self.agv_models.values()) + total_capacity = sum(agv.max_capacity for agv in self.agv_models.values()) + + avg_efficiency = 0.0 + if self.agv_models: + avg_efficiency = sum(agv.calculate_efficiency_score(self.path_mapping) + for agv in self.agv_models.values()) / total_agvs + + return { + "total_agvs": total_agvs, + "available_agvs": available_agvs, + "total_assigned_tasks": total_tasks, + "total_capacity": total_capacity, + "capacity_utilization": total_tasks / total_capacity if total_capacity > 0 else 0.0, + "average_efficiency": avg_efficiency + } \ No newline at end of file -- Gitblit v1.9.1