From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001
From: zhang <zc857179121@qq.com>
Date: 星期三, 02 七月 2025 13:12:26 +0800
Subject: [PATCH] 初版提交

---
 algorithm_system/models/agv_model.py |  572 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 572 insertions(+), 0 deletions(-)

diff --git a/algorithm_system/models/agv_model.py b/algorithm_system/models/agv_model.py
new file mode 100644
index 0000000..6131862
--- /dev/null
+++ b/algorithm_system/models/agv_model.py
@@ -0,0 +1,572 @@
+"""
+AGV妯″瀷 - 鐢ㄤ簬绠楁硶绯荤粺鐨凙GV鏁版嵁寤烘ā
+"""
+import time
+import logging
+from typing import Dict, List, Optional, Tuple, Set
+from dataclasses import dataclass, field
+from enum import Enum
+
+from common.data_models import AGVStatus, BackpackData
+from common.utils import get_coordinate_from_path_id, calculate_manhattan_distance
+
+
+class AGVTaskStatus(Enum):
+    """AGV浠诲姟鐘舵��"""
+    IDLE = "IDLE"           # 绌洪棽
+    ASSIGNED = "ASSIGNED"   # 宸插垎閰嶄换鍔′絾鏈紑濮�
+    EXECUTING = "EXECUTING" # 鎵ц涓�
+    COMPLETED = "COMPLETED" # 宸插畬鎴�
+
+
+@dataclass
+class TaskAssignment:
+    """浠诲姟鍒嗛厤淇℃伅"""
+    task_id: str
+    assigned_time: float
+    priority: int
+    status: AGVTaskStatus = AGVTaskStatus.ASSIGNED
+    start_code: Optional[str] = None
+    end_code: Optional[str] = None
+    estimated_duration: Optional[float] = None
+
+
+class AGVModel:
+    """AGV妯″瀷绫伙紝鐢ㄤ簬绠楁硶璁$畻"""
+    
+    def __init__(self, agv_id: str, path_mapping: Dict[str, Dict[str, int]]):
+        """
+        鍒濆鍖朅GV妯″瀷
+        
+        Args:
+            agv_id: AGV ID
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+        """
+        self.agvId = agv_id
+        self.path_mapping = path_mapping
+        self.logger = logging.getLogger(__name__)
+        
+        # AGV鐘舵�佷俊鎭�
+        self.status: str = "0"  # AGV鐘舵�佺爜
+        self.mapCode: str = ""  # 褰撳墠浣嶇疆鐮�
+        self.coordinates: Optional[Tuple[int, int]] = None  # 褰撳墠鍧愭爣
+        self.backpack: Optional[List[BackpackData]] = None  # 鑳屽寘淇℃伅
+        
+        # 鍏呯數鐩稿叧
+        self.voltage: int = 100  # 褰撳墠鐢甸噺鐧惧垎姣�
+        self.autoCharge: int = 20  # 浣庣數閲忚瀹氶槇鍊�
+        self.lowVol: int = 10  # 鏈�浣庣數閲忛槇鍊�
+        
+        # 浠诲姟鐩稿叧
+        self.assigned_tasks: List[TaskAssignment] = []
+        self.current_task_count: int = 0
+        self.max_capacity: int = 5  # 鏈�澶т换鍔″閲�
+        
+        # 鎬ц兘鐩稿叧
+        self.efficiency_score: float = 1.0  # 鏁堢巼寰楀垎
+        self.last_update_time: float = time.time()
+        
+        # 缁熻淇℃伅
+        self.total_completed_tasks: int = 0
+        self.total_distance_traveled: float = 0.0
+        self.average_completion_time: float = 0.0
+    
+    def update_from_agv_status(self, agv_status: AGVStatus):
+        """浠嶢GV鐘舵�佹洿鏂版ā鍨�"""
+        self.status = agv_status.status
+        if hasattr(agv_status, 'position'):
+            self.mapCode = agv_status.position
+        elif hasattr(agv_status, 'mapCode'):
+            self.mapCode = agv_status.mapCode
+        else:
+            self.mapCode = ""
+        
+        self.backpack = agv_status.backpack
+        self.last_update_time = time.time()
+        
+        raw_voltage = getattr(agv_status, 'vol', 100)
+        raw_auto_charge = getattr(agv_status, 'autoCharge', 20)
+        raw_low_vol = getattr(agv_status, 'lowVol', 10)
+        
+        # 鐢甸噺鏁版嵁鏍囧噯鍖栧鐞�
+        # 濡傛灉鐢靛帇鍊煎ぇ浜�100锛屽彲鑳芥槸姣紡鍊硷紝闇�瑕佽浆鎹负鐧惧垎姣�
+        if raw_voltage > 100:
+            # 鍋囪姝e父鐢靛帇鑼冨洿鏄�3000-5000mV锛岃浆鎹负0-100%
+            # 杩欓噷浣跨敤绠�鍗曠殑绾挎�ф槧灏�
+            normalized_voltage = max(0, min(100, ((raw_voltage - 3000) / 2000) * 100))
+            self.logger.debug(f"AGV {self.agvId} 鐢靛帇鏍囧噯鍖�: {raw_voltage}mV -> {normalized_voltage:.1f}%")
+            self.voltage = int(normalized_voltage)
+        else:
+            self.voltage = raw_voltage
+        
+        # 闃堝�兼爣鍑嗗寲澶勭悊
+        if raw_auto_charge > 100:
+            # 濡傛灉闃堝�间篃鏄浼忓�硷紝鍚屾牱杞崲
+            self.autoCharge = max(0, min(100, ((raw_auto_charge - 3000) / 2000) * 100))
+            self.logger.debug(f"AGV {self.agvId} 鑷姩鍏呯數闃堝�兼爣鍑嗗寲: {raw_auto_charge}mV -> {self.autoCharge:.1f}%")
+        else:
+            self.autoCharge = raw_auto_charge
+            
+        if raw_low_vol > 100:
+            # 濡傛灉鏈�浣庣數閲忛槇鍊间篃鏄浼忓�硷紝鍚屾牱杞崲
+            self.lowVol = max(0, min(100, ((raw_low_vol - 3000) / 2000) * 100))
+            self.logger.debug(f"AGV {self.agvId} 鏈�浣庣數閲忛槇鍊兼爣鍑嗗寲: {raw_low_vol}mV -> {self.lowVol:.1f}%")
+        else:
+            self.lowVol = raw_low_vol
+        
+        # 鏇存柊鍧愭爣
+        if self.mapCode:
+            self.coordinates = get_coordinate_from_path_id(self.mapCode, self.path_mapping)
+        
+        # 鏍规嵁AGV鐘舵�佹洿鏂颁换鍔¤鏁�
+        if self.backpack and self.backpack:
+            self.current_task_count = len([bp for bp in self.backpack if bp.execute])
+        else:
+            self.current_task_count = 0
+    
+    def assign_task(self, task_id: str, priority: int = 5, 
+                   start_code: str = "", end_code: str = "") -> bool:
+        """
+        鍒嗛厤浠诲姟缁橝GV
+        
+        Args:
+            task_id: 浠诲姟ID
+            priority: 浠诲姟浼樺厛绾�
+            start_code: 璧峰浣嶇疆鐮�
+            end_code: 缁撴潫浣嶇疆鐮�
+            
+        Returns:
+            bool: 鏄惁鍒嗛厤鎴愬姛
+        """
+        if self.is_overloaded():
+            self.logger.warning(f"AGV {self.agvId} 宸叉弧杞斤紝鏃犳硶鍒嗛厤鏇村浠诲姟")
+            return False
+        
+        # 鍒涘缓浠诲姟鍒嗛厤璁板綍
+        task_assignment = TaskAssignment(
+            task_id=task_id,
+            assigned_time=time.time(),
+            priority=priority,
+            start_code=start_code,
+            end_code=end_code
+        )
+        
+        self.assigned_tasks.append(task_assignment)
+        self.current_task_count += 1
+        
+        self.logger.info(f"浠诲姟 {task_id} 宸插垎閰嶇粰AGV {self.agvId}")
+        return True
+    
+    def complete_task(self, task_id: str) -> bool:
+        """
+        瀹屾垚浠诲姟
+        
+        Args:
+            task_id: 浠诲姟ID
+            
+        Returns:
+            bool: 鏄惁瀹屾垚鎴愬姛
+        """
+        for task in self.assigned_tasks:
+            if task.task_id == task_id:
+                task.status = AGVTaskStatus.COMPLETED
+                self.current_task_count = max(0, self.current_task_count - 1)
+                self.total_completed_tasks += 1
+                
+                self.logger.info(f"AGV {self.agvId} 瀹屾垚浠诲姟 {task_id}")
+                return True
+        
+        return False
+    
+    def can_accept_task(self, priority: int) -> bool:
+        """
+        妫�鏌ユ槸鍚﹀彲浠ユ帴鍙楁柊浠诲姟
+        
+        Args:
+            priority: 浠诲姟浼樺厛绾�
+            
+        Returns:
+            bool: 鏄惁鍙互鎺ュ彈
+        """
+        # 妫�鏌ュ閲�
+        if self.is_overloaded():
+            self.logger.debug(f"AGV {self.agvId} 浠诲姟宸叉弧杞�({self.current_task_count}/{self.max_capacity})锛屾嫆缁濇柊浠诲姟")
+            return False
+        
+        # 妫�鏌GV鐘舵�侊紙鍏煎鏁存暟鍜屽瓧绗︿覆鏍煎紡锛�
+        # 鐘舵�� 0(绌洪棽), 1(蹇欑), 2(鍏呯數) 鍙互鎺ュ彈浠诲姟
+        # 鐘舵�� 3(鏁呴殰), 4(缁存姢) 涓嶈兘鎺ュ彈浠诲姟
+        status_value = str(self.status)  # 缁熶竴杞崲涓哄瓧绗︿覆杩涜姣旇緝
+        if status_value not in ["0", "1", "2"]:
+            self.logger.debug(f"AGV {self.agvId} 鐘舵�佸紓甯�(status={status_value})锛屾嫆缁濇柊浠诲姟")
+            return False
+        
+        # 妫�鏌ュ厖鐢电姸鎬� - 濡傛灉鐢甸噺杩囦綆蹇呴』鍏呯數锛屼笉鑳芥帴鍙楁柊浠诲姟
+        if self.need_charging():
+            self.logger.debug(f"AGV {self.agvId} 鐢甸噺杩囦綆({self.voltage}% <= {self.lowVol}%)锛屽繀椤诲厖鐢碉紝鎷掔粷鏂颁换鍔�")
+            return False
+        
+        # 濡傛灉鏄珮浼樺厛绾т换鍔★紝鍗充娇浣庣數閲忎篃鍙互鎺ュ彈
+        if priority >= 9:  # 楂樹紭鍏堢骇浠诲姟闃堝��
+            self.logger.debug(f"AGV {self.agvId} 鎺ュ彈楂樹紭鍏堢骇浠诲姟(priority={priority})")
+            return True
+        
+        # 瀵逛簬鏅�氫紭鍏堢骇浠诲姟锛屽鏋滅數閲忎綆浣嗕笉鏄繀椤诲厖鐢碉紝鍙互鏍规嵁浼樺厛绾у喅瀹�
+        if self.can_auto_charge():
+            # 浼樺厛绾ц秺楂橈紝瓒婂�惧悜浜庢帴鍙椾换鍔�
+            if priority >= 5:  # 涓珮浼樺厛绾�
+                self.logger.debug(f"AGV {self.agvId} 鎺ュ彈涓珮浼樺厛绾т换鍔�(priority={priority}, 鐢甸噺={self.voltage}%)")
+                return True
+            elif priority >= 3:  # 涓瓑浼樺厛绾э紝75%姒傜巼鎺ュ彈
+                import random
+                accept = random.random() < 0.75
+                self.logger.debug(f"AGV {self.agvId} 闅忔満鍐冲畾{'鎺ュ彈' if accept else '鎷掔粷'}涓瓑浼樺厛绾т换鍔�(priority={priority}, 鐢甸噺={self.voltage}%)")
+                return accept
+            elif priority >= 1:  # 浣庝紭鍏堢骇锛�50%姒傜巼鎺ュ彈锛堝寘鎷紭鍏堢骇1锛�
+                import random
+                accept = random.random() < 0.5
+                self.logger.debug(f"AGV {self.agvId} 闅忔満鍐冲畾{'鎺ュ彈' if accept else '鎷掔粷'}浣庝紭鍏堢骇浠诲姟(priority={priority}, 鐢甸噺={self.voltage}%)")
+                return accept
+            else:  # 鏋佷綆浼樺厛绾э紙priority=0锛夛紝鎷掔粷
+                self.logger.debug(f"AGV {self.agvId} 鐢甸噺鍋忎綆({self.voltage}%)锛屾嫆缁濇瀬浣庝紭鍏堢骇浠诲姟(priority={priority})")
+                return False
+        
+        # 姝e父鎯呭喌涓嬪彲浠ユ帴鍙椾换鍔�
+        self.logger.debug(f"AGV {self.agvId} 鐘舵�佽壇濂斤紝鍙互鎺ュ彈浠诲姟(鐢甸噺={self.voltage}%, priority={priority})")
+        return True
+    
+    def is_overloaded(self) -> bool:
+        """
+        妫�鏌ユ槸鍚﹁繃杞�
+        
+        Returns:
+            bool: 鏄惁杩囪浇
+        """
+        return self.current_task_count >= self.max_capacity
+    
+    def get_workload_ratio(self) -> float:
+        """
+        鑾峰彇宸ヤ綔璐熻浇姣斾緥
+        
+        Returns:
+            float: 宸ヤ綔璐熻浇姣斾緥 (0.0 - 1.0)
+        """
+        return min(1.0, self.current_task_count / self.max_capacity)
+    
+    def get_task_capacity(self) -> int:
+        """
+        鑾峰彇鍓╀綑浠诲姟瀹归噺
+        
+        Returns:
+            int: 鍓╀綑瀹归噺
+        """
+        return max(0, self.max_capacity - self.current_task_count)
+    
+    def need_charging(self) -> bool:
+        """
+        妫�鏌ユ槸鍚﹂渶瑕佸厖鐢�
+        
+        Returns:
+            bool: 鏄惁闇�瑕佸厖鐢�
+        """
+        return self.voltage <= self.lowVol
+    
+    def can_auto_charge(self) -> bool:
+        """
+        妫�鏌ユ槸鍚﹀彲浠ヨ嚜鍔ㄥ厖鐢碉紙浣庝簬闃堝�间絾涓嶆槸蹇呴』鍏呯數锛�
+        
+        Returns:
+            bool: 鏄惁鍙互鑷姩鍏呯數
+        """
+        return self.lowVol < self.voltage <= self.autoCharge
+    
+    def is_low_power(self) -> bool:
+        """
+        妫�鏌ユ槸鍚︿负浣庣數閲忕姸鎬�
+        
+        Returns:
+            bool: 鏄惁涓轰綆鐢甸噺鐘舵��
+        """
+        return self.voltage <= self.autoCharge
+    
+    def get_charging_priority(self) -> int:
+        """
+        鑾峰彇鍏呯數浼樺厛绾э紙鏁板�艰秺澶т紭鍏堢骇瓒婇珮锛�
+        
+        Returns:
+            int: 鍏呯數浼樺厛绾�
+        """
+        if self.need_charging():
+            return 100  # 蹇呴』鍏呯數锛屾渶楂樹紭鍏堢骇
+        elif self.can_auto_charge():
+            return 50 + (self.autoCharge - self.voltage)  # 鏍规嵁鐢甸噺宸绠椾紭鍏堢骇
+        else:
+            return 0  # 鏃犻渶鍏呯數
+    
+    def calculate_efficiency_score(self, path_mapping: Dict[str, Dict[str, int]]) -> float:
+        """
+        璁$畻AGV鏁堢巼寰楀垎
+        
+        Args:
+            path_mapping: 璺緞鐐规槧灏�
+            
+        Returns:
+            float: 鏁堢巼寰楀垎 (0.0 - 1.0)
+        """
+        # 鍩虹鏁堢巼鍒嗘暟
+        base_score = 0.7
+        
+        # 鏍规嵁瀹屾垚浠诲姟鏁伴噺璋冩暣
+        if self.total_completed_tasks > 0:
+            completion_bonus = min(0.2, self.total_completed_tasks * 0.01)
+            base_score += completion_bonus
+        
+        # 鏍规嵁璐熻浇鎯呭喌璋冩暣
+        load_ratio = self.get_workload_ratio()
+        if load_ratio < 0.8:  # 璐熻浇涓嶅お楂樻椂鏁堢巼鏇撮珮
+            load_bonus = (0.8 - load_ratio) * 0.1
+            base_score += load_bonus
+        
+        # 鏍规嵁AGV鐘舵�佽皟鏁�
+        if self.status == "0":  # 姝e父鐘舵��
+            base_score += 0.1
+        elif self.status in ["3", "4"]:  # 寮傚父鐘舵��
+            base_score -= 0.2
+        
+        # 鏃堕棿鍥犲瓙锛氭渶杩戞洿鏂扮殑AGV寰楀垎鏇撮珮
+        time_since_update = time.time() - self.last_update_time
+        if time_since_update < 60:  # 1鍒嗛挓鍐呮洿鏂�
+            time_bonus = (60 - time_since_update) / 600  # 鏈�澶氬姞0.1
+            base_score += time_bonus
+        
+        return max(0.0, min(1.0, base_score))
+    
+    def estimate_travel_time(self, target_code: str) -> float:
+        """
+        浼扮畻鍒扮洰鏍囦綅缃殑琛岄┒鏃堕棿
+        
+        Args:
+            target_code: 鐩爣浣嶇疆鐮�
+            
+        Returns:
+            float: 浼扮畻鏃堕棿锛堢锛�
+        """
+        if not self.coordinates:
+            return float('inf')
+        
+        target_coord = get_coordinate_from_path_id(target_code, self.path_mapping)
+        if not target_coord:
+            return float('inf')
+        
+        # 璁$畻鏇煎搱椤胯窛绂�
+        distance = calculate_manhattan_distance(self.coordinates, target_coord)
+        
+        # 鍋囪AGV骞冲潎閫熷害涓�1鍗曚綅/绉�
+        estimated_time = distance * 1.0
+        
+        # 鑰冭檻褰撳墠璐熻浇瀵归�熷害鐨勫奖鍝�
+        load_factor = 1.0 + (self.get_workload_ratio() * 0.2)
+        
+        return estimated_time * load_factor
+    
+    def get_task_by_id(self, task_id: str) -> Optional[TaskAssignment]:
+        """
+        鏍规嵁浠诲姟ID鑾峰彇浠诲姟鍒嗛厤淇℃伅
+        
+        Args:
+            task_id: 浠诲姟ID
+            
+        Returns:
+            Optional[TaskAssignment]: 浠诲姟鍒嗛厤淇℃伅
+        """
+        for task in self.assigned_tasks:
+            if task.task_id == task_id:
+                return task
+        return None
+    
+    def get_pending_tasks(self) -> List[TaskAssignment]:
+        """
+        鑾峰彇寰呮墽琛岀殑浠诲姟
+        
+        Returns:
+            List[TaskAssignment]: 寰呮墽琛屼换鍔″垪琛�
+        """
+        return [task for task in self.assigned_tasks 
+                if task.status in [AGVTaskStatus.ASSIGNED, AGVTaskStatus.EXECUTING]]
+    
+    def clear_completed_tasks(self):
+        """娓呯悊宸插畬鎴愮殑浠诲姟"""
+        self.assigned_tasks = [task for task in self.assigned_tasks 
+                              if task.status != AGVTaskStatus.COMPLETED]
+    
+    def __str__(self) -> str:
+        """瀛楃涓茶〃绀�"""
+        return (f"AGV({self.agvId}, status={self.status}, "
+                f"pos={self.mapCode}, tasks={self.current_task_count}/{self.max_capacity})")
+
+
+class AGVModelManager:
+    """AGV妯″瀷绠$悊鍣�"""
+    
+    def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
+        """
+        鍒濆鍖朅GV妯″瀷绠$悊鍣�
+        
+        Args:
+            path_mapping: 璺緞鐐规槧灏勫瓧鍏�
+        """
+        self.path_mapping = path_mapping
+        self.agv_models: Dict[str, AGVModel] = {}
+        self.agv_status_data: Dict[str, AGVStatus] = {}  # 瀛樺偍鍘熷AGV鐘舵�佹暟鎹�
+        self.logger = logging.getLogger(__name__)
+    
+    def update_agv_data(self, agv_status_list: List[AGVStatus]):
+        """
+        鏇存柊AGV鏁版嵁
+        
+        Args:
+            agv_status_list: AGV鐘舵�佸垪琛�
+        """
+        # 鑾峰彇褰撳墠鏇存柊鐨凙GV ID鍒楄〃
+        current_agv_ids = {agv_status.agvId for agv_status in agv_status_list}
+        
+        # 绉婚櫎涓嶅啀瀛樺湪鐨凙GV妯″瀷鍜岀姸鎬佹暟鎹�
+        removed_agvs = []
+        for agv_id in list(self.agv_models.keys()):
+            if agv_id not in current_agv_ids:
+                removed_agvs.append(agv_id)
+                del self.agv_models[agv_id]
+                if agv_id in self.agv_status_data:
+                    del self.agv_status_data[agv_id]
+        
+        if removed_agvs:
+            self.logger.info(f"绉婚櫎涓嶅瓨鍦ㄧ殑AGV: {removed_agvs}")
+        
+        # 鏇存柊鎴栧垱寤篈GV妯″瀷鍜岀姸鎬佹暟鎹�
+        for agv_status in agv_status_list:
+            agv_id = agv_status.agvId
+            
+            # 瀛樺偍鍘熷AGV鐘舵�佹暟鎹�
+            self.agv_status_data[agv_id] = agv_status
+            
+            # 濡傛灉AGV妯″瀷涓嶅瓨鍦紝鍒涘缓鏂扮殑
+            if agv_id not in self.agv_models:
+                self.agv_models[agv_id] = AGVModel(agv_id, self.path_mapping)
+                self.logger.info(f"鍒涘缓鏂扮殑AGV妯″瀷: {agv_id}")
+            
+            # 鏇存柊AGV妯″瀷
+            self.agv_models[agv_id].update_from_agv_status(agv_status)
+        
+        self.logger.debug(f"鏇存柊浜� {len(agv_status_list)} 涓狝GV妯″瀷锛屽綋鍓嶆�绘暟: {len(self.agv_models)}")
+    
+    def get_all_agv_status(self) -> List[AGVStatus]:
+        """
+        鑾峰彇鎵�鏈堿GV鐨勫師濮嬬姸鎬佹暟鎹�
+        
+        Returns:
+            List[AGVStatus]: AGV鐘舵�佹暟鎹垪琛�
+        """
+        return list(self.agv_status_data.values())
+    
+    def get_agv_status(self, agv_id: str) -> Optional[AGVStatus]:
+        """
+        鑾峰彇鎸囧畾AGV鐨勫師濮嬬姸鎬佹暟鎹�
+        
+        Args:
+            agv_id: AGV ID
+            
+        Returns:
+            Optional[AGVStatus]: AGV鐘舵�佹暟鎹�
+        """
+        return self.agv_status_data.get(agv_id)
+    
+    def get_agv_model(self, agv_id: str) -> Optional[AGVModel]:
+        """
+        鑾峰彇AGV妯″瀷
+        
+        Args:
+            agv_id: AGV ID
+            
+        Returns:
+            Optional[AGVModel]: AGV妯″瀷
+        """
+        return self.agv_models.get(agv_id)
+    
+    def get_all_agvs(self) -> List[AGVModel]:
+        """
+        鑾峰彇鎵�鏈堿GV妯″瀷
+        
+        Returns:
+            List[AGVModel]: AGV妯″瀷鍒楄〃
+        """
+        return list(self.agv_models.values())
+    
+    def get_available_agvs(self) -> List[AGVModel]:
+        """
+        鑾峰彇鍙敤鐨凙GV妯″瀷锛堟湭婊¤浇涓旂姸鎬佹甯革級
+        
+        Returns:
+            List[AGVModel]: 鍙敤AGV妯″瀷鍒楄〃
+        """
+        available_agvs = []
+        for agv in self.agv_models.values():
+            if not agv.is_overloaded() and agv.can_accept_task(5):
+                available_agvs.append(agv)
+        
+        return available_agvs
+    
+    def get_agvs_by_status(self, status: str) -> List[AGVModel]:
+        """
+        鏍规嵁鐘舵�佽幏鍙朅GV鍒楄〃
+        
+        Args:
+            status: AGV鐘舵��
+            
+        Returns:
+            List[AGVModel]: 绗﹀悎鐘舵�佺殑AGV鍒楄〃
+        """
+        return [agv for agv in self.agv_models.values() if agv.status == status]
+    
+    def cleanup_old_agvs(self, max_age_seconds: float = 300):
+        """
+        娓呯悊闀挎椂闂存湭鏇存柊鐨凙GV
+        
+        Args:
+            max_age_seconds: 鏈�澶у厑璁哥殑鏈洿鏂版椂闂达紙绉掞級
+        """
+        current_time = time.time()
+        old_agvs = []
+        
+        for agv_id, agv in self.agv_models.items():
+            if current_time - agv.last_update_time > max_age_seconds:
+                old_agvs.append(agv_id)
+        
+        for agv_id in old_agvs:
+            del self.agv_models[agv_id]
+            self.logger.info(f"娓呯悊闀挎椂闂存湭鏇存柊鐨凙GV: {agv_id}")
+    
+    def get_statistics(self) -> Dict[str, any]:
+        """
+        鑾峰彇AGV缁熻淇℃伅
+        
+        Returns:
+            Dict: 缁熻淇℃伅
+        """
+        total_agvs = len(self.agv_models)
+        available_agvs = len(self.get_available_agvs())
+        total_tasks = sum(agv.current_task_count for agv in self.agv_models.values())
+        total_capacity = sum(agv.max_capacity for agv in self.agv_models.values())
+        
+        avg_efficiency = 0.0
+        if self.agv_models:
+            avg_efficiency = sum(agv.calculate_efficiency_score(self.path_mapping) 
+                               for agv in self.agv_models.values()) / total_agvs
+        
+        return {
+            "total_agvs": total_agvs,
+            "available_agvs": available_agvs,
+            "total_assigned_tasks": total_tasks,
+            "total_capacity": total_capacity,
+            "capacity_utilization": total_tasks / total_capacity if total_capacity > 0 else 0.0,
+            "average_efficiency": avg_efficiency
+        } 
\ No newline at end of file

--
Gitblit v1.9.1