""" 监控RCS系统状态并生成路径 """ import time import threading import logging from typing import Dict, List, Optional, Any import queue from common.data_models import AGVStatus, PlannedPath, from_dict from common.api_client import RCSAPIClient from algorithm_system.algorithms.path_planning import BatchPathPlanner from algorithm_system.models.agv_model import AGVModelManager from common.utils import load_path_mapping, generate_segment_id try: from config.settings import RCS_SERVER_HOST, RCS_SERVER_PORT, MONITOR_POLLING_INTERVAL except ImportError: RCS_SERVER_HOST = "10.10.10.156" RCS_SERVER_PORT = 8088 MONITOR_POLLING_INTERVAL = 5.0 logging.warning("无法从config.settings导入配置,使用默认值") class PathMonitorService: """监控RCS状态并生成路径""" def __init__(self, rcs_host: str = None, rcs_port: int = None, poll_interval: float = None, path_algorithm: str = "A_STAR", auto_send_paths: bool = True): #初始化路径监控服务 self.logger = logging.getLogger(__name__) self.rcs_host = rcs_host or RCS_SERVER_HOST self.rcs_port = rcs_port or RCS_SERVER_PORT self.poll_interval = poll_interval or MONITOR_POLLING_INTERVAL self.path_algorithm = path_algorithm self.auto_send_paths = auto_send_paths self.rcs_client = RCSAPIClient(self.rcs_host, self.rcs_port, timeout=10) self.path_mapping = load_path_mapping() self.agv_manager = AGVModelManager(self.path_mapping) self.path_planner = BatchPathPlanner( self.path_mapping, self.path_algorithm, self.agv_manager ) self.is_running = False self.monitor_thread = None self._stop_event = threading.Event() self.stats = { 'poll_count': 0, # 轮询次数 'successful_polls': 0, # 成功轮询次数 'path_generations': 0, # 路径生成次数 'last_poll_time': None, # 最后轮询时间 'last_generation_time': None # 最后生成时间 } self.logger.info(f"路径监控服务初始化完成 - 轮询间隔: {poll_interval}s, 算法: {path_algorithm}") def start_monitoring(self): """启动监控服务""" if self.is_running: self.logger.warning("监控服务已在运行中") return self.is_running = True self._stop_event.clear() self.monitor_thread = threading.Thread( target=self._monitoring_loop, name="PathMonitorThread", daemon=True ) self.monitor_thread.start() self.logger.info("监控服务已启动") def stop_monitoring(self): """停止监控服务""" if not self.is_running: self.logger.warning("监控服务未在运行") return self.is_running = False self._stop_event.set() if self.monitor_thread and self.monitor_thread.is_alive(): self.monitor_thread.join(timeout=5.0) self.logger.info("监控服务已停止") def _monitoring_loop(self): """监控主循环""" self.logger.info("开始监控循环...") while self.is_running and not self._stop_event.is_set(): try: self._perform_monitoring_cycle() if not self._stop_event.wait(self.poll_interval): continue else: break except Exception as e: self.logger.error(f"监控循环异常: {e}") self._stop_event.wait(min(self.poll_interval * 2, 10.0)) self.logger.info("监控循环已结束") def _perform_monitoring_cycle(self): """执行一次监控周期""" start_time = time.time() self.stats['poll_count'] += 1 self.stats['last_poll_time'] = start_time try: self.logger.info(f"开始路径监控周期 #{self.stats['poll_count']}") # 1. 获取当前AGV状态和任务状态 current_agv_status, current_task_status = self._fetch_rcs_status() if not current_agv_status: self.logger.debug("未获取到AGV状态数据,跳过路径生成") return # 2. 直接根据当前状态生成免碰撞路径 self.logger.info(f"[路径生成] 开始为 {len(current_agv_status)} 个AGV生成路径") generated_paths = self._generate_collision_free_paths(current_agv_status) # 3. 发送路径到RCS(根据配置决定) if generated_paths: if self.auto_send_paths: self._send_paths_to_rcs(generated_paths) else: self.logger.debug(f"路径生成完成但未发送到RCS - AGV数量: {len(current_agv_status)}, 路径数: {len(generated_paths)}") self.logger.info(f"路径生成完成 - AGV数量: {len(current_agv_status)}, 路径数: {len(generated_paths)}, 自动发送: {self.auto_send_paths}") else: self.logger.debug("未生成任何路径(可能所有AGV都处于空闲状态)") # 更新成功统计 self.stats['successful_polls'] += 1 self.stats['path_generations'] += 1 self.stats['last_generation_time'] = time.time() generation_time = (time.time() - start_time) * 1000 self.logger.debug(f"监控周期完成 - 耗时: {generation_time:.2f}ms") except Exception as e: self.logger.error(f"监控周期执行失败: {e}") def _generate_unique_seg_id(self, agv_id: str, task_id: str = '') -> str: """生成segId""" try: if task_id: # 有任务时使用task_id return generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2") else: # 无任务时使用时间戳作为目标位置,确保不同时间有不同segId import time time_target = f"IDLE_{int(time.time() / 3600)}" # 按小时分组 return generate_segment_id(agv_id=agv_id, target_position=time_target, action_type="4") except Exception as e: self.logger.error(f"生成segId失败: {e}") # 降级方案:使用common.utils的函数生成备用ID import time fallback_target = f"FALLBACK_{int(time.time())}" return generate_segment_id(agv_id=agv_id, target_position=fallback_target, action_type="1") def _fetch_rcs_status(self) -> tuple[List[AGVStatus], Dict]: """从RCS获取当前状态""" agv_status_list = [] task_status = {} try: # 获取AGV状态 - 使用空参数获取所有AGV状态和任务状态 self.logger.info(" 轮询目标: 使用空参数(agvId=None, mapId=None)获取RCS所有AGV状态和任务状态") agv_response = self.rcs_client.get_agv_status(agv_id=None, map_id=None) if agv_response.code == 200 and agv_response.data: self.logger.info(f"成功获取AGV状态 - 数量: {len(agv_response.data)}") # 统计信息 total_tasks = 0 executing_tasks = 0 loaded_tasks = 0 self.logger.info("=" * 80) self.logger.info("[详细AGV状态信息]") self.logger.info("=" * 80) for i, agv_data in enumerate(agv_response.data, 1): try: # 转换为AGVStatus对象 agv_status = from_dict(AGVStatus, agv_data) agv_status_list.append(agv_status) # 打印AGV基本信息 self.logger.info(f"[AGV #{i}] {agv_status.agvId}") self.logger.info(f" 状态: {agv_status.status} | 位置: {agv_status.position} | 方向: {agv_status.direction}") self.logger.info(f" 电压: {agv_status.vol} | 错误码: {agv_status.error} | 空背篓: {agv_status.empty}") # 分析背篓信息 if agv_status.backpack: self.logger.info(f" [背篓信息] ({len(agv_status.backpack)} 个):") for backpack_item in agv_status.backpack: status_text = "执行中" if backpack_item.execute else ("已装载" if backpack_item.loaded else "空闲") task_info = f"任务: {backpack_item.taskId}" if backpack_item.taskId else "无任务" self.logger.info(f" [位置{backpack_item.index}] {task_info} | " f"状态: {status_text} | 已装载: {backpack_item.loaded} | 执行中: {backpack_item.execute}") # 统计任务 if backpack_item.taskId: total_tasks += 1 if backpack_item.execute: executing_tasks += 1 if backpack_item.loaded: loaded_tasks += 1 # 提取任务状态信息 task_status[backpack_item.taskId] = { 'agvId': agv_status.agvId, 'loaded': backpack_item.loaded, 'execute': backpack_item.execute, 'index': backpack_item.index } else: self.logger.info(f" [背篓信息] 无背篓信息") self.logger.info("-" * 60) except Exception as e: self.logger.warning(f"解析AGV状态数据失败: {agv_data} - {e}") # 打印统计摘要 self.logger.info("[任务统计摘要]") self.logger.info(f" 总AGV数量: {len(agv_status_list)}") self.logger.info(f" 总任务数量: {total_tasks}") self.logger.info(f" 执行中任务: {executing_tasks}") self.logger.info(f" 已装载任务: {loaded_tasks}") self.logger.info(f" 未装载任务: {total_tasks - loaded_tasks}") self.logger.info("=" * 80) else: self.logger.warning(f"获取AGV状态失败: {agv_response.msg if agv_response else '无响应'}") except Exception as e: self.logger.error(f"从RCS获取状态失败: {e}") return agv_status_list, task_status def _generate_collision_free_paths(self, agv_status_list: List[AGVStatus]) -> List[Dict]: """为所有AGV生成免碰撞路径""" try: self.logger.debug(f"开始为 {len(agv_status_list)} 个AGV生成免碰撞路径") # 使用批量路径规划器生成路径 result = self.path_planner.plan_all_agv_paths( agv_status_list=agv_status_list, include_idle_agv=False, # 只为执行任务的AGV生成路径 constraints=None ) planned_paths = result.get('plannedPaths', result.get('planned_paths', [])) # 诊断:显示路径规划结果的所有字段 self.logger.debug(f"路径规划器返回的字段: {list(result.keys())}") self.logger.debug(f"planned_paths字段存在: {'planned_paths' in result}") self.logger.debug(f"plannedPaths字段存在: {'plannedPaths' in result}") self.logger.debug(f"最终获取到的路径数量: {len(planned_paths)}") if planned_paths: self.logger.info(f"成功生成 {len(planned_paths)} 条免碰撞路径") # 记录每个AGV的路径信息 for path_info in planned_paths: agv_id = path_info.get('agvId', 'unknown') code_list = path_info.get('codeList', []) self.logger.debug(f"AGV {agv_id} 路径长度: {len(code_list)}") else: self.logger.info("未生成任何路径(可能所有AGV都处于空闲状态)") return planned_paths except Exception as e: self.logger.error(f"生成免碰撞路径失败: {e}") return [] def _send_paths_to_rcs(self, generated_paths: List[Dict]): """将生成的路径批量发送到RCS系统""" try: self.logger.info(f"开始批量发送 {len(generated_paths)} 条路径到RCS系统") # 构建批量路径数据 agv_paths = [] import uuid import time as time_module for path_info in generated_paths: agv_id = path_info.get('agvId') code_list = path_info.get('codeList', []) if agv_id and code_list: path_task_id = '' for path_code in code_list: if path_code.get('taskId'): path_task_id = path_code.get('taskId') break navigation_data = [] for path_code in code_list: nav_item = { 'code': path_code.get('code', ''), 'direction': path_code.get('direction', '90'), 'type': path_code.get('type') } if path_code.get('taskId'): nav_item['taskId'] = path_code.get('taskId') if path_code.get('posType'): nav_item['posType'] = path_code.get('posType') if path_code.get('lev') is not None: nav_item['lev'] = path_code.get('lev') navigation_data.append(nav_item) auto_seg_id = self._generate_unique_seg_id(agv_id, path_task_id) agv_path_data = { 'agvId': agv_id, 'segId': auto_seg_id, 'codeList': navigation_data } agv_paths.append(agv_path_data) if not agv_paths: self.logger.warning("没有有效的路径数据需要发送") return try: import requests import json rcs_url = f"http://{self.rcs_host}:{self.rcs_port}/api/open/algorithm/zkd/navigation/v1" self.logger.info(f"算法系统发送路径规划结果到RCS:") self.logger.info(f" AGV数量: {len(agv_paths)}") self.logger.info(f" 目标URL: {rcs_url}") successful_sends = 0 for agv_path in agv_paths: agv_id = agv_path['agvId'] seg_id = agv_path['segId'] code_list = agv_path['codeList'] # 构造标准导航请求数据(agvId, segId, codeList格式) navigation_data = { 'agvId': agv_id, 'segId': seg_id, 'codeList': code_list } # 显示路径摘要 start_code = code_list[0].get('code', '') if code_list else '' end_code = code_list[-1].get('code', '') if code_list else '' task_codes = [nc for nc in code_list if nc.get('taskId')] task_id = task_codes[0].get('taskId', '') if task_codes else '' self.logger.info(f" AGV: {agv_id}, 任务: {task_id}, 路径: {start_code} -> {end_code} ({len(code_list)}点)") # 详细显示路径JSON数据 self.logger.info(f"发送路径JSON数据:") self.logger.info(json.dumps(navigation_data, ensure_ascii=False, indent=2)) try: response = requests.post( rcs_url, json=navigation_data, timeout=10, headers={'Content-Type': 'application/json'} ) if response.status_code == 200: response_data = response.json() self.logger.info(f" 成功发送路径到RCS - AGV: {agv_id}, 响应: {response_data.get('msg', 'OK')}") successful_sends += 1 else: self.logger.warning(f" 发送路径到RCS失败 - AGV: {agv_id}, HTTP状态: {response.status_code}") self.logger.warning(f" 响应内容: {response.text}") except Exception as e: self.logger.warning(f" 发送导航指令异常 - AGV: {agv_id}, 错误: {e}") self.logger.info(f" 路径发送完成 - 成功: {successful_sends}/{len(agv_paths)}") except Exception as e: self.logger.error(f" 发送导航指令异常: {e}") except Exception as e: self.logger.error(f"发送路径到RCS异常: {e}") def get_monitoring_status(self) -> Dict[str, Any]: """获取监控服务状态""" return { 'is_running': self.is_running, 'rcs_host': self.rcs_host, 'rcs_port': self.rcs_port, 'poll_interval': self.poll_interval, 'path_algorithm': self.path_algorithm, 'auto_send_paths': self.auto_send_paths, 'stats': self.stats.copy() } def reset_stats(self): """重置统计信息""" self.stats = { 'poll_count': 0, 'successful_polls': 0, 'path_generations': 0, 'last_poll_time': None, 'last_generation_time': None } self.logger.info("监控统计信息已重置")