""" 算法系统服务器 """ import json import logging import os import time from typing import Dict, List, Optional, Any, Tuple from flask import Flask, request, jsonify from common.data_models import ( TaskData, AGVStatus, TaskAssignment, PlannedPath, create_success_response, create_error_response, ResponseCode, to_dict, from_dict ) from common.utils import load_path_mapping from algorithm_system.models.agv_model import AGVModelManager from algorithm_system.algorithms.task_allocation import TaskAllocationFactory from algorithm_system.algorithms.path_planning import PathPlanningFactory from common.api_client import RCSAPIClient from algorithm_system.path_monitor import PathMonitorService try: from config.settings import ( ALGORITHM_SERVER_HOST, ALGORITHM_SERVER_PORT, RCS_SERVER_HOST, RCS_SERVER_PORT, MONITOR_POLLING_INTERVAL ) except ImportError: ALGORITHM_SERVER_HOST = "10.10.10.239" ALGORITHM_SERVER_PORT = 8002 RCS_SERVER_HOST = "10.10.10.156" RCS_SERVER_PORT = 8088 MONITOR_POLLING_INTERVAL = 5.0 logging.warning("无法从config.settings导入配置,使用默认值") class AlgorithmServer: """算法系统服务器""" def __init__(self, host: str = None, port: int = None, enable_path_monitor: bool = True, monitor_interval: float = None): """初始化算法服务器""" self.host = host or ALGORITHM_SERVER_HOST self.port = port or ALGORITHM_SERVER_PORT self.enable_path_monitor = enable_path_monitor self.monitor_interval = monitor_interval or MONITOR_POLLING_INTERVAL self.logger = logging.getLogger(__name__) self.app = Flask(__name__) self.path_mapping = load_path_mapping() self.agv_manager = AGVModelManager(self.path_mapping) self.rcs_client = RCSAPIClient() self.task_allocation_algorithm = "LOAD_BALANCED" # 默认任务分配 self.path_planning_algorithm = "A_STAR" # 默认路径规划 self.path_monitor = None if self.enable_path_monitor: self.path_monitor = PathMonitorService( rcs_host=RCS_SERVER_HOST, rcs_port=RCS_SERVER_PORT, poll_interval=self.monitor_interval, path_algorithm=self.path_planning_algorithm, auto_send_paths=True # 自动发送路径到RCS ) self.logger.info(f"路径监控服务已初始化 - 轮询间隔: {self.monitor_interval}s, 自动发送路径: True") self._setup_routes() self.logger.info("算法系统服务器初始化完成") def _setup_routes(self): """设置API路由""" @self.app.route('/open/task/send/v1', methods=['POST']) def task_send_endpoint(): """任务下发接口""" return self._handle_task_send_request() @self.app.route('/open/path/plan/v1', methods=['POST']) def path_plan_endpoint(): """路径规划接口""" return self._handle_path_planning_request() @self.app.route('/open/path/batch/plan/v1', methods=['POST']) def batch_path_plan_endpoint(): """批量路径规划接口""" return self._handle_batch_path_planning_request() @self.app.route('/open/algorithm/config/v1', methods=['POST']) def algorithm_config_endpoint(): """算法配置接口""" return self._handle_algorithm_config_request() @self.app.route('/monitor/path/start/v1', methods=['POST']) def start_path_monitor_endpoint(): """启动路径监控服务接口""" return self._handle_start_path_monitor_request() @self.app.route('/monitor/path/stop/v1', methods=['POST']) def stop_path_monitor_endpoint(): """停止路径监控服务接口""" return self._handle_stop_path_monitor_request() @self.app.route('/monitor/path/status/v1', methods=['GET']) def path_monitor_status_endpoint(): """路径监控服务状态查询接口""" return self._handle_path_monitor_status_request() @self.app.route('/health', methods=['GET']) def health_check(): """健康检查接口""" monitor_status = None if self.path_monitor: monitor_status = self.path_monitor.get_monitoring_status() return jsonify({ "status": "healthy", "service": "algorithm_system", "timestamp": time.time(), "path_mapping_loaded": len(self.path_mapping) > 0, "algorithms": { "task_allocation": self.task_allocation_algorithm, "path_planning": self.path_planning_algorithm }, "agv_count": len(self.agv_manager.get_all_agvs()), "path_monitor": monitor_status, "available_endpoints": [ "POST /open/task/send/v1 - 任务分配接口", "POST /open/path/plan/v1 - 单路径规划接口", "POST /open/path/batch/plan/v1 - 批量路径规划接口", "POST /open/algorithm/config/v1 - 算法配置接口", "POST /monitor/path/start/v1 - 启动路径监控服务", "POST /monitor/path/stop/v1 - 停止路径监控服务", "GET /monitor/path/status/v1 - 路径监控服务状态查询", "GET /health - 健康检查接口" ] }) @self.app.errorhandler(404) def not_found(error): """404错误处理""" return jsonify(to_dict(create_error_response(404, "接口不存在"))), 404 @self.app.errorhandler(500) def internal_error(error): """500错误处理""" self.logger.error(f"内部服务器错误: {error}") return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "内部服务器错误"))), 500 def _handle_task_send_request(self) -> Dict[str, Any]: """处理任务下发请求""" try: if not request.is_json: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误"))) request_data = request.get_json() if not request_data: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据为空"))) if isinstance(request_data, dict) and "tasks" in request_data: task_data = request_data.get("tasks", []) agv_status_data = request_data.get("agvStatus", []) self.logger.info(f"收到结构化任务分配请求,任务数量: {len(task_data)}, AGV数量: {len(agv_status_data)}") elif isinstance(request_data, list): task_data = request_data agv_status_data = [] self.logger.info(f"收到任务分配请求,任务数量: {len(task_data)}") else: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "任务数据格式无效"))) if not task_data or not isinstance(task_data, list): return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "任务数据无效"))) tasks = [] for task_dict in task_data: try: task = TaskData( taskId=task_dict.get("taskId", ""), start=task_dict.get("start", ""), end=task_dict.get("end", ""), type=task_dict.get("type", "1"), priority=task_dict.get("priority", 5) ) tasks.append(task) except Exception as e: self.logger.warning(f"解析任务数据失败: {task_dict} - {e}") continue if not tasks: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "没有有效的任务数据"))) agv_status_list = [] if agv_status_data: self.logger.info("使用请求中提供的AGV状态数据") for agv_data in agv_status_data: try: agv_status = from_dict(AGVStatus, agv_data) agv_status_list.append(agv_status) except Exception as e: self.logger.warning(f"解析请求中的AGV状态数据失败: {agv_data} - {e}") continue else: self.logger.info("从RCS系统获取AGV状态数据") agv_response = self.rcs_client.get_agv_status() if agv_response.code == ResponseCode.SUCCESS and agv_response.data: for agv_data in agv_response.data: try: agv_status = from_dict(AGVStatus, agv_data) agv_status_list.append(agv_status) except Exception as e: self.logger.warning(f"解析RCS返回的AGV状态数据失败: {agv_data} - {e}") continue else: error_msg = f"无法获取AGV状态: {agv_response.msg if agv_response else 'RCS系统连接失败'}" self.logger.error(error_msg) use_fallback_allocation = getattr(self, 'use_fallback_allocation', True) if use_fallback_allocation: self.logger.warning("使用轮询分配作为备用方案") return self._simple_round_robin_allocation(tasks) else: return jsonify(to_dict(create_error_response( ResponseCode.SERVER_ERROR, error_msg ))) if not agv_status_list: return jsonify(to_dict(create_error_response(ResponseCode.NO_DATA, "没有可用的AGV状态数据"))) self.agv_manager.update_agv_data(agv_status_list) self.logger.info(f"更新了 {len(agv_status_list)} 个AGV状态") all_agvs = self.agv_manager.get_all_agvs() self.logger.info(f"算法系统中当前有 {len(all_agvs)} 个AGV模型") available_count = 0 for agv in all_agvs: can_accept = agv.can_accept_task(5) self.logger.debug(f"AGV {agv.agvId}: status={agv.status}, can_accept_task={can_accept}, " f"is_overloaded={agv.is_overloaded()}, task_count={agv.current_task_count}") if can_accept: available_count += 1 self.logger.info(f"其中 {available_count} 个AGV可以接受任务") if available_count == 0: self.logger.warning("没有AGV可以接受任务!详细状态:") for agv in all_agvs: status_str = str(agv.status) self.logger.warning(f" AGV {agv.agvId}: 状态={status_str}, 类型={type(agv.status)}, " f"任务数={agv.current_task_count}/{agv.max_capacity}") allocator = TaskAllocationFactory.create_allocator( self.task_allocation_algorithm, self.agv_manager ) start_time = time.time() assignments = allocator.allocate_tasks(tasks) end_time = time.time() allocation_time = (end_time - start_time) * 1000 # 转换为毫秒 assignment_data = [to_dict(assignment) for assignment in assignments] self.logger.info(f"任务分配完成,分配了 {len(assignments)} 个任务,耗时: {allocation_time:.2f}ms") response = create_success_response(assignment_data) return jsonify(to_dict(response)) except Exception as e: self.logger.error(f"处理任务分配请求失败: {e}", exc_info=True) return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}"))) def _handle_path_planning_request(self) -> Dict[str, Any]: """处理路径规划请求""" try: # 验证请求数据 if not request.is_json: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误"))) request_data = request.get_json() if not request_data: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据为空"))) agv_id = request_data.get("agvId") start_code = request_data.get("start") end_code = request_data.get("end") constraints = request_data.get("constraints", []) if not agv_id or not start_code or not end_code: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "缺少必要参数"))) self.logger.info(f"收到路径规划请求 - AGV: {agv_id}, 从 {start_code} 到 {end_code}") # 创建路径规划器 path_planner = PathPlanningFactory.create_path_planner( self.path_planning_algorithm, self.path_mapping ) # 执行路径规划 start_time = time.time() planned_path = path_planner.plan_path(start_code, end_code, constraints) end_time = time.time() planning_time = (end_time - start_time) * 1000 # 转换为毫秒 if planned_path: # 设置AGV ID planned_path.agvId = agv_id # 转换为响应格式 path_data = to_dict(planned_path) self.logger.info(f"路径规划完成 - AGV: {agv_id}, 路径长度: {len(planned_path.codeList)}, 耗时: {planning_time:.2f}ms") response = create_success_response(path_data) return jsonify(to_dict(response)) else: self.logger.warning(f"路径规划失败 - AGV: {agv_id}, 从 {start_code} 到 {end_code}") return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "无法规划路径"))) except Exception as e: self.logger.error(f"处理路径规划请求失败: {e}", exc_info=True) return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}"))) def _handle_batch_path_planning_request(self) -> Dict[str, Any]: """处理批量路径规划请求""" try: if not request.is_json: request_data = {} else: request_data = request.get_json() or {} constraints = request_data.get("constraints", []) include_idle_agv = request_data.get("includeIdleAgv", False) use_enhanced_planning = request_data.get("useEnhancedPlanning", True) agv_status_from_request = ( request_data.get("agvStatusList", []) or request_data.get("agvs", []) ) self.logger.info(f"收到批量路径规划请求,包含空闲AGV: {include_idle_agv}, " f"请求中AGV数量: {len(agv_status_from_request)}, " f"使用增强规划: {use_enhanced_planning}") agv_status_list = [] if agv_status_from_request: self.logger.info("使用请求中提供的AGV状态数据进行路径规划") for agv_data in agv_status_from_request: try: agv_status = from_dict(AGVStatus, agv_data) agv_status_list.append(agv_status) except Exception as e: self.logger.warning(f"解析请求中的AGV状态数据失败: {agv_data} - {e}") continue else: self.logger.info("从RCS系统获取AGV状态数据进行路径规划") agv_response = self.rcs_client.get_agv_status() if agv_response.code != ResponseCode.SUCCESS or not agv_response.data: error_msg = f"无法获取AGV状态进行路径规划: {agv_response.msg if agv_response else 'RCS系统连接失败'}" self.logger.error(error_msg) return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, error_msg))) for agv_data in agv_response.data: try: agv_status = from_dict(AGVStatus, agv_data) agv_status_list.append(agv_status) except Exception as e: self.logger.warning(f"解析RCS返回的AGV状态数据失败: {agv_data} - {e}") continue if not agv_status_list: error_msg = "没有可用的AGV状态数据进行路径规划" self.logger.error(error_msg) return jsonify(to_dict(create_error_response(ResponseCode.NO_DATA, error_msg))) # 更新AGV模型管理器 self.agv_manager.update_agv_data(agv_status_list) self.logger.info(f"强制使用路径规划以确保包含新字段") result = self._enhanced_batch_path_planning(agv_status_list, include_idle_agv, constraints) response = create_success_response(result) return jsonify(to_dict(response)) except Exception as e: self.logger.error(f"处理批量路径规划请求失败: {e}", exc_info=True) return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}"))) def _enhanced_batch_path_planning(self, agv_status_list: List[AGVStatus], include_idle_agv: bool, constraints: List[Tuple[int, int, float]]) -> Dict[str, Any]: """批量路径规划""" from algorithm_system.algorithms.path_planning import PathPlanningFactory # 创建批量路径规划器 batch_planner = PathPlanningFactory.create_batch_path_planner( algorithm_type=self.path_planning_algorithm, path_mapping=self.path_mapping ) # 执行批量路径规划 result = batch_planner.plan_all_agv_paths( agv_status_list=agv_status_list, include_idle_agv=include_idle_agv, constraints=constraints ) # 提取plannedPaths作为最终返回数据 planned_paths = result.get('plannedPaths', []) self.logger.info(f"批量路径规划完成 - 总AGV: {result['totalAgvs']}, " f"执行任务: {result['executingTasksCount']}, " f"规划路径: {result['plannedPathsCount']}, " f"冲突检测: {result['conflictsDetected']}, " f"总耗时: {result['totalPlanningTime']:.2f}ms") # 只返回路径数组,不包含其他统计信息 return planned_paths def _handle_algorithm_config_request(self) -> Dict[str, Any]: """ 处理算法配置请求 Returns: Dict: 响应数据 """ try: # 验证请求数据 if not request.is_json: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误"))) config_data = request.get_json() if not config_data: return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "配置数据为空"))) # 更新算法配置 if "task_allocation_algorithm" in config_data: self.task_allocation_algorithm = config_data["task_allocation_algorithm"] self.logger.info(f"任务分配算法更新为: {self.task_allocation_algorithm}") if "path_planning_algorithm" in config_data: self.path_planning_algorithm = config_data["path_planning_algorithm"] self.logger.info(f"路径规划算法更新为: {self.path_planning_algorithm}") # 返回当前配置 current_config = { "task_allocation_algorithm": self.task_allocation_algorithm, "path_planning_algorithm": self.path_planning_algorithm } response = create_success_response(current_config) return jsonify(to_dict(response)) except Exception as e: self.logger.error(f"处理算法配置请求失败: {e}", exc_info=True) return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}"))) def _simple_round_robin_allocation(self, tasks: List[TaskData]) -> Dict[str, Any]: """简单的轮询分配""" assignments = [] # 尝试从RCS系统获取AGV数量 agv_count = 5 # 默认值 try: agv_response = self.rcs_client.get_agv_status() if agv_response.code == ResponseCode.SUCCESS and agv_response.data: agv_count = len(agv_response.data) self.logger.info(f"从RCS系统获取到AGV数量: {agv_count}") else: # 如果RCS返回无数据,使用配置的默认值 from config.settings import DEFAULT_AGV_COUNT agv_count = DEFAULT_AGV_COUNT self.logger.warning(f"无法从RCS获取AGV数量,使用默认值: {agv_count}") except Exception as e: # 如果完全无法连接RCS,使用配置的默认值 from config.settings import DEFAULT_AGV_COUNT agv_count = DEFAULT_AGV_COUNT self.logger.error(f"获取AGV数量失败,使用默认值: {agv_count} - {e}") for i, task in enumerate(tasks): agv_id = f"AGV_{10001 + (i % agv_count)}" assignment = TaskAssignment( taskId=task.taskId, agvId=agv_id ) assignments.append(assignment) assignment_data = [to_dict(assignment) for assignment in assignments] self.logger.info(f"使用简单轮询分配了 {len(assignments)} 个任务,AGV数量: {agv_count}") response = create_success_response(assignment_data) return jsonify(to_dict(response)) def _handle_start_path_monitor_request(self) -> Dict[str, Any]: """处理启动路径监控服务请求""" try: if not self.path_monitor: return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化"))) if self.path_monitor.is_running: return jsonify(to_dict(create_success_response({"message": "路径监控服务已在运行中"}))) # 启动路径监控服务 self.path_monitor.start_monitoring() self.logger.info("路径监控服务已通过API启动") response = create_success_response({ "message": "路径监控服务启动成功", "monitor_status": self.path_monitor.get_monitoring_status() }) return jsonify(to_dict(response)) except Exception as e: self.logger.error(f"启动路径监控服务失败: {e}") return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"启动路径监控服务失败: {str(e)}"))) def _handle_stop_path_monitor_request(self) -> Dict[str, Any]: """处理停止路径监控服务请求""" try: if not self.path_monitor: return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化"))) if not self.path_monitor.is_running: return jsonify(to_dict(create_success_response({"message": "路径监控服务未在运行"}))) # 停止路径监控服务 self.path_monitor.stop_monitoring() self.logger.info("路径监控服务已通过API停止") response = create_success_response({ "message": "路径监控服务停止成功", "monitor_status": self.path_monitor.get_monitoring_status() }) return jsonify(to_dict(response)) except Exception as e: self.logger.error(f"停止路径监控服务失败: {e}") return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"停止路径监控服务失败: {str(e)}"))) def _handle_path_monitor_status_request(self) -> Dict[str, Any]: """处理路径监控服务状态查询请求""" try: if not self.path_monitor: return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化"))) monitor_status = self.path_monitor.get_monitoring_status() response = create_success_response({ "monitor_status": monitor_status, "description": "路径监控服务状态信息" }) return jsonify(to_dict(response)) except Exception as e: self.logger.error(f"获取路径监控服务状态失败: {e}") return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"获取路径监控服务状态失败: {str(e)}"))) def start_server(self): """启动算法服务器""" self.logger.info(f"启动算法系统服务器: http://{self.host}:{self.port}") try: # 如果启用了路径监控服务,自动启动 if self.path_monitor and self.enable_path_monitor: self.path_monitor.start_monitoring() self.logger.info("路径监控服务已自动启动") self.app.run( host=self.host, port=self.port, debug=False, threaded=True ) except Exception as e: self.logger.error(f"启动算法服务器失败: {e}") # 确保在启动失败时停止路径监控服务 if self.path_monitor and self.path_monitor.is_running: self.path_monitor.stop_monitoring() raise def stop_server(self): """停止算法服务器""" self.logger.info("停止算法服务器") # 停止路径监控服务 if self.path_monitor and self.path_monitor.is_running: self.path_monitor.stop_monitoring() self.logger.info("路径监控服务已停止") def get_server_status(self) -> Dict[str, Any]: """获取服务器状态""" monitor_status = None if self.path_monitor: monitor_status = self.path_monitor.get_monitoring_status() return { "host": self.host, "port": self.port, "path_mapping_loaded": len(self.path_mapping) > 0, "agv_count": len(self.agv_manager.get_all_agvs()), "algorithms": { "task_allocation": self.task_allocation_algorithm, "path_planning": self.path_planning_algorithm }, "path_monitor": monitor_status, "timestamp": time.time() }