From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001
From: zhang <zc857179121@qq.com>
Date: 星期三, 02 七月 2025 13:12:26 +0800
Subject: [PATCH] 初版提交

---
 algorithm_system/path_monitor.py |  442 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 442 insertions(+), 0 deletions(-)

diff --git a/algorithm_system/path_monitor.py b/algorithm_system/path_monitor.py
new file mode 100644
index 0000000..33623db
--- /dev/null
+++ b/algorithm_system/path_monitor.py
@@ -0,0 +1,442 @@
+"""
+鐩戞帶RCS绯荤粺鐘舵�佸苟鐢熸垚璺緞
+"""
+import time
+import threading
+import logging
+from typing import Dict, List, Optional, Any
+import queue
+
+from common.data_models import AGVStatus, PlannedPath, from_dict
+from common.api_client import RCSAPIClient
+from algorithm_system.algorithms.path_planning import BatchPathPlanner
+from algorithm_system.models.agv_model import AGVModelManager
+from common.utils import load_path_mapping, generate_segment_id
+
+try:
+    from config.settings import RCS_SERVER_HOST, RCS_SERVER_PORT, MONITOR_POLLING_INTERVAL
+except ImportError:
+    RCS_SERVER_HOST = "10.10.10.156"
+    RCS_SERVER_PORT = 8088
+    MONITOR_POLLING_INTERVAL = 5.0
+    logging.warning("鏃犳硶浠巆onfig.settings瀵煎叆閰嶇疆锛屼娇鐢ㄩ粯璁ゅ��")
+
+
+class PathMonitorService:
+    """鐩戞帶RCS鐘舵�佸苟鐢熸垚璺緞"""
+    
+    def __init__(self, 
+                 rcs_host: str = None, 
+                 rcs_port: int = None,
+                 poll_interval: float = None,
+                 path_algorithm: str = "A_STAR",
+                 auto_send_paths: bool = True):
+        #鍒濆鍖栬矾寰勭洃鎺ф湇鍔�
+        self.logger = logging.getLogger(__name__)
+        
+        self.rcs_host = rcs_host or RCS_SERVER_HOST
+        self.rcs_port = rcs_port or RCS_SERVER_PORT
+        self.poll_interval = poll_interval or MONITOR_POLLING_INTERVAL
+        self.path_algorithm = path_algorithm
+        self.auto_send_paths = auto_send_paths
+        
+        self.rcs_client = RCSAPIClient(self.rcs_host, self.rcs_port, timeout=10)
+        self.path_mapping = load_path_mapping()
+        self.agv_manager = AGVModelManager(self.path_mapping)
+        
+        self.path_planner = BatchPathPlanner(
+            self.path_mapping, 
+            self.path_algorithm,
+            self.agv_manager
+        )
+        
+        self.is_running = False
+        self.monitor_thread = None
+        self._stop_event = threading.Event()
+        
+        self.stats = {
+            'poll_count': 0,           # 杞娆℃暟
+            'successful_polls': 0,     # 鎴愬姛杞娆℃暟  
+            'path_generations': 0,     # 璺緞鐢熸垚娆℃暟
+            'last_poll_time': None,    # 鏈�鍚庤疆璇㈡椂闂�
+            'last_generation_time': None  # 鏈�鍚庣敓鎴愭椂闂�
+        }
+        
+        self.logger.info(f"璺緞鐩戞帶鏈嶅姟鍒濆鍖栧畬鎴� - 杞闂撮殧: {poll_interval}s, 绠楁硶: {path_algorithm}")
+    
+    def start_monitoring(self):
+        """鍚姩鐩戞帶鏈嶅姟"""
+        if self.is_running:
+            self.logger.warning("鐩戞帶鏈嶅姟宸插湪杩愯涓�")
+            return
+        
+        self.is_running = True
+        self._stop_event.clear()
+        
+        self.monitor_thread = threading.Thread(
+            target=self._monitoring_loop,
+            name="PathMonitorThread",
+            daemon=True
+        )
+        self.monitor_thread.start()
+        
+        self.logger.info("鐩戞帶鏈嶅姟宸插惎鍔�")
+    
+    def stop_monitoring(self):
+        """鍋滄鐩戞帶鏈嶅姟"""
+        if not self.is_running:
+            self.logger.warning("鐩戞帶鏈嶅姟鏈湪杩愯")
+            return
+        
+        self.is_running = False
+        self._stop_event.set()
+        
+        if self.monitor_thread and self.monitor_thread.is_alive():
+            self.monitor_thread.join(timeout=5.0)
+        
+        self.logger.info("鐩戞帶鏈嶅姟宸插仠姝�")
+    
+    def _monitoring_loop(self):
+        """鐩戞帶涓诲惊鐜�"""
+        self.logger.info("寮�濮嬬洃鎺у惊鐜�...")
+        
+        while self.is_running and not self._stop_event.is_set():
+            try:
+                self._perform_monitoring_cycle()
+                
+                if not self._stop_event.wait(self.poll_interval):
+                    continue
+                else:
+                    break
+                    
+            except Exception as e:
+                self.logger.error(f"鐩戞帶寰幆寮傚父: {e}")
+                self._stop_event.wait(min(self.poll_interval * 2, 10.0))
+        
+        self.logger.info("鐩戞帶寰幆宸茬粨鏉�")
+    
+    def _perform_monitoring_cycle(self):
+        """鎵ц涓�娆$洃鎺у懆鏈�"""
+        start_time = time.time()
+        
+        self.stats['poll_count'] += 1
+        self.stats['last_poll_time'] = start_time
+        
+        try:
+            self.logger.info(f"寮�濮嬭矾寰勭洃鎺у懆鏈� #{self.stats['poll_count']}")
+            
+            # 1. 鑾峰彇褰撳墠AGV鐘舵�佸拰浠诲姟鐘舵��
+            current_agv_status, current_task_status = self._fetch_rcs_status()
+            
+            if not current_agv_status:
+                self.logger.debug("鏈幏鍙栧埌AGV鐘舵�佹暟鎹紝璺宠繃璺緞鐢熸垚")
+                return
+            
+            # 2. 鐩存帴鏍规嵁褰撳墠鐘舵�佺敓鎴愬厤纰版挒璺緞
+            self.logger.info(f"[璺緞鐢熸垚] 寮�濮嬩负 {len(current_agv_status)} 涓狝GV鐢熸垚璺緞")
+            
+            generated_paths = self._generate_collision_free_paths(current_agv_status)
+            
+            # 3. 鍙戦�佽矾寰勫埌RCS锛堟牴鎹厤缃喅瀹氾級
+            if generated_paths:
+                if self.auto_send_paths:
+                    self._send_paths_to_rcs(generated_paths)
+                else:
+                    self.logger.debug(f"璺緞鐢熸垚瀹屾垚浣嗘湭鍙戦�佸埌RCS - AGV鏁伴噺: {len(current_agv_status)}, 璺緞鏁�: {len(generated_paths)}")
+                
+                self.logger.info(f"璺緞鐢熸垚瀹屾垚 - AGV鏁伴噺: {len(current_agv_status)}, 璺緞鏁�: {len(generated_paths)}, 鑷姩鍙戦��: {self.auto_send_paths}")
+            else:
+                self.logger.debug("鏈敓鎴愪换浣曡矾寰勶紙鍙兘鎵�鏈堿GV閮藉浜庣┖闂茬姸鎬侊級")
+            
+            # 鏇存柊鎴愬姛缁熻
+            self.stats['successful_polls'] += 1
+            self.stats['path_generations'] += 1
+            self.stats['last_generation_time'] = time.time()
+            
+            generation_time = (time.time() - start_time) * 1000
+            self.logger.debug(f"鐩戞帶鍛ㄦ湡瀹屾垚 - 鑰楁椂: {generation_time:.2f}ms")
+            
+        except Exception as e:
+            self.logger.error(f"鐩戞帶鍛ㄦ湡鎵ц澶辫触: {e}")
+    
+    def _generate_unique_seg_id(self, agv_id: str, task_id: str = '') -> str:
+        """鐢熸垚segId"""
+        try:
+            if task_id:
+                # 鏈変换鍔℃椂浣跨敤task_id
+                return generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2")
+            else:
+                # 鏃犱换鍔℃椂浣跨敤鏃堕棿鎴充綔涓虹洰鏍囦綅缃紝纭繚涓嶅悓鏃堕棿鏈変笉鍚宻egId
+                import time
+                time_target = f"IDLE_{int(time.time() / 3600)}"  # 鎸夊皬鏃跺垎缁�
+                return generate_segment_id(agv_id=agv_id, target_position=time_target, action_type="4")
+            
+        except Exception as e:
+            self.logger.error(f"鐢熸垚segId澶辫触: {e}")
+            # 闄嶇骇鏂规锛氫娇鐢╟ommon.utils鐨勫嚱鏁扮敓鎴愬鐢↖D
+            import time
+            fallback_target = f"FALLBACK_{int(time.time())}"
+            return generate_segment_id(agv_id=agv_id, target_position=fallback_target, action_type="1")
+    
+    def _fetch_rcs_status(self) -> tuple[List[AGVStatus], Dict]:
+        """浠嶳CS鑾峰彇褰撳墠鐘舵��"""
+        agv_status_list = []
+        task_status = {}
+        
+        try:
+            # 鑾峰彇AGV鐘舵�� - 浣跨敤绌哄弬鏁拌幏鍙栨墍鏈堿GV鐘舵�佸拰浠诲姟鐘舵��
+            self.logger.info(" 杞鐩爣: 浣跨敤绌哄弬鏁�(agvId=None, mapId=None)鑾峰彇RCS鎵�鏈堿GV鐘舵�佸拰浠诲姟鐘舵��")
+            agv_response = self.rcs_client.get_agv_status(agv_id=None, map_id=None)
+            
+            if agv_response.code == 200 and agv_response.data:
+                self.logger.info(f"鎴愬姛鑾峰彇AGV鐘舵�� - 鏁伴噺: {len(agv_response.data)}")
+                
+                # 缁熻淇℃伅
+                total_tasks = 0
+                executing_tasks = 0
+                loaded_tasks = 0
+                
+                self.logger.info("=" * 80)
+                self.logger.info("[璇︾粏AGV鐘舵�佷俊鎭痌")
+                self.logger.info("=" * 80)
+                
+                for i, agv_data in enumerate(agv_response.data, 1):
+                    try:
+                        # 杞崲涓篈GVStatus瀵硅薄
+                        agv_status = from_dict(AGVStatus, agv_data)
+                        agv_status_list.append(agv_status)
+                        
+                        # 鎵撳嵃AGV鍩烘湰淇℃伅
+                        self.logger.info(f"[AGV #{i}] {agv_status.agvId}")
+                        self.logger.info(f"   鐘舵��: {agv_status.status} | 浣嶇疆: {agv_status.position} | 鏂瑰悜: {agv_status.direction}")
+                        self.logger.info(f"   鐢靛帇: {agv_status.vol} | 閿欒鐮�: {agv_status.error} | 绌鸿儗绡�: {agv_status.empty}")
+                        
+                        # 鍒嗘瀽鑳岀瘬淇℃伅
+                        if agv_status.backpack:
+                            self.logger.info(f"   [鑳岀瘬淇℃伅] ({len(agv_status.backpack)} 涓�):")
+                            for backpack_item in agv_status.backpack:
+                                status_text = "鎵ц涓�" if backpack_item.execute else ("宸茶杞�" if backpack_item.loaded else "绌洪棽")
+                                task_info = f"浠诲姟: {backpack_item.taskId}" if backpack_item.taskId else "鏃犱换鍔�"
+                                
+                                self.logger.info(f"     [浣嶇疆{backpack_item.index}] {task_info} | "
+                                                f"鐘舵��: {status_text} | 宸茶杞�: {backpack_item.loaded} | 鎵ц涓�: {backpack_item.execute}")
+                                
+                                # 缁熻浠诲姟
+                                if backpack_item.taskId:
+                                    total_tasks += 1
+                                    if backpack_item.execute:
+                                        executing_tasks += 1
+                                    if backpack_item.loaded:
+                                        loaded_tasks += 1
+                                    
+                                    # 鎻愬彇浠诲姟鐘舵�佷俊鎭�
+                                    task_status[backpack_item.taskId] = {
+                                        'agvId': agv_status.agvId,
+                                        'loaded': backpack_item.loaded,
+                                        'execute': backpack_item.execute,
+                                        'index': backpack_item.index
+                                    }
+                        else:
+                            self.logger.info(f"   [鑳岀瘬淇℃伅] 鏃犺儗绡撲俊鎭�")
+                        
+                        self.logger.info("-" * 60)
+                    except Exception as e:
+                        self.logger.warning(f"瑙f瀽AGV鐘舵�佹暟鎹け璐�: {agv_data} - {e}")
+                
+                # 鎵撳嵃缁熻鎽樿
+                self.logger.info("[浠诲姟缁熻鎽樿]")
+                self.logger.info(f"   鎬籄GV鏁伴噺: {len(agv_status_list)}")
+                self.logger.info(f"   鎬讳换鍔℃暟閲�: {total_tasks}")
+                self.logger.info(f"   鎵ц涓换鍔�: {executing_tasks}")
+                self.logger.info(f"   宸茶杞戒换鍔�: {loaded_tasks}")
+                self.logger.info(f"   鏈杞戒换鍔�: {total_tasks - loaded_tasks}")
+                self.logger.info("=" * 80)
+                
+            else:
+                self.logger.warning(f"鑾峰彇AGV鐘舵�佸け璐�: {agv_response.msg if agv_response else '鏃犲搷搴�'}")
+        
+        except Exception as e:
+            self.logger.error(f"浠嶳CS鑾峰彇鐘舵�佸け璐�: {e}")
+        
+        return agv_status_list, task_status
+    
+
+    
+    def _generate_collision_free_paths(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
+        """涓烘墍鏈堿GV鐢熸垚鍏嶇鎾炶矾寰�"""
+        try:
+            self.logger.debug(f"寮�濮嬩负 {len(agv_status_list)} 涓狝GV鐢熸垚鍏嶇鎾炶矾寰�")
+            
+            # 浣跨敤鎵归噺璺緞瑙勫垝鍣ㄧ敓鎴愯矾寰�
+            result = self.path_planner.plan_all_agv_paths(
+                agv_status_list=agv_status_list,
+                include_idle_agv=False,  # 鍙负鎵ц浠诲姟鐨凙GV鐢熸垚璺緞
+                constraints=None
+            )
+
+            planned_paths = result.get('plannedPaths', result.get('planned_paths', []))
+            
+            # 璇婃柇锛氭樉绀鸿矾寰勮鍒掔粨鏋滅殑鎵�鏈夊瓧娈�
+            self.logger.debug(f"璺緞瑙勫垝鍣ㄨ繑鍥炵殑瀛楁: {list(result.keys())}")
+            self.logger.debug(f"planned_paths瀛楁瀛樺湪: {'planned_paths' in result}")
+            self.logger.debug(f"plannedPaths瀛楁瀛樺湪: {'plannedPaths' in result}")
+            self.logger.debug(f"鏈�缁堣幏鍙栧埌鐨勮矾寰勬暟閲�: {len(planned_paths)}")
+            
+            if planned_paths:
+                self.logger.info(f"鎴愬姛鐢熸垚 {len(planned_paths)} 鏉″厤纰版挒璺緞")
+                
+                # 璁板綍姣忎釜AGV鐨勮矾寰勪俊鎭�
+                for path_info in planned_paths:
+                    agv_id = path_info.get('agvId', 'unknown')
+                    code_list = path_info.get('codeList', [])
+                    self.logger.debug(f"AGV {agv_id} 璺緞闀垮害: {len(code_list)}")
+            else:
+                self.logger.info("鏈敓鎴愪换浣曡矾寰勶紙鍙兘鎵�鏈堿GV閮藉浜庣┖闂茬姸鎬侊級")
+            
+            return planned_paths
+            
+        except Exception as e:
+            self.logger.error(f"鐢熸垚鍏嶇鎾炶矾寰勫け璐�: {e}")
+            return []
+    
+
+    
+    def _send_paths_to_rcs(self, generated_paths: List[Dict]):
+        """灏嗙敓鎴愮殑璺緞鎵归噺鍙戦�佸埌RCS绯荤粺"""
+        try:
+            self.logger.info(f"寮�濮嬫壒閲忓彂閫� {len(generated_paths)} 鏉¤矾寰勫埌RCS绯荤粺")
+            
+            # 鏋勫缓鎵归噺璺緞鏁版嵁
+            agv_paths = []
+            import uuid
+            import time as time_module
+            
+            for path_info in generated_paths:
+                agv_id = path_info.get('agvId')
+                code_list = path_info.get('codeList', [])
+                
+                if agv_id and code_list:
+                    path_task_id = ''
+                    for path_code in code_list:
+                        if path_code.get('taskId'):
+                            path_task_id = path_code.get('taskId')
+                            break
+                    
+                    navigation_data = []
+                    for path_code in code_list:
+                        nav_item = {
+                            'code': path_code.get('code', ''),
+                            'direction': path_code.get('direction', '90'),
+                            'type': path_code.get('type')
+                        }
+                        
+                        if path_code.get('taskId'):
+                            nav_item['taskId'] = path_code.get('taskId')
+                        
+                        if path_code.get('posType'):
+                            nav_item['posType'] = path_code.get('posType')
+                        
+                        if path_code.get('lev') is not None:
+                            nav_item['lev'] = path_code.get('lev')
+                            
+                        navigation_data.append(nav_item)
+                    
+                    auto_seg_id = self._generate_unique_seg_id(agv_id, path_task_id)
+                    
+                    agv_path_data = {
+                        'agvId': agv_id,
+                        'segId': auto_seg_id,
+                        'codeList': navigation_data
+                    }
+                    
+                    agv_paths.append(agv_path_data)
+            
+            if not agv_paths:
+                self.logger.warning("娌℃湁鏈夋晥鐨勮矾寰勬暟鎹渶瑕佸彂閫�")
+                return
+            
+            try:
+                import requests
+                import json
+                
+                rcs_url = f"http://{self.rcs_host}:{self.rcs_port}/api/open/algorithm/zkd/navigation/v1"
+                
+                self.logger.info(f"绠楁硶绯荤粺鍙戦�佽矾寰勮鍒掔粨鏋滃埌RCS:")
+                self.logger.info(f"   AGV鏁伴噺: {len(agv_paths)}")
+                self.logger.info(f"   鐩爣URL: {rcs_url}")
+                
+                successful_sends = 0
+                
+                for agv_path in agv_paths:
+                    agv_id = agv_path['agvId']
+                    seg_id = agv_path['segId']
+                    code_list = agv_path['codeList']
+                    
+                    # 鏋勯�犳爣鍑嗗鑸姹傛暟鎹紙agvId, segId, codeList鏍煎紡锛�
+                    navigation_data = {
+                        'agvId': agv_id,
+                        'segId': seg_id,
+                        'codeList': code_list
+                    }
+                    
+                    # 鏄剧ず璺緞鎽樿
+                    start_code = code_list[0].get('code', '') if code_list else ''
+                    end_code = code_list[-1].get('code', '') if code_list else ''
+                    task_codes = [nc for nc in code_list if nc.get('taskId')]
+                    task_id = task_codes[0].get('taskId', '') if task_codes else ''
+                    
+                    self.logger.info(f"    AGV: {agv_id}, 浠诲姟: {task_id}, 璺緞: {start_code} -> {end_code} ({len(code_list)}鐐�)")
+                    
+                    # 璇︾粏鏄剧ず璺緞JSON鏁版嵁
+                    self.logger.info(f"鍙戦�佽矾寰凧SON鏁版嵁:")
+                    self.logger.info(json.dumps(navigation_data, ensure_ascii=False, indent=2))
+                    
+                    try:
+                        response = requests.post(
+                            rcs_url,
+                            json=navigation_data,
+                            timeout=10,
+                            headers={'Content-Type': 'application/json'}
+                        )
+                        
+                        if response.status_code == 200:
+                            response_data = response.json()
+                            self.logger.info(f" 鎴愬姛鍙戦�佽矾寰勫埌RCS - AGV: {agv_id}, 鍝嶅簲: {response_data.get('msg', 'OK')}")
+                            successful_sends += 1
+                        else:
+                            self.logger.warning(f" 鍙戦�佽矾寰勫埌RCS澶辫触 - AGV: {agv_id}, HTTP鐘舵��: {response.status_code}")
+                            self.logger.warning(f"   鍝嶅簲鍐呭: {response.text}")
+                            
+                    except Exception as e:
+                        self.logger.warning(f" 鍙戦�佸鑸寚浠ゅ紓甯� - AGV: {agv_id}, 閿欒: {e}")
+                
+                self.logger.info(f" 璺緞鍙戦�佸畬鎴� - 鎴愬姛: {successful_sends}/{len(agv_paths)}")
+                    
+            except Exception as e:
+                self.logger.error(f" 鍙戦�佸鑸寚浠ゅ紓甯�: {e}")
+        
+        except Exception as e:
+            self.logger.error(f"鍙戦�佽矾寰勫埌RCS寮傚父: {e}")
+    
+    def get_monitoring_status(self) -> Dict[str, Any]:
+        """鑾峰彇鐩戞帶鏈嶅姟鐘舵��"""
+        return {
+            'is_running': self.is_running,
+            'rcs_host': self.rcs_host,
+            'rcs_port': self.rcs_port,
+            'poll_interval': self.poll_interval,
+            'path_algorithm': self.path_algorithm,
+            'auto_send_paths': self.auto_send_paths,
+            'stats': self.stats.copy()
+        }
+    
+    def reset_stats(self):
+        """閲嶇疆缁熻淇℃伅"""
+        self.stats = {
+            'poll_count': 0,
+            'successful_polls': 0,
+            'path_generations': 0,
+            'last_poll_time': None,
+            'last_generation_time': None
+        }
+        self.logger.info("鐩戞帶缁熻淇℃伅宸查噸缃�") 
\ No newline at end of file

--
Gitblit v1.9.1