"""
|
算法系统服务器
|
"""
|
import json
|
import logging
|
import os
|
import time
|
from typing import Dict, List, Optional, Any, Tuple
|
from flask import Flask, request, jsonify
|
|
from common.data_models import (
|
TaskData, AGVStatus, TaskAssignment, PlannedPath,
|
create_success_response, create_error_response, ResponseCode, to_dict, from_dict
|
)
|
from common.utils import load_path_mapping
|
from algorithm_system.models.agv_model import AGVModelManager
|
from algorithm_system.algorithms.task_allocation import TaskAllocationFactory
|
from algorithm_system.algorithms.path_planning import PathPlanningFactory
|
from common.api_client import RCSAPIClient
|
from algorithm_system.path_monitor import PathMonitorService
|
|
try:
|
from config.settings import (
|
ALGORITHM_SERVER_HOST, ALGORITHM_SERVER_PORT,
|
RCS_SERVER_HOST, RCS_SERVER_PORT,
|
MONITOR_POLLING_INTERVAL
|
)
|
except ImportError:
|
ALGORITHM_SERVER_HOST = "10.10.10.239"
|
ALGORITHM_SERVER_PORT = 8002
|
RCS_SERVER_HOST = "10.10.10.156"
|
RCS_SERVER_PORT = 8088
|
MONITOR_POLLING_INTERVAL = 5.0
|
logging.warning("无法从config.settings导入配置,使用默认值")
|
|
|
class AlgorithmServer:
|
"""算法系统服务器"""
|
|
def __init__(self, host: str = None, port: int = None, enable_path_monitor: bool = True,
|
monitor_interval: float = None):
|
"""初始化算法服务器"""
|
self.host = host or ALGORITHM_SERVER_HOST
|
self.port = port or ALGORITHM_SERVER_PORT
|
self.enable_path_monitor = enable_path_monitor
|
self.monitor_interval = monitor_interval or MONITOR_POLLING_INTERVAL
|
self.logger = logging.getLogger(__name__)
|
|
self.app = Flask(__name__)
|
|
self.path_mapping = load_path_mapping()
|
|
self.agv_manager = AGVModelManager(self.path_mapping)
|
|
self.rcs_client = RCSAPIClient()
|
|
self.task_allocation_algorithm = "LOAD_BALANCED" # 默认任务分配
|
self.path_planning_algorithm = "A_STAR" # 默认路径规划
|
|
self.path_monitor = None
|
if self.enable_path_monitor:
|
self.path_monitor = PathMonitorService(
|
rcs_host=RCS_SERVER_HOST,
|
rcs_port=RCS_SERVER_PORT,
|
poll_interval=self.monitor_interval,
|
path_algorithm=self.path_planning_algorithm,
|
auto_send_paths=True # 自动发送路径到RCS
|
)
|
self.logger.info(f"路径监控服务已初始化 - 轮询间隔: {self.monitor_interval}s, 自动发送路径: True")
|
|
self._setup_routes()
|
|
self.logger.info("算法系统服务器初始化完成")
|
|
def _setup_routes(self):
|
"""设置API路由"""
|
|
@self.app.route('/open/task/send/v1', methods=['POST'])
|
def task_send_endpoint():
|
"""任务下发接口"""
|
return self._handle_task_send_request()
|
|
@self.app.route('/open/path/plan/v1', methods=['POST'])
|
def path_plan_endpoint():
|
"""路径规划接口"""
|
return self._handle_path_planning_request()
|
|
@self.app.route('/open/path/batch/plan/v1', methods=['POST'])
|
def batch_path_plan_endpoint():
|
"""批量路径规划接口"""
|
return self._handle_batch_path_planning_request()
|
|
@self.app.route('/open/algorithm/config/v1', methods=['POST'])
|
def algorithm_config_endpoint():
|
"""算法配置接口"""
|
return self._handle_algorithm_config_request()
|
|
@self.app.route('/monitor/path/start/v1', methods=['POST'])
|
def start_path_monitor_endpoint():
|
"""启动路径监控服务接口"""
|
return self._handle_start_path_monitor_request()
|
|
@self.app.route('/monitor/path/stop/v1', methods=['POST'])
|
def stop_path_monitor_endpoint():
|
"""停止路径监控服务接口"""
|
return self._handle_stop_path_monitor_request()
|
|
@self.app.route('/monitor/path/status/v1', methods=['GET'])
|
def path_monitor_status_endpoint():
|
"""路径监控服务状态查询接口"""
|
return self._handle_path_monitor_status_request()
|
|
@self.app.route('/health', methods=['GET'])
|
def health_check():
|
"""健康检查接口"""
|
monitor_status = None
|
if self.path_monitor:
|
monitor_status = self.path_monitor.get_monitoring_status()
|
|
return jsonify({
|
"status": "healthy",
|
"service": "algorithm_system",
|
"timestamp": time.time(),
|
"path_mapping_loaded": len(self.path_mapping) > 0,
|
"algorithms": {
|
"task_allocation": self.task_allocation_algorithm,
|
"path_planning": self.path_planning_algorithm
|
},
|
"agv_count": len(self.agv_manager.get_all_agvs()),
|
"path_monitor": monitor_status,
|
"available_endpoints": [
|
"POST /open/task/send/v1 - 任务分配接口",
|
"POST /open/path/plan/v1 - 单路径规划接口",
|
"POST /open/path/batch/plan/v1 - 批量路径规划接口",
|
"POST /open/algorithm/config/v1 - 算法配置接口",
|
"POST /monitor/path/start/v1 - 启动路径监控服务",
|
"POST /monitor/path/stop/v1 - 停止路径监控服务",
|
"GET /monitor/path/status/v1 - 路径监控服务状态查询",
|
"GET /health - 健康检查接口"
|
]
|
})
|
|
@self.app.errorhandler(404)
|
def not_found(error):
|
"""404错误处理"""
|
return jsonify(to_dict(create_error_response(404, "接口不存在"))), 404
|
|
@self.app.errorhandler(500)
|
def internal_error(error):
|
"""500错误处理"""
|
self.logger.error(f"内部服务器错误: {error}")
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "内部服务器错误"))), 500
|
|
def _handle_task_send_request(self) -> Dict[str, Any]:
|
"""处理任务下发请求"""
|
try:
|
if not request.is_json:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误")))
|
|
request_data = request.get_json()
|
if not request_data:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据为空")))
|
|
if isinstance(request_data, dict) and "tasks" in request_data:
|
task_data = request_data.get("tasks", [])
|
agv_status_data = request_data.get("agvStatus", [])
|
self.logger.info(f"收到结构化任务分配请求,任务数量: {len(task_data)}, AGV数量: {len(agv_status_data)}")
|
elif isinstance(request_data, list):
|
task_data = request_data
|
agv_status_data = []
|
self.logger.info(f"收到任务分配请求,任务数量: {len(task_data)}")
|
else:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "任务数据格式无效")))
|
|
if not task_data or not isinstance(task_data, list):
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "任务数据无效")))
|
|
tasks = []
|
for task_dict in task_data:
|
try:
|
task = TaskData(
|
taskId=task_dict.get("taskId", ""),
|
start=task_dict.get("start", ""),
|
end=task_dict.get("end", ""),
|
type=task_dict.get("type", "1"),
|
priority=task_dict.get("priority", 5)
|
)
|
tasks.append(task)
|
except Exception as e:
|
self.logger.warning(f"解析任务数据失败: {task_dict} - {e}")
|
continue
|
|
if not tasks:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "没有有效的任务数据")))
|
|
agv_status_list = []
|
if agv_status_data:
|
self.logger.info("使用请求中提供的AGV状态数据")
|
for agv_data in agv_status_data:
|
try:
|
agv_status = from_dict(AGVStatus, agv_data)
|
agv_status_list.append(agv_status)
|
except Exception as e:
|
self.logger.warning(f"解析请求中的AGV状态数据失败: {agv_data} - {e}")
|
continue
|
else:
|
self.logger.info("从RCS系统获取AGV状态数据")
|
agv_response = self.rcs_client.get_agv_status()
|
if agv_response.code == ResponseCode.SUCCESS and agv_response.data:
|
for agv_data in agv_response.data:
|
try:
|
agv_status = from_dict(AGVStatus, agv_data)
|
agv_status_list.append(agv_status)
|
except Exception as e:
|
self.logger.warning(f"解析RCS返回的AGV状态数据失败: {agv_data} - {e}")
|
continue
|
else:
|
error_msg = f"无法获取AGV状态: {agv_response.msg if agv_response else 'RCS系统连接失败'}"
|
self.logger.error(error_msg)
|
|
use_fallback_allocation = getattr(self, 'use_fallback_allocation', True)
|
|
if use_fallback_allocation:
|
self.logger.warning("使用轮询分配作为备用方案")
|
return self._simple_round_robin_allocation(tasks)
|
else:
|
return jsonify(to_dict(create_error_response(
|
ResponseCode.SERVER_ERROR,
|
error_msg
|
)))
|
|
if not agv_status_list:
|
return jsonify(to_dict(create_error_response(ResponseCode.NO_DATA, "没有可用的AGV状态数据")))
|
|
self.agv_manager.update_agv_data(agv_status_list)
|
self.logger.info(f"更新了 {len(agv_status_list)} 个AGV状态")
|
|
all_agvs = self.agv_manager.get_all_agvs()
|
self.logger.info(f"算法系统中当前有 {len(all_agvs)} 个AGV模型")
|
|
available_count = 0
|
for agv in all_agvs:
|
can_accept = agv.can_accept_task(5)
|
self.logger.debug(f"AGV {agv.agvId}: status={agv.status}, can_accept_task={can_accept}, "
|
f"is_overloaded={agv.is_overloaded()}, task_count={agv.current_task_count}")
|
if can_accept:
|
available_count += 1
|
|
self.logger.info(f"其中 {available_count} 个AGV可以接受任务")
|
|
if available_count == 0:
|
self.logger.warning("没有AGV可以接受任务!详细状态:")
|
for agv in all_agvs:
|
status_str = str(agv.status)
|
self.logger.warning(f" AGV {agv.agvId}: 状态={status_str}, 类型={type(agv.status)}, "
|
f"任务数={agv.current_task_count}/{agv.max_capacity}")
|
|
allocator = TaskAllocationFactory.create_allocator(
|
self.task_allocation_algorithm,
|
self.agv_manager
|
)
|
|
start_time = time.time()
|
assignments = allocator.allocate_tasks(tasks)
|
end_time = time.time()
|
|
allocation_time = (end_time - start_time) * 1000 # 转换为毫秒
|
|
assignment_data = [to_dict(assignment) for assignment in assignments]
|
|
self.logger.info(f"任务分配完成,分配了 {len(assignments)} 个任务,耗时: {allocation_time:.2f}ms")
|
|
response = create_success_response(assignment_data)
|
return jsonify(to_dict(response))
|
|
except Exception as e:
|
self.logger.error(f"处理任务分配请求失败: {e}", exc_info=True)
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}")))
|
|
def _handle_path_planning_request(self) -> Dict[str, Any]:
|
"""处理路径规划请求"""
|
try:
|
# 验证请求数据
|
if not request.is_json:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误")))
|
|
request_data = request.get_json()
|
if not request_data:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据为空")))
|
|
agv_id = request_data.get("agvId")
|
start_code = request_data.get("start")
|
end_code = request_data.get("end")
|
constraints = request_data.get("constraints", [])
|
|
if not agv_id or not start_code or not end_code:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "缺少必要参数")))
|
|
self.logger.info(f"收到路径规划请求 - AGV: {agv_id}, 从 {start_code} 到 {end_code}")
|
|
# 创建路径规划器
|
path_planner = PathPlanningFactory.create_path_planner(
|
self.path_planning_algorithm,
|
self.path_mapping
|
)
|
|
# 执行路径规划
|
start_time = time.time()
|
planned_path = path_planner.plan_path(start_code, end_code, constraints)
|
end_time = time.time()
|
|
planning_time = (end_time - start_time) * 1000 # 转换为毫秒
|
|
if planned_path:
|
# 设置AGV ID
|
planned_path.agvId = agv_id
|
|
# 转换为响应格式
|
path_data = to_dict(planned_path)
|
|
self.logger.info(f"路径规划完成 - AGV: {agv_id}, 路径长度: {len(planned_path.codeList)}, 耗时: {planning_time:.2f}ms")
|
|
response = create_success_response(path_data)
|
return jsonify(to_dict(response))
|
else:
|
self.logger.warning(f"路径规划失败 - AGV: {agv_id}, 从 {start_code} 到 {end_code}")
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "无法规划路径")))
|
|
except Exception as e:
|
self.logger.error(f"处理路径规划请求失败: {e}", exc_info=True)
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}")))
|
|
def _handle_batch_path_planning_request(self) -> Dict[str, Any]:
|
"""处理批量路径规划请求"""
|
try:
|
if not request.is_json:
|
request_data = {}
|
else:
|
request_data = request.get_json() or {}
|
|
constraints = request_data.get("constraints", [])
|
include_idle_agv = request_data.get("includeIdleAgv", False)
|
use_enhanced_planning = request_data.get("useEnhancedPlanning", True)
|
|
agv_status_from_request = (
|
request_data.get("agvStatusList", []) or
|
request_data.get("agvs", [])
|
)
|
|
self.logger.info(f"收到批量路径规划请求,包含空闲AGV: {include_idle_agv}, "
|
f"请求中AGV数量: {len(agv_status_from_request)}, "
|
f"使用增强规划: {use_enhanced_planning}")
|
|
agv_status_list = []
|
if agv_status_from_request:
|
self.logger.info("使用请求中提供的AGV状态数据进行路径规划")
|
for agv_data in agv_status_from_request:
|
try:
|
agv_status = from_dict(AGVStatus, agv_data)
|
agv_status_list.append(agv_status)
|
except Exception as e:
|
self.logger.warning(f"解析请求中的AGV状态数据失败: {agv_data} - {e}")
|
continue
|
else:
|
self.logger.info("从RCS系统获取AGV状态数据进行路径规划")
|
agv_response = self.rcs_client.get_agv_status()
|
if agv_response.code != ResponseCode.SUCCESS or not agv_response.data:
|
error_msg = f"无法获取AGV状态进行路径规划: {agv_response.msg if agv_response else 'RCS系统连接失败'}"
|
self.logger.error(error_msg)
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, error_msg)))
|
|
for agv_data in agv_response.data:
|
try:
|
agv_status = from_dict(AGVStatus, agv_data)
|
agv_status_list.append(agv_status)
|
except Exception as e:
|
self.logger.warning(f"解析RCS返回的AGV状态数据失败: {agv_data} - {e}")
|
continue
|
|
if not agv_status_list:
|
error_msg = "没有可用的AGV状态数据进行路径规划"
|
self.logger.error(error_msg)
|
return jsonify(to_dict(create_error_response(ResponseCode.NO_DATA, error_msg)))
|
|
# 更新AGV模型管理器
|
self.agv_manager.update_agv_data(agv_status_list)
|
|
self.logger.info(f"强制使用路径规划以确保包含新字段")
|
result = self._enhanced_batch_path_planning(agv_status_list, include_idle_agv, constraints)
|
|
response = create_success_response(result)
|
return jsonify(to_dict(response))
|
|
except Exception as e:
|
self.logger.error(f"处理批量路径规划请求失败: {e}", exc_info=True)
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}")))
|
|
def _enhanced_batch_path_planning(self, agv_status_list: List[AGVStatus],
|
include_idle_agv: bool,
|
constraints: List[Tuple[int, int, float]]) -> Dict[str, Any]:
|
"""批量路径规划"""
|
from algorithm_system.algorithms.path_planning import PathPlanningFactory
|
|
# 创建批量路径规划器
|
batch_planner = PathPlanningFactory.create_batch_path_planner(
|
algorithm_type=self.path_planning_algorithm,
|
path_mapping=self.path_mapping
|
)
|
|
# 执行批量路径规划
|
result = batch_planner.plan_all_agv_paths(
|
agv_status_list=agv_status_list,
|
include_idle_agv=include_idle_agv,
|
constraints=constraints
|
)
|
|
# 提取plannedPaths作为最终返回数据
|
planned_paths = result.get('plannedPaths', [])
|
|
self.logger.info(f"批量路径规划完成 - 总AGV: {result['totalAgvs']}, "
|
f"执行任务: {result['executingTasksCount']}, "
|
f"规划路径: {result['plannedPathsCount']}, "
|
f"冲突检测: {result['conflictsDetected']}, "
|
f"总耗时: {result['totalPlanningTime']:.2f}ms")
|
|
# 只返回路径数组,不包含其他统计信息
|
return planned_paths
|
|
def _handle_algorithm_config_request(self) -> Dict[str, Any]:
|
"""
|
处理算法配置请求
|
|
Returns:
|
Dict: 响应数据
|
"""
|
try:
|
# 验证请求数据
|
if not request.is_json:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误")))
|
|
config_data = request.get_json()
|
if not config_data:
|
return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "配置数据为空")))
|
|
# 更新算法配置
|
if "task_allocation_algorithm" in config_data:
|
self.task_allocation_algorithm = config_data["task_allocation_algorithm"]
|
self.logger.info(f"任务分配算法更新为: {self.task_allocation_algorithm}")
|
|
if "path_planning_algorithm" in config_data:
|
self.path_planning_algorithm = config_data["path_planning_algorithm"]
|
self.logger.info(f"路径规划算法更新为: {self.path_planning_algorithm}")
|
|
# 返回当前配置
|
current_config = {
|
"task_allocation_algorithm": self.task_allocation_algorithm,
|
"path_planning_algorithm": self.path_planning_algorithm
|
}
|
|
response = create_success_response(current_config)
|
return jsonify(to_dict(response))
|
|
except Exception as e:
|
self.logger.error(f"处理算法配置请求失败: {e}", exc_info=True)
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}")))
|
|
def _simple_round_robin_allocation(self, tasks: List[TaskData]) -> Dict[str, Any]:
|
"""简单的轮询分配"""
|
assignments = []
|
|
# 尝试从RCS系统获取AGV数量
|
agv_count = 5 # 默认值
|
try:
|
agv_response = self.rcs_client.get_agv_status()
|
if agv_response.code == ResponseCode.SUCCESS and agv_response.data:
|
agv_count = len(agv_response.data)
|
self.logger.info(f"从RCS系统获取到AGV数量: {agv_count}")
|
else:
|
# 如果RCS返回无数据,使用配置的默认值
|
from config.settings import DEFAULT_AGV_COUNT
|
agv_count = DEFAULT_AGV_COUNT
|
self.logger.warning(f"无法从RCS获取AGV数量,使用默认值: {agv_count}")
|
except Exception as e:
|
# 如果完全无法连接RCS,使用配置的默认值
|
from config.settings import DEFAULT_AGV_COUNT
|
agv_count = DEFAULT_AGV_COUNT
|
self.logger.error(f"获取AGV数量失败,使用默认值: {agv_count} - {e}")
|
|
for i, task in enumerate(tasks):
|
agv_id = f"AGV_{10001 + (i % agv_count)}"
|
assignment = TaskAssignment(
|
taskId=task.taskId,
|
agvId=agv_id
|
)
|
assignments.append(assignment)
|
|
assignment_data = [to_dict(assignment) for assignment in assignments]
|
|
self.logger.info(f"使用简单轮询分配了 {len(assignments)} 个任务,AGV数量: {agv_count}")
|
|
response = create_success_response(assignment_data)
|
return jsonify(to_dict(response))
|
|
def _handle_start_path_monitor_request(self) -> Dict[str, Any]:
|
"""处理启动路径监控服务请求"""
|
try:
|
if not self.path_monitor:
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化")))
|
|
if self.path_monitor.is_running:
|
return jsonify(to_dict(create_success_response({"message": "路径监控服务已在运行中"})))
|
|
# 启动路径监控服务
|
self.path_monitor.start_monitoring()
|
|
self.logger.info("路径监控服务已通过API启动")
|
|
response = create_success_response({
|
"message": "路径监控服务启动成功",
|
"monitor_status": self.path_monitor.get_monitoring_status()
|
})
|
|
return jsonify(to_dict(response))
|
|
except Exception as e:
|
self.logger.error(f"启动路径监控服务失败: {e}")
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"启动路径监控服务失败: {str(e)}")))
|
|
def _handle_stop_path_monitor_request(self) -> Dict[str, Any]:
|
"""处理停止路径监控服务请求"""
|
try:
|
if not self.path_monitor:
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化")))
|
|
if not self.path_monitor.is_running:
|
return jsonify(to_dict(create_success_response({"message": "路径监控服务未在运行"})))
|
|
# 停止路径监控服务
|
self.path_monitor.stop_monitoring()
|
|
self.logger.info("路径监控服务已通过API停止")
|
|
response = create_success_response({
|
"message": "路径监控服务停止成功",
|
"monitor_status": self.path_monitor.get_monitoring_status()
|
})
|
|
return jsonify(to_dict(response))
|
|
except Exception as e:
|
self.logger.error(f"停止路径监控服务失败: {e}")
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"停止路径监控服务失败: {str(e)}")))
|
|
def _handle_path_monitor_status_request(self) -> Dict[str, Any]:
|
"""处理路径监控服务状态查询请求"""
|
try:
|
if not self.path_monitor:
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化")))
|
|
monitor_status = self.path_monitor.get_monitoring_status()
|
|
response = create_success_response({
|
"monitor_status": monitor_status,
|
"description": "路径监控服务状态信息"
|
})
|
|
return jsonify(to_dict(response))
|
|
except Exception as e:
|
self.logger.error(f"获取路径监控服务状态失败: {e}")
|
return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"获取路径监控服务状态失败: {str(e)}")))
|
|
def start_server(self):
|
"""启动算法服务器"""
|
self.logger.info(f"启动算法系统服务器: http://{self.host}:{self.port}")
|
|
try:
|
# 如果启用了路径监控服务,自动启动
|
if self.path_monitor and self.enable_path_monitor:
|
self.path_monitor.start_monitoring()
|
self.logger.info("路径监控服务已自动启动")
|
|
self.app.run(
|
host=self.host,
|
port=self.port,
|
debug=False,
|
threaded=True
|
)
|
except Exception as e:
|
self.logger.error(f"启动算法服务器失败: {e}")
|
# 确保在启动失败时停止路径监控服务
|
if self.path_monitor and self.path_monitor.is_running:
|
self.path_monitor.stop_monitoring()
|
raise
|
|
def stop_server(self):
|
"""停止算法服务器"""
|
self.logger.info("停止算法服务器")
|
|
# 停止路径监控服务
|
if self.path_monitor and self.path_monitor.is_running:
|
self.path_monitor.stop_monitoring()
|
self.logger.info("路径监控服务已停止")
|
|
|
def get_server_status(self) -> Dict[str, Any]:
|
"""获取服务器状态"""
|
monitor_status = None
|
if self.path_monitor:
|
monitor_status = self.path_monitor.get_monitoring_status()
|
|
return {
|
"host": self.host,
|
"port": self.port,
|
"path_mapping_loaded": len(self.path_mapping) > 0,
|
"agv_count": len(self.agv_manager.get_all_agvs()),
|
"algorithms": {
|
"task_allocation": self.task_allocation_algorithm,
|
"path_planning": self.path_planning_algorithm
|
},
|
"path_monitor": monitor_status,
|
"timestamp": time.time()
|
}
|