"""
|
监控RCS系统状态并生成路径
|
"""
|
import time
|
import threading
|
import logging
|
from typing import Dict, List, Optional, Any
|
import queue
|
|
from common.data_models import AGVStatus, PlannedPath, from_dict
|
from common.api_client import RCSAPIClient
|
from algorithm_system.algorithms.path_planning import BatchPathPlanner
|
from algorithm_system.models.agv_model import AGVModelManager
|
from common.utils import load_path_mapping, generate_segment_id
|
|
try:
|
from config.settings import RCS_SERVER_HOST, RCS_SERVER_PORT, MONITOR_POLLING_INTERVAL
|
except ImportError:
|
RCS_SERVER_HOST = "10.10.10.156"
|
RCS_SERVER_PORT = 8088
|
MONITOR_POLLING_INTERVAL = 5.0
|
logging.warning("无法从config.settings导入配置,使用默认值")
|
|
|
class PathMonitorService:
|
"""监控RCS状态并生成路径"""
|
|
def __init__(self,
|
rcs_host: str = None,
|
rcs_port: int = None,
|
poll_interval: float = None,
|
path_algorithm: str = "A_STAR",
|
auto_send_paths: bool = True):
|
#初始化路径监控服务
|
self.logger = logging.getLogger(__name__)
|
|
self.rcs_host = rcs_host or RCS_SERVER_HOST
|
self.rcs_port = rcs_port or RCS_SERVER_PORT
|
self.poll_interval = poll_interval or MONITOR_POLLING_INTERVAL
|
self.path_algorithm = path_algorithm
|
self.auto_send_paths = auto_send_paths
|
|
self.rcs_client = RCSAPIClient(self.rcs_host, self.rcs_port, timeout=10)
|
self.path_mapping = load_path_mapping()
|
self.agv_manager = AGVModelManager(self.path_mapping)
|
|
self.path_planner = BatchPathPlanner(
|
self.path_mapping,
|
self.path_algorithm,
|
self.agv_manager
|
)
|
|
self.is_running = False
|
self.monitor_thread = None
|
self._stop_event = threading.Event()
|
|
self.stats = {
|
'poll_count': 0, # 轮询次数
|
'successful_polls': 0, # 成功轮询次数
|
'path_generations': 0, # 路径生成次数
|
'last_poll_time': None, # 最后轮询时间
|
'last_generation_time': None # 最后生成时间
|
}
|
|
self.logger.info(f"路径监控服务初始化完成 - 轮询间隔: {poll_interval}s, 算法: {path_algorithm}")
|
|
def start_monitoring(self):
|
"""启动监控服务"""
|
if self.is_running:
|
self.logger.warning("监控服务已在运行中")
|
return
|
|
self.is_running = True
|
self._stop_event.clear()
|
|
self.monitor_thread = threading.Thread(
|
target=self._monitoring_loop,
|
name="PathMonitorThread",
|
daemon=True
|
)
|
self.monitor_thread.start()
|
|
self.logger.info("监控服务已启动")
|
|
def stop_monitoring(self):
|
"""停止监控服务"""
|
if not self.is_running:
|
self.logger.warning("监控服务未在运行")
|
return
|
|
self.is_running = False
|
self._stop_event.set()
|
|
if self.monitor_thread and self.monitor_thread.is_alive():
|
self.monitor_thread.join(timeout=5.0)
|
|
self.logger.info("监控服务已停止")
|
|
def _monitoring_loop(self):
|
"""监控主循环"""
|
self.logger.info("开始监控循环...")
|
|
while self.is_running and not self._stop_event.is_set():
|
try:
|
self._perform_monitoring_cycle()
|
|
if not self._stop_event.wait(self.poll_interval):
|
continue
|
else:
|
break
|
|
except Exception as e:
|
self.logger.error(f"监控循环异常: {e}")
|
self._stop_event.wait(min(self.poll_interval * 2, 10.0))
|
|
self.logger.info("监控循环已结束")
|
|
def _perform_monitoring_cycle(self):
|
"""执行一次监控周期"""
|
start_time = time.time()
|
|
self.stats['poll_count'] += 1
|
self.stats['last_poll_time'] = start_time
|
|
try:
|
self.logger.info(f"开始路径监控周期 #{self.stats['poll_count']}")
|
|
# 1. 获取当前AGV状态和任务状态
|
current_agv_status, current_task_status = self._fetch_rcs_status()
|
|
if not current_agv_status:
|
self.logger.debug("未获取到AGV状态数据,跳过路径生成")
|
return
|
|
# 2. 直接根据当前状态生成免碰撞路径
|
self.logger.info(f"[路径生成] 开始为 {len(current_agv_status)} 个AGV生成路径")
|
|
generated_paths = self._generate_collision_free_paths(current_agv_status)
|
|
# 3. 发送路径到RCS(根据配置决定)
|
if generated_paths:
|
if self.auto_send_paths:
|
self._send_paths_to_rcs(generated_paths)
|
else:
|
self.logger.debug(f"路径生成完成但未发送到RCS - AGV数量: {len(current_agv_status)}, 路径数: {len(generated_paths)}")
|
|
self.logger.info(f"路径生成完成 - AGV数量: {len(current_agv_status)}, 路径数: {len(generated_paths)}, 自动发送: {self.auto_send_paths}")
|
else:
|
self.logger.debug("未生成任何路径(可能所有AGV都处于空闲状态)")
|
|
# 更新成功统计
|
self.stats['successful_polls'] += 1
|
self.stats['path_generations'] += 1
|
self.stats['last_generation_time'] = time.time()
|
|
generation_time = (time.time() - start_time) * 1000
|
self.logger.debug(f"监控周期完成 - 耗时: {generation_time:.2f}ms")
|
|
except Exception as e:
|
self.logger.error(f"监控周期执行失败: {e}")
|
|
def _generate_unique_seg_id(self, agv_id: str, task_id: str = '') -> str:
|
"""生成segId"""
|
try:
|
if task_id:
|
# 有任务时使用task_id
|
return generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2")
|
else:
|
# 无任务时使用时间戳作为目标位置,确保不同时间有不同segId
|
import time
|
time_target = f"IDLE_{int(time.time() / 3600)}" # 按小时分组
|
return generate_segment_id(agv_id=agv_id, target_position=time_target, action_type="4")
|
|
except Exception as e:
|
self.logger.error(f"生成segId失败: {e}")
|
# 降级方案:使用common.utils的函数生成备用ID
|
import time
|
fallback_target = f"FALLBACK_{int(time.time())}"
|
return generate_segment_id(agv_id=agv_id, target_position=fallback_target, action_type="1")
|
|
def _fetch_rcs_status(self) -> tuple[List[AGVStatus], Dict]:
|
"""从RCS获取当前状态"""
|
agv_status_list = []
|
task_status = {}
|
|
try:
|
# 获取AGV状态 - 使用空参数获取所有AGV状态和任务状态
|
self.logger.info(" 轮询目标: 使用空参数(agvId=None, mapId=None)获取RCS所有AGV状态和任务状态")
|
agv_response = self.rcs_client.get_agv_status(agv_id=None, map_id=None)
|
|
if agv_response.code == 200 and agv_response.data:
|
self.logger.info(f"成功获取AGV状态 - 数量: {len(agv_response.data)}")
|
|
# 统计信息
|
total_tasks = 0
|
executing_tasks = 0
|
loaded_tasks = 0
|
|
self.logger.info("=" * 80)
|
self.logger.info("[详细AGV状态信息]")
|
self.logger.info("=" * 80)
|
|
for i, agv_data in enumerate(agv_response.data, 1):
|
try:
|
# 转换为AGVStatus对象
|
agv_status = from_dict(AGVStatus, agv_data)
|
agv_status_list.append(agv_status)
|
|
# 打印AGV基本信息
|
self.logger.info(f"[AGV #{i}] {agv_status.agvId}")
|
self.logger.info(f" 状态: {agv_status.status} | 位置: {agv_status.position} | 方向: {agv_status.direction}")
|
self.logger.info(f" 电压: {agv_status.vol} | 错误码: {agv_status.error} | 空背篓: {agv_status.empty}")
|
|
# 分析背篓信息
|
if agv_status.backpack:
|
self.logger.info(f" [背篓信息] ({len(agv_status.backpack)} 个):")
|
for backpack_item in agv_status.backpack:
|
status_text = "执行中" if backpack_item.execute else ("已装载" if backpack_item.loaded else "空闲")
|
task_info = f"任务: {backpack_item.taskId}" if backpack_item.taskId else "无任务"
|
|
self.logger.info(f" [位置{backpack_item.index}] {task_info} | "
|
f"状态: {status_text} | 已装载: {backpack_item.loaded} | 执行中: {backpack_item.execute}")
|
|
# 统计任务
|
if backpack_item.taskId:
|
total_tasks += 1
|
if backpack_item.execute:
|
executing_tasks += 1
|
if backpack_item.loaded:
|
loaded_tasks += 1
|
|
# 提取任务状态信息
|
task_status[backpack_item.taskId] = {
|
'agvId': agv_status.agvId,
|
'loaded': backpack_item.loaded,
|
'execute': backpack_item.execute,
|
'index': backpack_item.index
|
}
|
else:
|
self.logger.info(f" [背篓信息] 无背篓信息")
|
|
self.logger.info("-" * 60)
|
except Exception as e:
|
self.logger.warning(f"解析AGV状态数据失败: {agv_data} - {e}")
|
|
# 打印统计摘要
|
self.logger.info("[任务统计摘要]")
|
self.logger.info(f" 总AGV数量: {len(agv_status_list)}")
|
self.logger.info(f" 总任务数量: {total_tasks}")
|
self.logger.info(f" 执行中任务: {executing_tasks}")
|
self.logger.info(f" 已装载任务: {loaded_tasks}")
|
self.logger.info(f" 未装载任务: {total_tasks - loaded_tasks}")
|
self.logger.info("=" * 80)
|
|
else:
|
self.logger.warning(f"获取AGV状态失败: {agv_response.msg if agv_response else '无响应'}")
|
|
except Exception as e:
|
self.logger.error(f"从RCS获取状态失败: {e}")
|
|
return agv_status_list, task_status
|
|
|
|
def _generate_collision_free_paths(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
|
"""为所有AGV生成免碰撞路径"""
|
try:
|
self.logger.debug(f"开始为 {len(agv_status_list)} 个AGV生成免碰撞路径")
|
|
# 使用批量路径规划器生成路径
|
result = self.path_planner.plan_all_agv_paths(
|
agv_status_list=agv_status_list,
|
include_idle_agv=False, # 只为执行任务的AGV生成路径
|
constraints=None
|
)
|
|
planned_paths = result.get('plannedPaths', result.get('planned_paths', []))
|
|
# 诊断:显示路径规划结果的所有字段
|
self.logger.debug(f"路径规划器返回的字段: {list(result.keys())}")
|
self.logger.debug(f"planned_paths字段存在: {'planned_paths' in result}")
|
self.logger.debug(f"plannedPaths字段存在: {'plannedPaths' in result}")
|
self.logger.debug(f"最终获取到的路径数量: {len(planned_paths)}")
|
|
if planned_paths:
|
self.logger.info(f"成功生成 {len(planned_paths)} 条免碰撞路径")
|
|
# 记录每个AGV的路径信息
|
for path_info in planned_paths:
|
agv_id = path_info.get('agvId', 'unknown')
|
code_list = path_info.get('codeList', [])
|
self.logger.debug(f"AGV {agv_id} 路径长度: {len(code_list)}")
|
else:
|
self.logger.info("未生成任何路径(可能所有AGV都处于空闲状态)")
|
|
return planned_paths
|
|
except Exception as e:
|
self.logger.error(f"生成免碰撞路径失败: {e}")
|
return []
|
|
|
|
def _send_paths_to_rcs(self, generated_paths: List[Dict]):
|
"""将生成的路径批量发送到RCS系统"""
|
try:
|
self.logger.info(f"开始批量发送 {len(generated_paths)} 条路径到RCS系统")
|
|
# 构建批量路径数据
|
agv_paths = []
|
import uuid
|
import time as time_module
|
|
for path_info in generated_paths:
|
agv_id = path_info.get('agvId')
|
code_list = path_info.get('codeList', [])
|
|
if agv_id and code_list:
|
path_task_id = ''
|
for path_code in code_list:
|
if path_code.get('taskId'):
|
path_task_id = path_code.get('taskId')
|
break
|
|
navigation_data = []
|
for path_code in code_list:
|
nav_item = {
|
'code': path_code.get('code', ''),
|
'direction': path_code.get('direction', '90'),
|
'type': path_code.get('type')
|
}
|
|
if path_code.get('taskId'):
|
nav_item['taskId'] = path_code.get('taskId')
|
|
if path_code.get('posType'):
|
nav_item['posType'] = path_code.get('posType')
|
|
if path_code.get('lev') is not None:
|
nav_item['lev'] = path_code.get('lev')
|
|
navigation_data.append(nav_item)
|
|
auto_seg_id = self._generate_unique_seg_id(agv_id, path_task_id)
|
|
agv_path_data = {
|
'agvId': agv_id,
|
'segId': auto_seg_id,
|
'codeList': navigation_data
|
}
|
|
agv_paths.append(agv_path_data)
|
|
if not agv_paths:
|
self.logger.warning("没有有效的路径数据需要发送")
|
return
|
|
try:
|
import requests
|
import json
|
|
rcs_url = f"http://{self.rcs_host}:{self.rcs_port}/api/open/algorithm/zkd/navigation/v1"
|
|
self.logger.info(f"算法系统发送路径规划结果到RCS:")
|
self.logger.info(f" AGV数量: {len(agv_paths)}")
|
self.logger.info(f" 目标URL: {rcs_url}")
|
|
successful_sends = 0
|
|
for agv_path in agv_paths:
|
agv_id = agv_path['agvId']
|
seg_id = agv_path['segId']
|
code_list = agv_path['codeList']
|
|
# 构造标准导航请求数据(agvId, segId, codeList格式)
|
navigation_data = {
|
'agvId': agv_id,
|
'segId': seg_id,
|
'codeList': code_list
|
}
|
|
# 显示路径摘要
|
start_code = code_list[0].get('code', '') if code_list else ''
|
end_code = code_list[-1].get('code', '') if code_list else ''
|
task_codes = [nc for nc in code_list if nc.get('taskId')]
|
task_id = task_codes[0].get('taskId', '') if task_codes else ''
|
|
self.logger.info(f" AGV: {agv_id}, 任务: {task_id}, 路径: {start_code} -> {end_code} ({len(code_list)}点)")
|
|
# 详细显示路径JSON数据
|
self.logger.info(f"发送路径JSON数据:")
|
self.logger.info(json.dumps(navigation_data, ensure_ascii=False, indent=2))
|
|
try:
|
response = requests.post(
|
rcs_url,
|
json=navigation_data,
|
timeout=10,
|
headers={'Content-Type': 'application/json'}
|
)
|
|
if response.status_code == 200:
|
response_data = response.json()
|
self.logger.info(f" 成功发送路径到RCS - AGV: {agv_id}, 响应: {response_data.get('msg', 'OK')}")
|
successful_sends += 1
|
else:
|
self.logger.warning(f" 发送路径到RCS失败 - AGV: {agv_id}, HTTP状态: {response.status_code}")
|
self.logger.warning(f" 响应内容: {response.text}")
|
|
except Exception as e:
|
self.logger.warning(f" 发送导航指令异常 - AGV: {agv_id}, 错误: {e}")
|
|
self.logger.info(f" 路径发送完成 - 成功: {successful_sends}/{len(agv_paths)}")
|
|
except Exception as e:
|
self.logger.error(f" 发送导航指令异常: {e}")
|
|
except Exception as e:
|
self.logger.error(f"发送路径到RCS异常: {e}")
|
|
def get_monitoring_status(self) -> Dict[str, Any]:
|
"""获取监控服务状态"""
|
return {
|
'is_running': self.is_running,
|
'rcs_host': self.rcs_host,
|
'rcs_port': self.rcs_port,
|
'poll_interval': self.poll_interval,
|
'path_algorithm': self.path_algorithm,
|
'auto_send_paths': self.auto_send_paths,
|
'stats': self.stats.copy()
|
}
|
|
def reset_stats(self):
|
"""重置统计信息"""
|
self.stats = {
|
'poll_count': 0,
|
'successful_polls': 0,
|
'path_generations': 0,
|
'last_poll_time': None,
|
'last_generation_time': None
|
}
|
self.logger.info("监控统计信息已重置")
|