zhang
2025-07-02 2fa19599467263dcf582bb12906e03328e03b4a4
初版提交
22个文件已添加
32215 ■■■■■ 已修改文件
.idea/.gitignore 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/misc.xml 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
RCS与算法的接口协议v1.1.docx 补丁 | 查看 | 原始文档 | blame | 历史
ZY_algorithm_system.zip 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/__init__.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/algorithm_server.py 623 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/algorithms/__init__.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/algorithms/collision_detection.py 701 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/algorithms/path_planning.py 1131 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/algorithms/task_allocation.py 687 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/models/__init__.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/models/agv_model.py 572 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/path_monitor.py 442 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/__init__.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/api_client.py 124 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/data_models.py 184 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
common/utils.py 286 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/environment.py 360 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/settings.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
environment.json 14960 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
path_mapping.json 11880 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
run_algorithm.py 216 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/.gitignore
New file
@@ -0,0 +1,10 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Environment-dependent path to Maven home directory
/mavenHomeManager.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
.idea/misc.xml
New file
@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="KubernetesApiProvider">{}</component>
</project>
RCS与算法的接口协议v1.1.docx
Binary files differ
ZY_algorithm_system.zip
Binary files differ
algorithm_system/__init__.py
New file
@@ -0,0 +1,2 @@
# Algorithm System Module
__version__ = "1.0.0"
algorithm_system/algorithm_server.py
New file
@@ -0,0 +1,623 @@
"""
算法系统服务器
"""
import json
import logging
import os
import time
from typing import Dict, List, Optional, Any, Tuple
from flask import Flask, request, jsonify
from common.data_models import (
    TaskData, AGVStatus, TaskAssignment, PlannedPath,
    create_success_response, create_error_response, ResponseCode, to_dict, from_dict
)
from common.utils import load_path_mapping
from algorithm_system.models.agv_model import AGVModelManager
from algorithm_system.algorithms.task_allocation import TaskAllocationFactory
from algorithm_system.algorithms.path_planning import PathPlanningFactory
from common.api_client import RCSAPIClient
from algorithm_system.path_monitor import PathMonitorService
# Prefer the deployment configuration; when config.settings cannot be
# imported, fall back to the built-in defaults below and emit a warning.
try:
    from config.settings import (
        ALGORITHM_SERVER_HOST,
        ALGORITHM_SERVER_PORT,
        RCS_SERVER_HOST,
        RCS_SERVER_PORT,
        MONITOR_POLLING_INTERVAL,
    )
except ImportError:
    # Address/port the algorithm server binds to.
    ALGORITHM_SERVER_HOST, ALGORITHM_SERVER_PORT = "10.10.10.239", 8002
    # Address/port of the RCS server.
    RCS_SERVER_HOST, RCS_SERVER_PORT = "10.10.10.156", 8088
    # Default polling interval handed to the path monitor.
    MONITOR_POLLING_INTERVAL = 5.0
    logging.warning("无法从config.settings导入配置,使用默认值")
class AlgorithmServer:
    """算法系统服务器"""
    def __init__(self, host: str = None, port: int = None, enable_path_monitor: bool = True,
                 monitor_interval: float = None):
        """初始化算法服务器"""
        self.host = host or ALGORITHM_SERVER_HOST
        self.port = port or ALGORITHM_SERVER_PORT
        self.enable_path_monitor = enable_path_monitor
        self.monitor_interval = monitor_interval or MONITOR_POLLING_INTERVAL
        self.logger = logging.getLogger(__name__)
        self.app = Flask(__name__)
        self.path_mapping = load_path_mapping()
        self.agv_manager = AGVModelManager(self.path_mapping)
        self.rcs_client = RCSAPIClient()
        self.task_allocation_algorithm = "LOAD_BALANCED"  # é»˜è®¤ä»»åŠ¡åˆ†é…
        self.path_planning_algorithm = "A_STAR"  # é»˜è®¤è·¯å¾„规划
        self.path_monitor = None
        if self.enable_path_monitor:
            self.path_monitor = PathMonitorService(
                rcs_host=RCS_SERVER_HOST,
                rcs_port=RCS_SERVER_PORT,
                poll_interval=self.monitor_interval,
                path_algorithm=self.path_planning_algorithm,
                auto_send_paths=True  # è‡ªåŠ¨å‘é€è·¯å¾„åˆ°RCS
            )
            self.logger.info(f"路径监控服务已初始化 - è½®è¯¢é—´éš”: {self.monitor_interval}s, è‡ªåŠ¨å‘é€è·¯å¾„: True")
        self._setup_routes()
        self.logger.info("算法系统服务器初始化完成")
    def _setup_routes(self):
        """设置API路由"""
        @self.app.route('/open/task/send/v1', methods=['POST'])
        def task_send_endpoint():
            """任务下发接口"""
            return self._handle_task_send_request()
        @self.app.route('/open/path/plan/v1', methods=['POST'])
        def path_plan_endpoint():
            """路径规划接口"""
            return self._handle_path_planning_request()
        @self.app.route('/open/path/batch/plan/v1', methods=['POST'])
        def batch_path_plan_endpoint():
            """批量路径规划接口"""
            return self._handle_batch_path_planning_request()
        @self.app.route('/open/algorithm/config/v1', methods=['POST'])
        def algorithm_config_endpoint():
            """算法配置接口"""
            return self._handle_algorithm_config_request()
        @self.app.route('/monitor/path/start/v1', methods=['POST'])
        def start_path_monitor_endpoint():
            """启动路径监控服务接口"""
            return self._handle_start_path_monitor_request()
        @self.app.route('/monitor/path/stop/v1', methods=['POST'])
        def stop_path_monitor_endpoint():
            """停止路径监控服务接口"""
            return self._handle_stop_path_monitor_request()
        @self.app.route('/monitor/path/status/v1', methods=['GET'])
        def path_monitor_status_endpoint():
            """路径监控服务状态查询接口"""
            return self._handle_path_monitor_status_request()
        @self.app.route('/health', methods=['GET'])
        def health_check():
            """健康检查接口"""
            monitor_status = None
            if self.path_monitor:
                monitor_status = self.path_monitor.get_monitoring_status()
            return jsonify({
                "status": "healthy",
                "service": "algorithm_system",
                "timestamp": time.time(),
                "path_mapping_loaded": len(self.path_mapping) > 0,
                "algorithms": {
                    "task_allocation": self.task_allocation_algorithm,
                    "path_planning": self.path_planning_algorithm
                },
                "agv_count": len(self.agv_manager.get_all_agvs()),
                "path_monitor": monitor_status,
                "available_endpoints": [
                    "POST /open/task/send/v1 - ä»»åŠ¡åˆ†é…æŽ¥å£",
                    "POST /open/path/plan/v1 - å•路径规划接口",
                    "POST /open/path/batch/plan/v1 - æ‰¹é‡è·¯å¾„规划接口",
                    "POST /open/algorithm/config/v1 - ç®—法配置接口",
                    "POST /monitor/path/start/v1 - å¯åŠ¨è·¯å¾„ç›‘æŽ§æœåŠ¡",
                    "POST /monitor/path/stop/v1 - åœæ­¢è·¯å¾„监控服务",
                    "GET /monitor/path/status/v1 - è·¯å¾„监控服务状态查询",
                    "GET /health - å¥åº·æ£€æŸ¥æŽ¥å£"
                ]
            })
        @self.app.errorhandler(404)
        def not_found(error):
            """404错误处理"""
            return jsonify(to_dict(create_error_response(404, "接口不存在"))), 404
        @self.app.errorhandler(500)
        def internal_error(error):
            """500错误处理"""
            self.logger.error(f"内部服务器错误: {error}")
            return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "内部服务器错误"))), 500
    def _handle_task_send_request(self) -> Dict[str, Any]:
        """处理任务下发请求"""
        try:
            if not request.is_json:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误")))
            request_data = request.get_json()
            if not request_data:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据为空")))
            if isinstance(request_data, dict) and "tasks" in request_data:
                task_data = request_data.get("tasks", [])
                agv_status_data = request_data.get("agvStatus", [])
                self.logger.info(f"收到结构化任务分配请求,任务数量: {len(task_data)}, AGV数量: {len(agv_status_data)}")
            elif isinstance(request_data, list):
                task_data = request_data
                agv_status_data = []
                self.logger.info(f"收到任务分配请求,任务数量: {len(task_data)}")
            else:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "任务数据格式无效")))
            if not task_data or not isinstance(task_data, list):
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "任务数据无效")))
            tasks = []
            for task_dict in task_data:
                try:
                    task = TaskData(
                        taskId=task_dict.get("taskId", ""),
                        start=task_dict.get("start", ""),
                        end=task_dict.get("end", ""),
                        type=task_dict.get("type", "1"),
                        priority=task_dict.get("priority", 5)
                    )
                    tasks.append(task)
                except Exception as e:
                    self.logger.warning(f"解析任务数据失败: {task_dict} - {e}")
                    continue
            if not tasks:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "没有有效的任务数据")))
            agv_status_list = []
            if agv_status_data:
                self.logger.info("使用请求中提供的AGV状态数据")
                for agv_data in agv_status_data:
                    try:
                        agv_status = from_dict(AGVStatus, agv_data)
                        agv_status_list.append(agv_status)
                    except Exception as e:
                        self.logger.warning(f"解析请求中的AGV状态数据失败: {agv_data} - {e}")
                        continue
            else:
                self.logger.info("从RCS系统获取AGV状态数据")
                agv_response = self.rcs_client.get_agv_status()
                if agv_response.code == ResponseCode.SUCCESS and agv_response.data:
                    for agv_data in agv_response.data:
                        try:
                            agv_status = from_dict(AGVStatus, agv_data)
                            agv_status_list.append(agv_status)
                        except Exception as e:
                            self.logger.warning(f"解析RCS返回的AGV状态数据失败: {agv_data} - {e}")
                            continue
                else:
                    error_msg = f"无法获取AGV状态: {agv_response.msg if agv_response else 'RCS系统连接失败'}"
                    self.logger.error(error_msg)
                    use_fallback_allocation = getattr(self, 'use_fallback_allocation', True)
                    if use_fallback_allocation:
                        self.logger.warning("使用轮询分配作为备用方案")
                        return self._simple_round_robin_allocation(tasks)
                    else:
                        return jsonify(to_dict(create_error_response(
                            ResponseCode.SERVER_ERROR,
                            error_msg
                        )))
            if not agv_status_list:
                return jsonify(to_dict(create_error_response(ResponseCode.NO_DATA, "没有可用的AGV状态数据")))
            self.agv_manager.update_agv_data(agv_status_list)
            self.logger.info(f"更新了 {len(agv_status_list)} ä¸ªAGV状态")
            all_agvs = self.agv_manager.get_all_agvs()
            self.logger.info(f"算法系统中当前有 {len(all_agvs)} ä¸ªAGV模型")
            available_count = 0
            for agv in all_agvs:
                can_accept = agv.can_accept_task(5)
                self.logger.debug(f"AGV {agv.agvId}: status={agv.status}, can_accept_task={can_accept}, "
                                f"is_overloaded={agv.is_overloaded()}, task_count={agv.current_task_count}")
                if can_accept:
                    available_count += 1
            self.logger.info(f"其中 {available_count} ä¸ªAGV可以接受任务")
            if available_count == 0:
                self.logger.warning("没有AGV可以接受任务!详细状态:")
                for agv in all_agvs:
                    status_str = str(agv.status)
                    self.logger.warning(f"  AGV {agv.agvId}: çŠ¶æ€={status_str}, ç±»åž‹={type(agv.status)}, "
                                      f"任务数={agv.current_task_count}/{agv.max_capacity}")
            allocator = TaskAllocationFactory.create_allocator(
                self.task_allocation_algorithm,
                self.agv_manager
            )
            start_time = time.time()
            assignments = allocator.allocate_tasks(tasks)
            end_time = time.time()
            allocation_time = (end_time - start_time) * 1000  # è½¬æ¢ä¸ºæ¯«ç§’
            assignment_data = [to_dict(assignment) for assignment in assignments]
            self.logger.info(f"任务分配完成,分配了 {len(assignments)} ä¸ªä»»åŠ¡ï¼Œè€—æ—¶: {allocation_time:.2f}ms")
            response = create_success_response(assignment_data)
            return jsonify(to_dict(response))
        except Exception as e:
            self.logger.error(f"处理任务分配请求失败: {e}", exc_info=True)
            return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}")))
    def _handle_path_planning_request(self) -> Dict[str, Any]:
        """处理路径规划请求"""
        try:
            # éªŒè¯è¯·æ±‚数据
            if not request.is_json:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误")))
            request_data = request.get_json()
            if not request_data:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据为空")))
            agv_id = request_data.get("agvId")
            start_code = request_data.get("start")
            end_code = request_data.get("end")
            constraints = request_data.get("constraints", [])
            if not agv_id or not start_code or not end_code:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "缺少必要参数")))
            self.logger.info(f"收到路径规划请求 - AGV: {agv_id}, ä»Ž {start_code} åˆ° {end_code}")
            # åˆ›å»ºè·¯å¾„规划器
            path_planner = PathPlanningFactory.create_path_planner(
                self.path_planning_algorithm,
                self.path_mapping
            )
            # æ‰§è¡Œè·¯å¾„规划
            start_time = time.time()
            planned_path = path_planner.plan_path(start_code, end_code, constraints)
            end_time = time.time()
            planning_time = (end_time - start_time) * 1000  # è½¬æ¢ä¸ºæ¯«ç§’
            if planned_path:
                # è®¾ç½®AGV ID
                planned_path.agvId = agv_id
                # è½¬æ¢ä¸ºå“åº”格式
                path_data = to_dict(planned_path)
                self.logger.info(f"路径规划完成 - AGV: {agv_id}, è·¯å¾„长度: {len(planned_path.codeList)}, è€—æ—¶: {planning_time:.2f}ms")
                response = create_success_response(path_data)
                return jsonify(to_dict(response))
            else:
                self.logger.warning(f"路径规划失败 - AGV: {agv_id}, ä»Ž {start_code} åˆ° {end_code}")
                return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "无法规划路径")))
        except Exception as e:
            self.logger.error(f"处理路径规划请求失败: {e}", exc_info=True)
            return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}")))
    def _handle_batch_path_planning_request(self) -> Dict[str, Any]:
        """处理批量路径规划请求"""
        try:
            if not request.is_json:
                request_data = {}
            else:
                request_data = request.get_json() or {}
            constraints = request_data.get("constraints", [])
            include_idle_agv = request_data.get("includeIdleAgv", False)
            use_enhanced_planning = request_data.get("useEnhancedPlanning", True)
            agv_status_from_request = (
                request_data.get("agvStatusList", []) or
                request_data.get("agvs", [])
            )
            self.logger.info(f"收到批量路径规划请求,包含空闲AGV: {include_idle_agv}, "
                           f"请求中AGV数量: {len(agv_status_from_request)}, "
                           f"使用增强规划: {use_enhanced_planning}")
            agv_status_list = []
            if agv_status_from_request:
                self.logger.info("使用请求中提供的AGV状态数据进行路径规划")
                for agv_data in agv_status_from_request:
                    try:
                        agv_status = from_dict(AGVStatus, agv_data)
                        agv_status_list.append(agv_status)
                    except Exception as e:
                        self.logger.warning(f"解析请求中的AGV状态数据失败: {agv_data} - {e}")
                        continue
            else:
                self.logger.info("从RCS系统获取AGV状态数据进行路径规划")
                agv_response = self.rcs_client.get_agv_status()
                if agv_response.code != ResponseCode.SUCCESS or not agv_response.data:
                    error_msg = f"无法获取AGV状态进行路径规划: {agv_response.msg if agv_response else 'RCS系统连接失败'}"
                    self.logger.error(error_msg)
                    return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, error_msg)))
                for agv_data in agv_response.data:
                    try:
                        agv_status = from_dict(AGVStatus, agv_data)
                        agv_status_list.append(agv_status)
                    except Exception as e:
                        self.logger.warning(f"解析RCS返回的AGV状态数据失败: {agv_data} - {e}")
                        continue
            if not agv_status_list:
                error_msg = "没有可用的AGV状态数据进行路径规划"
                self.logger.error(error_msg)
                return jsonify(to_dict(create_error_response(ResponseCode.NO_DATA, error_msg)))
            # æ›´æ–°AGV模型管理器
            self.agv_manager.update_agv_data(agv_status_list)
            self.logger.info(f"强制使用路径规划以确保包含新字段")
            result = self._enhanced_batch_path_planning(agv_status_list, include_idle_agv, constraints)
            response = create_success_response(result)
            return jsonify(to_dict(response))
        except Exception as e:
            self.logger.error(f"处理批量路径规划请求失败: {e}", exc_info=True)
            return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}")))
    def _enhanced_batch_path_planning(self, agv_status_list: List[AGVStatus],
                                    include_idle_agv: bool,
                                    constraints: List[Tuple[int, int, float]]) -> Dict[str, Any]:
        """批量路径规划"""
        from algorithm_system.algorithms.path_planning import PathPlanningFactory
        # åˆ›å»ºæ‰¹é‡è·¯å¾„规划器
        batch_planner = PathPlanningFactory.create_batch_path_planner(
            algorithm_type=self.path_planning_algorithm,
            path_mapping=self.path_mapping
        )
        # æ‰§è¡Œæ‰¹é‡è·¯å¾„规划
        result = batch_planner.plan_all_agv_paths(
            agv_status_list=agv_status_list,
            include_idle_agv=include_idle_agv,
            constraints=constraints
        )
        # æå–plannedPaths作为最终返回数据
        planned_paths = result.get('plannedPaths', [])
        self.logger.info(f"批量路径规划完成 - æ€»AGV: {result['totalAgvs']}, "
                        f"执行任务: {result['executingTasksCount']}, "
                        f"规划路径: {result['plannedPathsCount']}, "
                        f"冲突检测: {result['conflictsDetected']}, "
                        f"总耗时: {result['totalPlanningTime']:.2f}ms")
        # åªè¿”回路径数组,不包含其他统计信息
        return planned_paths
    def _handle_algorithm_config_request(self) -> Dict[str, Any]:
        """
        å¤„理算法配置请求
        Returns:
            Dict: å“åº”数据
        """
        try:
            # éªŒè¯è¯·æ±‚数据
            if not request.is_json:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "请求数据格式错误")))
            config_data = request.get_json()
            if not config_data:
                return jsonify(to_dict(create_error_response(ResponseCode.PARAM_EMPTY, "配置数据为空")))
            # æ›´æ–°ç®—法配置
            if "task_allocation_algorithm" in config_data:
                self.task_allocation_algorithm = config_data["task_allocation_algorithm"]
                self.logger.info(f"任务分配算法更新为: {self.task_allocation_algorithm}")
            if "path_planning_algorithm" in config_data:
                self.path_planning_algorithm = config_data["path_planning_algorithm"]
                self.logger.info(f"路径规划算法更新为: {self.path_planning_algorithm}")
            # è¿”回当前配置
            current_config = {
                "task_allocation_algorithm": self.task_allocation_algorithm,
                "path_planning_algorithm": self.path_planning_algorithm
            }
            response = create_success_response(current_config)
            return jsonify(to_dict(response))
        except Exception as e:
            self.logger.error(f"处理算法配置请求失败: {e}", exc_info=True)
            return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"服务器错误: {str(e)}")))
    def _simple_round_robin_allocation(self, tasks: List[TaskData]) -> Dict[str, Any]:
        """简单的轮询分配"""
        assignments = []
        # å°è¯•从RCS系统获取AGV数量
        agv_count = 5  # é»˜è®¤å€¼
        try:
            agv_response = self.rcs_client.get_agv_status()
            if agv_response.code == ResponseCode.SUCCESS and agv_response.data:
                agv_count = len(agv_response.data)
                self.logger.info(f"从RCS系统获取到AGV数量: {agv_count}")
            else:
                # å¦‚æžœRCS返回无数据,使用配置的默认值
                from config.settings import DEFAULT_AGV_COUNT
                agv_count = DEFAULT_AGV_COUNT
                self.logger.warning(f"无法从RCS获取AGV数量,使用默认值: {agv_count}")
        except Exception as e:
            # å¦‚果完全无法连接RCS,使用配置的默认值
            from config.settings import DEFAULT_AGV_COUNT
            agv_count = DEFAULT_AGV_COUNT
            self.logger.error(f"获取AGV数量失败,使用默认值: {agv_count} - {e}")
        for i, task in enumerate(tasks):
            agv_id = f"AGV_{10001 + (i % agv_count)}"
            assignment = TaskAssignment(
                taskId=task.taskId,
                agvId=agv_id
            )
            assignments.append(assignment)
        assignment_data = [to_dict(assignment) for assignment in assignments]
        self.logger.info(f"使用简单轮询分配了 {len(assignments)} ä¸ªä»»åŠ¡ï¼ŒAGV数量: {agv_count}")
        response = create_success_response(assignment_data)
        return jsonify(to_dict(response))
    def _handle_start_path_monitor_request(self) -> Dict[str, Any]:
        """处理启动路径监控服务请求"""
        try:
            if not self.path_monitor:
                return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化")))
            if self.path_monitor.is_running:
                return jsonify(to_dict(create_success_response({"message": "路径监控服务已在运行中"})))
            # å¯åŠ¨è·¯å¾„ç›‘æŽ§æœåŠ¡
            self.path_monitor.start_monitoring()
            self.logger.info("路径监控服务已通过API启动")
            response = create_success_response({
                "message": "路径监控服务启动成功",
                "monitor_status": self.path_monitor.get_monitoring_status()
            })
            return jsonify(to_dict(response))
        except Exception as e:
            self.logger.error(f"启动路径监控服务失败: {e}")
            return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"启动路径监控服务失败: {str(e)}")))
    def _handle_stop_path_monitor_request(self) -> Dict[str, Any]:
        """处理停止路径监控服务请求"""
        try:
            if not self.path_monitor:
                return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化")))
            if not self.path_monitor.is_running:
                return jsonify(to_dict(create_success_response({"message": "路径监控服务未在运行"})))
            # åœæ­¢è·¯å¾„监控服务
            self.path_monitor.stop_monitoring()
            self.logger.info("路径监控服务已通过API停止")
            response = create_success_response({
                "message": "路径监控服务停止成功",
                "monitor_status": self.path_monitor.get_monitoring_status()
            })
            return jsonify(to_dict(response))
        except Exception as e:
            self.logger.error(f"停止路径监控服务失败: {e}")
            return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"停止路径监控服务失败: {str(e)}")))
    def _handle_path_monitor_status_request(self) -> Dict[str, Any]:
        """处理路径监控服务状态查询请求"""
        try:
            if not self.path_monitor:
                return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, "路径监控服务未初始化")))
            monitor_status = self.path_monitor.get_monitoring_status()
            response = create_success_response({
                "monitor_status": monitor_status,
                "description": "路径监控服务状态信息"
            })
            return jsonify(to_dict(response))
        except Exception as e:
            self.logger.error(f"获取路径监控服务状态失败: {e}")
            return jsonify(to_dict(create_error_response(ResponseCode.SERVER_ERROR, f"获取路径监控服务状态失败: {str(e)}")))
    def start_server(self):
        """启动算法服务器"""
        self.logger.info(f"启动算法系统服务器: http://{self.host}:{self.port}")
        try:
            # å¦‚果启用了路径监控服务,自动启动
            if self.path_monitor and self.enable_path_monitor:
                self.path_monitor.start_monitoring()
                self.logger.info("路径监控服务已自动启动")
            self.app.run(
                host=self.host,
                port=self.port,
                debug=False,
                threaded=True
            )
        except Exception as e:
            self.logger.error(f"启动算法服务器失败: {e}")
            # ç¡®ä¿åœ¨å¯åŠ¨å¤±è´¥æ—¶åœæ­¢è·¯å¾„ç›‘æŽ§æœåŠ¡
            if self.path_monitor and self.path_monitor.is_running:
                self.path_monitor.stop_monitoring()
            raise
    def stop_server(self):
        """停止算法服务器"""
        self.logger.info("停止算法服务器")
        # åœæ­¢è·¯å¾„监控服务
        if self.path_monitor and self.path_monitor.is_running:
            self.path_monitor.stop_monitoring()
            self.logger.info("路径监控服务已停止")
    def get_server_status(self) -> Dict[str, Any]:
        """获取服务器状态"""
        monitor_status = None
        if self.path_monitor:
            monitor_status = self.path_monitor.get_monitoring_status()
        return {
            "host": self.host,
            "port": self.port,
            "path_mapping_loaded": len(self.path_mapping) > 0,
            "agv_count": len(self.agv_manager.get_all_agvs()),
            "algorithms": {
                "task_allocation": self.task_allocation_algorithm,
                "path_planning": self.path_planning_algorithm
            },
            "path_monitor": monitor_status,
            "timestamp": time.time()
        }
algorithm_system/algorithms/__init__.py
New file
@@ -0,0 +1,2 @@
# Algorithm System Algorithms Module
__version__ = "1.0.0"
algorithm_system/algorithms/collision_detection.py
New file
@@ -0,0 +1,701 @@
"""
AGV路径碰撞检测和解决模块
"""
import math
import logging
from typing import Dict, List, Tuple, Optional, Set
from dataclasses import dataclass
from collections import defaultdict
from common.data_models import PlannedPath, PathCode, AGVActionTypeEnum
from common.utils import get_coordinate_from_path_id
@dataclass
class SpaceTimeNode:
    """Space-time node: the state of an AGV at a specific time and position."""
    agv_id: str
    position: str  # position code
    coordinates: Tuple[int, int]  # coordinates
    time_step: int  # time step
    direction: str  # direction
@dataclass
class Conflict:
    """Description of a conflict between two AGVs."""
    type: str  # conflict type: "vertex", "edge", "follow"
    agv1: str
    agv2: str
    time_step: int
    position1: str
    position2: str
    description: str
@dataclass
class AGVPriority:
    """Result of evaluating an AGV's priority for conflict resolution."""
    agv_id: str
    priority_score: float  # priority score; the higher the score, the higher the priority
    task_status: str  # task status
    action_type: str  # action type
    task_priority: int  # task priority
    voltage: int  # battery level
    explanation: str  # readable breakdown of how the score was reached
class CollisionDetector:
    """AGV路径碰撞检测器"""
    def __init__(self, path_mapping: Dict[str, Dict[str, int]],
                 min_distance: float = 3.0, time_buffer: int = 1):
        """
        åˆå§‹åŒ–碰撞检测器
        Args:
            path_mapping: è·¯å¾„点映射字典
            min_distance: AGV之间的最小安全距离
            time_buffer: æ—¶é—´ç¼“冲区(时间步数)
        """
        self.path_mapping = path_mapping
        self.min_distance = min_distance
        self.time_buffer = time_buffer
        self.logger = logging.getLogger(__name__)
    def detect_conflicts(self, planned_paths: List[Dict]) -> List[Conflict]:
        """
        æ£€æµ‹æ‰€æœ‰AGV路径之间的冲突
        Args:
            planned_paths: è§„划路径列表
        Returns:
            List[Conflict]: å†²çªåˆ—表
        """
        conflicts = []
        # æž„建时空表
        space_time_table = self._build_space_time_table(planned_paths)
        # æ£€æµ‹é¡¶ç‚¹å†²çªï¼ˆåŒä¸€æ—¶é—´åŒä¸€ä½ç½®ï¼‰
        conflicts.extend(self._detect_vertex_conflicts(space_time_table))
        # æ£€æµ‹è¾¹å†²çªï¼ˆAGV交换位置)
        conflicts.extend(self._detect_edge_conflicts(space_time_table))
        # æ£€æµ‹è·Ÿéšå†²çªï¼ˆAGV距离过近)
        conflicts.extend(self._detect_following_conflicts(space_time_table))
        self.logger.info(f"检测到 {len(conflicts)} ä¸ªè·¯å¾„冲突")
        return conflicts
    def _build_space_time_table(self, planned_paths: List[Dict]) -> Dict[int, List[SpaceTimeNode]]:
        """
        æž„建时空表
        Args:
            planned_paths: è§„划路径列表
        Returns:
            Dict[int, List[SpaceTimeNode]]: æ—¶é—´æ­¥ -> æ—¶ç©ºèŠ‚ç‚¹åˆ—è¡¨
        """
        space_time_table = defaultdict(list)
        for path_data in planned_paths:
            agv_id = path_data.get('agvId', '')
            code_list = path_data.get('codeList', [])
            for time_step, path_code in enumerate(code_list):
                position = path_code.get('code', '') if isinstance(path_code, dict) else path_code.code
                direction = path_code.get('direction', '90') if isinstance(path_code, dict) else path_code.direction
                coordinates = get_coordinate_from_path_id(position, self.path_mapping)
                if coordinates:
                    node = SpaceTimeNode(
                        agv_id=agv_id,
                        position=position,
                        coordinates=coordinates,
                        time_step=time_step,
                        direction=direction
                    )
                    space_time_table[time_step].append(node)
        return space_time_table
    def _detect_vertex_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]:
        """检测顶点冲突(同一时间同一位置)"""
        conflicts = []
        for time_step, nodes in space_time_table.items():
            # æŒ‰ä½ç½®åˆ†ç»„
            position_groups = defaultdict(list)
            for node in nodes:
                position_groups[node.position].append(node)
            # æ£€æŸ¥æ¯ä¸ªä½ç½®æ˜¯å¦æœ‰å¤šä¸ªAGV
            for position, agv_nodes in position_groups.items():
                if len(agv_nodes) > 1:
                    # å‘现冲突
                    for i in range(len(agv_nodes)):
                        for j in range(i + 1, len(agv_nodes)):
                            conflict = Conflict(
                                type="vertex",
                                agv1=agv_nodes[i].agv_id,
                                agv2=agv_nodes[j].agv_id,
                                time_step=time_step,
                                position1=position,
                                position2=position,
                                description=f"AGV {agv_nodes[i].agv_id} å’Œ {agv_nodes[j].agv_id} åœ¨æ—¶é—´ {time_step} å ç”¨åŒä¸€ä½ç½® {position}"
                            )
                            conflicts.append(conflict)
        return conflicts
    def _detect_edge_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]:
        """检测边冲突(AGV交换位置)"""
        conflicts = []
        max_time = max(space_time_table.keys()) if space_time_table else 0
        for time_step in range(max_time):
            current_nodes = space_time_table.get(time_step, [])
            next_nodes = space_time_table.get(time_step + 1, [])
            # æž„建当前和下一时刻的位置映射
            current_positions = {node.agv_id: node.position for node in current_nodes}
            next_positions = {node.agv_id: node.position for node in next_nodes}
            # æ£€æŸ¥æ˜¯å¦æœ‰AGV交换位置
            for agv1, pos1_current in current_positions.items():
                for agv2, pos2_current in current_positions.items():
                    if agv1 >= agv2:  # é¿å…é‡å¤æ£€æŸ¥
                        continue
                    pos1_next = next_positions.get(agv1)
                    pos2_next = next_positions.get(agv2)
                    if (pos1_next and pos2_next and
                        pos1_current == pos2_next and pos2_current == pos1_next):
                        # å‘现边冲突
                        conflict = Conflict(
                            type="edge",
                            agv1=agv1,
                            agv2=agv2,
                            time_step=time_step,
                            position1=pos1_current,
                            position2=pos2_current,
                            description=f"AGV {agv1} å’Œ {agv2} åœ¨æ—¶é—´ {time_step}-{time_step+1} äº¤æ¢ä½ç½® {pos1_current}<->{pos2_current}"
                        )
                        conflicts.append(conflict)
        return conflicts
    def _detect_following_conflicts(self, space_time_table: Dict[int, List[SpaceTimeNode]]) -> List[Conflict]:
        """检测跟随冲突(AGV距离过近)"""
        conflicts = []
        for time_step, nodes in space_time_table.items():
            # æ£€æŸ¥æ‰€æœ‰AGV对之间的距离
            for i in range(len(nodes)):
                for j in range(i + 1, len(nodes)):
                    node1, node2 = nodes[i], nodes[j]
                    # è®¡ç®—欧几里得距离
                    distance = math.sqrt(
                        (node1.coordinates[0] - node2.coordinates[0]) ** 2 +
                        (node1.coordinates[1] - node2.coordinates[1]) ** 2
                    )
                    if distance < self.min_distance and distance > 0:
                        conflict = Conflict(
                            type="follow",
                            agv1=node1.agv_id,
                            agv2=node2.agv_id,
                            time_step=time_step,
                            position1=node1.position,
                            position2=node2.position,
                            description=f"AGV {node1.agv_id} å’Œ {node2.agv_id} åœ¨æ—¶é—´ {time_step} è·ç¦»è¿‡è¿‘ ({distance:.2f} < {self.min_distance})"
                        )
                        conflicts.append(conflict)
        return conflicts
class CollisionResolver:
    """AGV路径碰撞解决器"""
    def __init__(self, path_mapping: Dict[str, Dict[str, int]], detector: CollisionDetector, agv_manager=None):
        """
        åˆå§‹åŒ–碰撞解决器
        Args:
            path_mapping: è·¯å¾„点映射字典
            detector: ç¢°æ’žæ£€æµ‹å™¨
            agv_manager: AGV管理器,用于获取AGV状态信息
        """
        self.path_mapping = path_mapping
        self.detector = detector
        self.agv_manager = agv_manager
        self.logger = logging.getLogger(__name__)
    def evaluate_agv_priority(self, agv_id: str, path: Dict, executing_tasks: List[Dict] = None) -> AGVPriority:
        """
        è¯„ä¼°AGV的避让优先级
        Args:
            agv_id: AGV ID
            path: AGV路径信息
            executing_tasks: æ‰§è¡Œä¸­çš„任务列表
        Returns:
            AGVPriority: AGV优先级评估结果
        """
        # é»˜è®¤å€¼
        task_status = "idle"
        action_type = "unknown"
        task_priority = 1
        voltage = 100
        priority_score = 0.0
        explanation_parts = []
        # èŽ·å–AGV状态信息
        agv_model = None
        if self.agv_manager:
            agv_model = self.agv_manager.get_agv_by_id(agv_id)
            if agv_model:
                voltage = agv_model.voltage
        # ä»Žè·¯å¾„信息中获取动作类型
        code_list = path.get('codeList', [])
        if code_list:
            first_code = code_list[0]
            if isinstance(first_code, dict):
                action_type = first_code.get('type', 'unknown')
        # ä»Žæ‰§è¡Œä¸­ä»»åŠ¡èŽ·å–ä»»åŠ¡çŠ¶æ€å’Œä¼˜å…ˆçº§
        if executing_tasks:
            for task in executing_tasks:
                if task.get('agvId') == agv_id:
                    task_status = task.get('status', 'idle')
                    # å°è¯•从任务ID获取优先级(简化处理)
                    task_id = task.get('taskId', '')
                    if 'HIGH_PRIORITY' in task_id:
                        task_priority = 10
                    elif 'PRIORITY' in task_id:
                        task_priority = 8
                    else:
                        task_priority = 5
                    break
        # è®¡ç®—优先级分数(分数越高优先级越高,越不容易让步)
        priority_score = 0.0
        # 1. ä»»åŠ¡çŠ¶æ€ä¼˜å…ˆçº§ (40%权重)
        if task_status == "executing":
            priority_score += 40.0
            explanation_parts.append("执行任务中(+40)")
        elif task_status == "assigned":
            priority_score += 20.0
            explanation_parts.append("已分配任务(+20)")
        else:
            priority_score += 5.0
            explanation_parts.append("空闲状态(+5)")
        # 2. åŠ¨ä½œç±»åž‹ä¼˜å…ˆçº§ (30%权重)
        if action_type == AGVActionTypeEnum.TASK.value:
            priority_score += 30.0
            explanation_parts.append("任务行为(+30)")
        elif action_type == AGVActionTypeEnum.CHARGING.value:
            priority_score += 25.0
            explanation_parts.append("充电行为(+25)")
        elif action_type == AGVActionTypeEnum.AVOIDANCE.value:
            priority_score += 5.0
            explanation_parts.append("避让行为(+5)")
        elif action_type == AGVActionTypeEnum.STANDBY.value:
            priority_score += 10.0
            explanation_parts.append("待机行为(+10)")
        else:
            priority_score += 15.0
            explanation_parts.append("未知行为(+15)")
        # 3. ä»»åŠ¡ä¼˜å…ˆçº§ (20%权重)
        task_priority_score = (task_priority / 10.0) * 20.0
        priority_score += task_priority_score
        explanation_parts.append(f"任务优先级{task_priority}(+{task_priority_score:.1f})")
        # 4. ç”µé‡çŠ¶æ€ (10%权重)
        if voltage <= 10:  # å¿…须充电
            priority_score += 10.0
            explanation_parts.append("电量危险(+10)")
        elif voltage <= 20:  # å¯è‡ªåŠ¨å……ç”µ
            priority_score += 8.0
            explanation_parts.append("电量偏低(+8)")
        elif voltage <= 50:
            priority_score += 5.0
            explanation_parts.append("电量一般(+5)")
        else:
            priority_score += 2.0
            explanation_parts.append("电量充足(+2)")
        explanation = f"总分{priority_score:.1f}: " + ", ".join(explanation_parts)
        return AGVPriority(
            agv_id=agv_id,
            priority_score=priority_score,
            task_status=task_status,
            action_type=action_type,
            task_priority=task_priority,
            voltage=voltage,
            explanation=explanation
        )
    def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Conflict], executing_tasks: List[Dict] = None) -> List[Dict]:
        """
        è§£å†³å†²çª
        Args:
            planned_paths: åŽŸå§‹è§„åˆ’è·¯å¾„
            conflicts: å†²çªåˆ—表
            executing_tasks: æ‰§è¡Œä¸­çš„任务列表
        Returns:
            List[Dict]: è§£å†³å†²çªåŽçš„路径
        """
        if not conflicts:
            return planned_paths
        resolved_paths = [path.copy() for path in planned_paths]
        # æŒ‰æ—¶é—´æ­¥æŽ’序冲突,优先解决早期冲突
        conflicts_sorted = sorted(conflicts, key=lambda c: c.time_step)
        for conflict in conflicts_sorted:
            resolved_paths = self._resolve_single_conflict(resolved_paths, conflict, executing_tasks)
        self.logger.info(f"解决了 {len(conflicts)} ä¸ªè·¯å¾„冲突")
        return resolved_paths
    def _resolve_single_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]:
        """
        è§£å†³å•个冲突
        Args:
            paths: å½“前路径列表
            conflict: å†²çª
            executing_tasks: æ‰§è¡Œä¸­çš„任务列表
        Returns:
            List[Dict]: æ›´æ–°åŽçš„路径列表
        """
        if conflict.type == "vertex":
            return self._resolve_vertex_conflict(paths, conflict, executing_tasks)
        elif conflict.type == "edge":
            return self._resolve_edge_conflict(paths, conflict, executing_tasks)
        elif conflict.type == "follow":
            return self._resolve_following_conflict(paths, conflict, executing_tasks)
        return paths
    def _resolve_vertex_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]:
        """解决顶点冲突 - åŸºäºŽä¼˜å…ˆçº§è¯„估选择让步的AGV"""
        updated_paths = paths.copy()
        # æ‰¾åˆ°å†²çªçš„AGV路径
        agv1_path = None
        agv2_path = None
        agv1_index = agv2_index = -1
        for i, path in enumerate(updated_paths):
            if path.get('agvId') == conflict.agv1:
                agv1_path = path
                agv1_index = i
            elif path.get('agvId') == conflict.agv2:
                agv2_path = path
                agv2_index = i
        if not agv1_path or not agv2_path:
            return updated_paths
        # è¯„估两个AGV的优先级
        agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks)
        agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks)
        # é€‰æ‹©ä¼˜å…ˆçº§è¾ƒä½Žçš„AGV进行等待
        if agv1_priority.priority_score <= agv2_priority.priority_score:
            waiting_agv_path = agv1_path
            waiting_agv_index = agv1_index
            waiting_agv_id = conflict.agv1
            higher_priority_agv_id = conflict.agv2
        else:
            waiting_agv_path = agv2_path
            waiting_agv_index = agv2_index
            waiting_agv_id = conflict.agv2
            higher_priority_agv_id = conflict.agv1
        # è®°å½•详细的避让决策信息
        self.logger.info(f"顶点冲突避让决策 - ä½ç½®: {conflict.position1}, æ—¶é—´: {conflict.time_step}")
        self.logger.info(f"  AGV {conflict.agv1}: {agv1_priority.explanation}")
        self.logger.info(f"  AGV {conflict.agv2}: {agv2_priority.explanation}")
        self.logger.info(f"  å†³ç­–: AGV {waiting_agv_id} è®©æ­¥ç»™ AGV {higher_priority_agv_id}")
        # åœ¨å†²çªä½ç½®ä¹‹å‰æ’入等待步骤
        self._insert_wait_step(waiting_agv_path, conflict.time_step)
        updated_paths[waiting_agv_index] = waiting_agv_path
        return updated_paths
    def _resolve_edge_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]:
        """解决边冲突 - åŸºäºŽä¼˜å…ˆçº§è¯„估选择让步的AGV"""
        updated_paths = paths.copy()
        # æ‰¾åˆ°å†²çªçš„AGV路径
        agv1_path = None
        agv2_path = None
        agv1_index = agv2_index = -1
        for i, path in enumerate(updated_paths):
            if path.get('agvId') == conflict.agv1:
                agv1_path = path
                agv1_index = i
            elif path.get('agvId') == conflict.agv2:
                agv2_path = path
                agv2_index = i
        if not agv1_path or not agv2_path:
            return updated_paths
        # è¯„估两个AGV的优先级
        agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks)
        agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks)
        # é€‰æ‹©ä¼˜å…ˆçº§è¾ƒä½Žçš„AGV进行等待
        if agv1_priority.priority_score <= agv2_priority.priority_score:
            waiting_agv_path = agv1_path
            waiting_agv_index = agv1_index
            waiting_agv_id = conflict.agv1
            higher_priority_agv_id = conflict.agv2
        else:
            waiting_agv_path = agv2_path
            waiting_agv_index = agv2_index
            waiting_agv_id = conflict.agv2
            higher_priority_agv_id = conflict.agv1
        # è®°å½•详细的避让决策信息
        self.logger.info(f"边冲突避让决策 - ä½ç½®äº¤æ¢: {conflict.position1}<->{conflict.position2}, æ—¶é—´: {conflict.time_step}")
        self.logger.info(f"  AGV {conflict.agv1}: {agv1_priority.explanation}")
        self.logger.info(f"  AGV {conflict.agv2}: {agv2_priority.explanation}")
        self.logger.info(f"  å†³ç­–: AGV {waiting_agv_id} è®©æ­¥ç»™ AGV {higher_priority_agv_id}")
        # åœ¨å†²çªä½ç½®ä¹‹å‰æ’入等待步骤
        self._insert_wait_step(waiting_agv_path, conflict.time_step)
        updated_paths[waiting_agv_index] = waiting_agv_path
        return updated_paths
    def _resolve_following_conflict(self, paths: List[Dict], conflict: Conflict, executing_tasks: List[Dict] = None) -> List[Dict]:
        """解决跟随冲突 - åŸºäºŽä¼˜å…ˆçº§è¯„估选择让步的AGV"""
        updated_paths = paths.copy()
        # æ‰¾åˆ°å†²çªçš„AGV路径
        agv1_path = None
        agv2_path = None
        agv1_index = agv2_index = -1
        for i, path in enumerate(updated_paths):
            if path.get('agvId') == conflict.agv1:
                agv1_path = path
                agv1_index = i
            elif path.get('agvId') == conflict.agv2:
                agv2_path = path
                agv2_index = i
        if not agv1_path or not agv2_path:
            return updated_paths
        # è¯„估两个AGV的优先级
        agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks)
        agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks)
        # é€‰æ‹©ä¼˜å…ˆçº§è¾ƒä½Žçš„AGV进行等待
        if agv1_priority.priority_score <= agv2_priority.priority_score:
            waiting_agv_path = agv1_path
            waiting_agv_index = agv1_index
            waiting_agv_id = conflict.agv1
            higher_priority_agv_id = conflict.agv2
        else:
            waiting_agv_path = agv2_path
            waiting_agv_index = agv2_index
            waiting_agv_id = conflict.agv2
            higher_priority_agv_id = conflict.agv1
        # è®°å½•详细的避让决策信息
        self.logger.info(f"跟随冲突避让决策 - è·ç¦»è¿‡è¿‘, æ—¶é—´: {conflict.time_step}")
        self.logger.info(f"  AGV {conflict.agv1} (位置: {conflict.position1}): {agv1_priority.explanation}")
        self.logger.info(f"  AGV {conflict.agv2} (位置: {conflict.position2}): {agv2_priority.explanation}")
        self.logger.info(f"  å†³ç­–: AGV {waiting_agv_id} è®©æ­¥ç»™ AGV {higher_priority_agv_id}")
        # åœ¨å†²çªä½ç½®ä¹‹å‰æ’入等待步骤
        self._insert_wait_step(waiting_agv_path, conflict.time_step)
        updated_paths[waiting_agv_index] = waiting_agv_path
        return updated_paths
    def _insert_wait_step(self, path: Dict, time_step: int):
        """
        åœ¨æŒ‡å®šæ—¶é—´æ­¥æ’入等待步骤
        Args:
            path: AGV路径
            time_step: æ’入位置的时间步
        """
        code_list = path.get('codeList', [])
        if time_step < len(code_list) and time_step > 0:
            # åœ¨æŒ‡å®šä½ç½®æ’入等待步骤(重复前一个位置)
            prev_code = code_list[time_step - 1]
            # ç¡®ä¿å¤åˆ¶æ‰€æœ‰å­—段
            if isinstance(prev_code, dict):
                wait_code = prev_code.copy()
            else:
                wait_code = {
                    'code': prev_code.code,
                    'direction': prev_code.direction
                }
            code_list.insert(time_step, wait_code)
            path['codeList'] = code_list
    def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]:
        """
        éªŒè¯å¹¶ä¿®æ­£è·¯å¾„,确保AGV只能以0、90、180、270度移动
        Args:
            planned_paths: è§„划路径列表
        Returns:
            List[Dict]: ä¿®æ­£åŽçš„路径列表
        """
        validated_paths = []
        for path_data in planned_paths:
            validated_path = self._validate_single_path_directions(path_data)
            validated_paths.append(validated_path)
        return validated_paths
    def _validate_single_path_directions(self, path_data: Dict) -> Dict:
        """
        éªŒè¯å•个路径的移动方向
        Args:
            path_data: å•个AGV路径数据
        Returns:
            Dict: ä¿®æ­£åŽçš„路径数据
        """
        validated_path = path_data.copy()
        code_list = path_data.get('codeList', [])
        if len(code_list) < 2:
            return validated_path
        validated_code_list = []
        for i in range(len(code_list)):
            current_code = code_list[i]
            if isinstance(current_code, dict):
                position = current_code.get('code', '')
                direction = current_code.get('direction', '90')
                # ä¿ç•™åŽŸæœ‰çš„æ‰€æœ‰å­—æ®µ
                validated_code = current_code.copy()
            else:
                position = current_code.code
                direction = current_code.direction
                # è½¬æ¢ä¸ºå­—典格式并保留基本字段
                validated_code = {
                    'code': position,
                    'direction': direction
                }
            # å¦‚果不是第一个点,计算正确的方向
            if i > 0:
                prev_code = code_list[i - 1]
                prev_position = prev_code.get('code', '') if isinstance(prev_code, dict) else prev_code.code
                # è®¡ç®—移动方向
                correct_direction = self._calculate_movement_direction(prev_position, position)
                if correct_direction:
                    direction = correct_direction
            # ç¡®ä¿æ–¹å‘是有效的四方向之一
            direction = self._normalize_direction(direction)
            # æ›´æ–°æ–¹å‘,保留其他所有字段
            validated_code['direction'] = direction
            validated_code_list.append(validated_code)
        validated_path['codeList'] = validated_code_list
        return validated_path
    def _calculate_movement_direction(self, from_position: str, to_position: str) -> Optional[str]:
        """
        è®¡ç®—从一个位置到另一个位置的移动方向
        Args:
            from_position: èµ·å§‹ä½ç½®ç 
            to_position: ç›®æ ‡ä½ç½®ç 
        Returns:
            Optional[str]: ç§»åŠ¨æ–¹å‘ï¼ˆ0, 90, 180, 270)
        """
        from_coord = get_coordinate_from_path_id(from_position, self.path_mapping)
        to_coord = get_coordinate_from_path_id(to_position, self.path_mapping)
        if not from_coord or not to_coord:
            return None
        dx = to_coord[0] - from_coord[0]
        dy = to_coord[1] - from_coord[1]
        # åªå…è®¸å››ä¸ªæ–¹å‘的移动
        if dx > 0 and dy == 0:
            return "0"    # å‘东
        elif dx < 0 and dy == 0:
            return "180"  # å‘西
        elif dx == 0 and dy > 0:
            return "90"   # å‘南
        elif dx == 0 and dy < 0:
            return "270"  # å‘北
        else:
            # ä¸æ˜¯æ ‡å‡†å››æ–¹å‘移动,返回默认方向
            return "90"
    def _normalize_direction(self, direction: str) -> str:
        """
        æ ‡å‡†åŒ–方向值,确保只有0、90、180、270
        Args:
            direction: åŽŸå§‹æ–¹å‘å€¼
        Returns:
            str: æ ‡å‡†åŒ–后的方向值
        """
        try:
            angle = int(direction) % 360
            # å°†è§’度映射到最近的四方向
            if angle <= 45 or angle > 315:
                return "0"
            elif 45 < angle <= 135:
                return "90"
            elif 135 < angle <= 225:
                return "180"
            else:
                return "270"
        except:
            return "90"  # é»˜è®¤æ–¹å‘
algorithm_system/algorithms/path_planning.py
New file
@@ -0,0 +1,1131 @@
"""
路径规划算法
"""
import heapq
import random
import math
import time
import logging
from typing import Dict, List, Tuple, Optional, Set, Any
from collections import defaultdict, deque
from common.data_models import PlannedPath, PathCode, AGVStatus, BackpackData
from common.utils import get_coordinate_from_path_id, calculate_distance, calculate_manhattan_distance, generate_segment_id, generate_navigation_code
from algorithm_system.algorithms.collision_detection import CollisionDetector, CollisionResolver
class ExecutingTaskExtractor:
    """执行中任务提取器"""
    def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
        """
        åˆå§‹åŒ–任务提取器
        Args:
            path_mapping: è·¯å¾„点映射字典
        """
        self.path_mapping = path_mapping
        self.logger = logging.getLogger(__name__)
        # é¢„计算坐标映射
        self.coord_to_code = {(data['x'], data['y']): code for code, data in path_mapping.items()}
        # é¢„分类位置类型
        self.pickup_positions = [pos for pos in path_mapping.keys() if pos.startswith("1")]
        self.charging_positions = [pos for pos in path_mapping.keys() if pos.startswith("2")]
        self.delivery_positions = [pos for pos in path_mapping.keys() if pos.startswith("3")]
        self.standby_positions = [pos for pos in path_mapping.keys() if pos.startswith("4")]
    def extract_optimized_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
        """智能提取AGV执行任务"""
        executing_tasks = []
        for agv_status in agv_status_list:
            agv_id = agv_status.agvId
            current_position = agv_status.position
            if not agv_status.backpack:
                continue
            optimized_task = self._analyze_all_backpack_tasks(
                agv_id, current_position, agv_status.backpack, agv_status
            )
            if optimized_task:
                executing_tasks.append(optimized_task)
        self.logger.info(f"智能分析 {len(executing_tasks)} ä¸ªAGV的背篓任务并生成最优路径")
        return executing_tasks
    def _analyze_all_backpack_tasks(self, agv_id: str, current_position: str,
                                   backpack: List[BackpackData], agv_status: AGVStatus) -> Optional[Dict]:
        """
        åˆ†æžæ‰€æœ‰èƒŒç¯“任务,确定最优执行策略
        """
        # 1. æå–并分类所有有效任务
        loaded_tasks = []      # å·²è£…载的任务
        unloaded_tasks = []    # æœªè£…载的任务
        all_tasks = []         # æ‰€æœ‰ä»»åŠ¡
        self.logger.debug(f"[AGV {agv_id}] å¼€å§‹åˆ†æžèƒŒç¯“任务,位置: {current_position}")
        for i, bp in enumerate(backpack):
            self.logger.debug(f"[AGV {agv_id}] èƒŒç¯“位置{i+1}: taskId={bp.taskId}, loaded={bp.loaded}, execute={bp.execute}")
            if not bp.taskId:
                self.logger.debug(f"[AGV {agv_id}] èƒŒç¯“位置{i+1}: è·³è¿‡ï¼ˆæ— ä»»åŠ¡ID)")
                continue
            # å°è¯•解析任务详情
            task_info = self._parse_task_details(bp.taskId, current_position)
            if not task_info:
                self.logger.warning(f"[AGV {agv_id}] ä»»åŠ¡ {bp.taskId} è§£æžå¤±è´¥ï¼Œè·³è¿‡")
                continue
            task_data = {
                'backpack_item': bp,
                'task_info': task_info,
                'distance_to_start': task_info.get('distance_to_start', float('inf')),
                'distance_to_end': task_info.get('distance_to_end', float('inf'))
            }
            all_tasks.append(task_data)
            if bp.loaded:
                loaded_tasks.append(task_data)
                self.logger.debug(f"[AGV {agv_id}] ä»»åŠ¡ {bp.taskId} å·²è£…载,距离终点: {task_info.get('distance_to_end', 'N/A')}")
            else:
                unloaded_tasks.append(task_data)
                self.logger.debug(f"[AGV {agv_id}] ä»»åŠ¡ {bp.taskId} æœªè£…载,距离起点: {task_info.get('distance_to_start', 'N/A')}")
        self.logger.debug(f"[AGV {agv_id}] ä»»åŠ¡åˆ†æžç»“æžœ: æ€»ä»»åŠ¡={len(all_tasks)}, å·²è£…è½½={len(loaded_tasks)}, æœªè£…è½½={len(unloaded_tasks)}")
        # 2. å¦‚果没有任何任务,返回None
        if not all_tasks:
            self.logger.debug(f"[AGV {agv_id}] æ²¡æœ‰æœ‰æ•ˆä»»åŠ¡ï¼Œè¿”å›žNone")
            return None
        # 3. å†³ç­–最优路径策略
        optimal_strategy = self._determine_next_best_action(
            loaded_tasks, unloaded_tasks, current_position
        )
        if not optimal_strategy:
            self.logger.debug(f"[AGV {agv_id}] å†³ç­–返回None,没有最优策略")
            return None
        self.logger.debug(f"[AGV {agv_id}] å†³ç­–完成: {optimal_strategy['strategy']} - {optimal_strategy['action']}")
        self.logger.debug(f"[AGV {agv_id}] ç›®æ ‡ä½ç½®: {optimal_strategy['target_position']}")
        self.logger.debug(f"[AGV {agv_id}] å†³ç­–原因: {optimal_strategy.get('reason', 'N/A')}")
        # 4. æž„建任务信息
        result = {
            'agvId': agv_id,
            'currentPosition': current_position,
            'strategy': optimal_strategy['strategy'],
            'next_action': optimal_strategy['action'],
            'target_position': optimal_strategy['target_position'],
            'primary_task': optimal_strategy['selected_task'],
            'all_loaded_tasks': loaded_tasks,
            'all_unloaded_tasks': unloaded_tasks,
            'total_tasks_count': len(all_tasks),
            'estimated_efficiency': optimal_strategy.get('efficiency_score', 0),
            'decision_reason': optimal_strategy.get('reason', ''),
            'agvDirection': agv_status.direction,
            'agvVoltage': agv_status.vol,
            'agvError': agv_status.error
        }
        self.logger.debug(f"[AGV {agv_id}] æœ€ç»ˆä»»åŠ¡ä¿¡æ¯æž„å»ºå®Œæˆï¼Œè¿”å›žç»“æžœ")
        return result
    def _determine_next_best_action(self, loaded_tasks: List[Dict],
                                   unloaded_tasks: List[Dict], current_position: str) -> Optional[Dict]:
        """
        ç¡®å®šAGV下一步最优行动
        Args:
            loaded_tasks: å·²è£…载的任务
            unloaded_tasks: æœªè£…载的任务
            current_position: å½“前位置
        Returns:
            Optional[Dict]: æœ€ä¼˜è¡ŒåŠ¨ç­–ç•¥
        """
        # é»˜è®¤ç­–略:综合优化决策
        # 1. å¦‚果有已装载任务,优先送货
        if loaded_tasks:
            nearest_delivery = min(loaded_tasks, key=lambda t: t['distance_to_end'])
            # 2. å¦‚果也有未装载任务,比较距离决策
            if unloaded_tasks:
                nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start'])
                # ç»¼åˆä¼˜åŒ–策略:如果取货点显著更近,优先取货;否则优先送货
                if nearest_pickup['distance_to_start'] * 1.5 < nearest_delivery['distance_to_end']:
                    return {
                        'strategy': 'comprehensive_optimization',
                        'action': 'pickup',
                        'selected_task': nearest_pickup,
                        'target_position': nearest_pickup['task_info']['start_position'],
                        'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position),
                        'reason': f"综合优化:取货点({nearest_pickup['distance_to_start']})比送货点({nearest_delivery['distance_to_end']})显著更近"
                    }
                else:
                    return {
                        'strategy': 'comprehensive_optimization',
                        'action': 'deliver',
                        'selected_task': nearest_delivery,
                        'target_position': nearest_delivery['task_info']['end_position'],
                        'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position),
                        'reason': f"综合优化:优先送达已装载货物,送货距离{nearest_delivery['distance_to_end']}"
                    }
            else:
                # åªæœ‰å·²è£…载任务,直接送货
                return {
                    'strategy': 'comprehensive_optimization',
                    'action': 'deliver',
                    'selected_task': nearest_delivery,
                    'target_position': nearest_delivery['task_info']['end_position'],
                    'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position),
                    'reason': f"综合优化:送达唯一已装载货物,距离{nearest_delivery['distance_to_end']}"
                }
        # 3. å¦‚果只有未装载任务,取货
        elif unloaded_tasks:
            nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start'])
            return {
                'strategy': 'comprehensive_optimization',
                'action': 'pickup',
                'selected_task': nearest_pickup,
                'target_position': nearest_pickup['task_info']['start_position'],
                'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position),
                'reason': f"综合优化:取货唯一未装载任务,距离{nearest_pickup['distance_to_start']}"
            }
        # 4. æ²¡æœ‰ä»»åŠ¡
        return None
    def _parse_task_details(self, task_id: str, current_position: str) -> Optional[Dict]:
        """
        è§£æžä»»åŠ¡è¯¦ç»†ä¿¡æ¯ï¼ŒåŒ…æ‹¬èµ·ç»ˆç‚¹å’Œè·ç¦»
        Args:
            task_id: ä»»åŠ¡ID
            current_position: å½“前位置
        Returns:
            Optional[Dict]: ä»»åŠ¡è¯¦ç»†ä¿¡æ¯
        """
        self.logger.debug(f"开始解析任务 {task_id},当前位置: {current_position}")
        start_pos, end_pos = self._fast_parse_task_start_end(task_id)
        if not start_pos or not end_pos:
            self.logger.warning(f"任务 {task_id} è§£æžå¤±è´¥: start_pos={start_pos}, end_pos={end_pos}")
            return None
        self.logger.debug(f"任务 {task_id} è§£æžæˆåŠŸ: start={start_pos}, end={end_pos}")
        # è®¡ç®—距离
        current_coord = get_coordinate_from_path_id(current_position, self.path_mapping)
        start_coord = get_coordinate_from_path_id(start_pos, self.path_mapping)
        end_coord = get_coordinate_from_path_id(end_pos, self.path_mapping)
        if not all([current_coord, start_coord, end_coord]):
            return None
        distance_to_start = calculate_manhattan_distance(current_coord, start_coord)
        distance_to_end = calculate_manhattan_distance(current_coord, end_coord)
        return {
            'task_id': task_id,
            'start_position': start_pos,
            'end_position': end_pos,
            'distance_to_start': distance_to_start,
            'distance_to_end': distance_to_end,
            'start_coord': start_coord,
            'end_coord': end_coord
        }
    def _calculate_delivery_efficiency(self, task_data: Dict, current_position: str) -> float:
        """
        è®¡ç®—送货效率分数
        Args:
            task_data: ä»»åŠ¡æ•°æ®
            current_position: å½“前位置
        Returns:
            float: æ•ˆçŽ‡åˆ†æ•° (0-10)
        """
        distance = task_data['distance_to_end']
        # è·ç¦»è¶Šè¿‘效率越高
        distance_score = max(0, 10 - distance / 5)
        # å·²è£…载的任务有额外奖励
        loaded_bonus = 3.0 if task_data['backpack_item'].loaded else 0
        return distance_score + loaded_bonus
    def _calculate_pickup_efficiency(self, task_data: Dict, current_position: str) -> float:
        """
        è®¡ç®—取货效率分数
        Args:
            task_data: ä»»åŠ¡æ•°æ®
            current_position: å½“前位置
        Returns:
            float: æ•ˆçŽ‡åˆ†æ•° (0-10)
        """
        distance_to_start = task_data['distance_to_start']
        # è·ç¦»è¶Šè¿‘效率越高
        distance_score = max(0, 10 - distance_to_start / 3)
        # å¾ˆè¿‘的取货点有高奖励
        if distance_to_start <= 3:
            distance_score += 5.0
        elif distance_to_start <= 5:
            distance_score += 2.0
        return distance_score
    def _fast_parse_task_start_end(self, task_id: str) -> Tuple[Optional[str], Optional[str]]:
        """
        è§£æžä»»åŠ¡èµ·ç‚¹å’Œç»ˆç‚¹
        æ”¯æŒä¸¤ç§æ ¼å¼ï¼š
        1. æ ‡å‡†æ ¼å¼ï¼šxxx_start_end
        2. ç®€å•格式:数字ID(通过hash分配位置)
        Args:
            task_id: ä»»åŠ¡ID
        Returns:
            Tuple[Optional[str], Optional[str]]: (起点, ç»ˆç‚¹)
        """
        self.logger.debug(f"解析任务ID: {task_id}")
        # æ–¹æ³•1:尝试标准格式解析(包含下划线)
        if "_" in task_id:
            parts = task_id.split("_")
            self.logger.debug(f"任务ID包含下划线,分割结果: {parts}")
            if len(parts) >= 3:
                potential_start = parts[-2]
                potential_end = parts[-1]
                start_valid = potential_start in self.path_mapping
                end_valid = potential_end in self.path_mapping
                self.logger.debug(f"标准格式解析结果: start={potential_start}(valid={start_valid}), end={potential_end}(valid={end_valid})")
                if start_valid and end_valid:
                    return potential_start, potential_end
            self.logger.debug(f"任务ID分割后部分数量不足: {len(parts)} < 3")
        # æ–¹æ³•2:简单数字ID的备选机制(通过hash分配位置)
        self.logger.debug(f"尝试简单数字ID备选解析机制")
        # èŽ·å–æ‰€æœ‰å¯ç”¨ä½ç½®
        all_positions = list(self.path_mapping.keys())
        if len(all_positions) < 2:
            self.logger.warning(f"路径映射位置不足,无法为任务分配起终点")
            return None, None
        # ä½¿ç”¨ä»»åŠ¡ID的hash值来分配起点和终点
        try:
            task_hash = abs(hash(task_id))
            # åˆ†é…èµ·ç‚¹ï¼ˆä½¿ç”¨hash的前一半)
            start_index = task_hash % len(all_positions)
            start_pos = all_positions[start_index]
            # åˆ†é…ç»ˆç‚¹ï¼ˆä½¿ç”¨hash的后一半,确保与起点不同)
            end_index = (task_hash // len(all_positions)) % len(all_positions)
            if end_index == start_index:
                end_index = (end_index + 1) % len(all_positions)
            end_pos = all_positions[end_index]
            self.logger.debug(f"简单ID备选解析成功: ä»»åŠ¡{task_id} -> start={start_pos}, end={end_pos}")
            return start_pos, end_pos
        except Exception as e:
            self.logger.error(f"简单ID备选解析失败: {e}")
        self.logger.warning(f"任务ID {task_id} æ‰€æœ‰è§£æžæ–¹æ³•都失败,返回 None, None")
        return None, None
    def _fast_parse_target(self, task_id: str, current_position: str, loaded: bool) -> Optional[str]:
        """
        è§£æžä»»åŠ¡ç›®æ ‡ä½ç½®
        Args:
            task_id: ä»»åŠ¡ID
            current_position: å½“前位置
            loaded: æ˜¯å¦å·²è£…è½½
        Returns:
            Optional[str]: ç›®æ ‡ä½ç½®ç 
        """
        _, task_end = self._fast_parse_task_start_end(task_id)
        if task_end:
            return task_end
        # åŸºäºŽhash值和装载状态快速选择
        hash_val = hash(task_id) % 1000
        if loaded:
            # å·²è£…载,选择出库区域
            target_positions = [pos for pos in self.delivery_positions if pos != current_position]
            if not target_positions:
                target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position]
        else:
            # æœªè£…载,选择仓库区域
            target_positions = [pos for pos in self.pickup_positions if pos != current_position]
            if not target_positions:
                target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position]
        if target_positions:
            return target_positions[hash_val % len(target_positions)]
        return None
    def extract_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
        """提取所有AGV正在执行的任务"""
        self.logger.debug(f"开始转换任务为标准格式,AGV数量: {len(agv_status_list)}")
        optimized_tasks = self.extract_optimized_executing_tasks(agv_status_list)
        self.logger.debug(f"提取结果数量: {len(optimized_tasks)}")
        legacy_tasks = []
        for i, task in enumerate(optimized_tasks):
            try:
                self.logger.debug(f"转换任务 #{i+1}: AGV {task['agvId']}")
                primary_task = task['primary_task']
                task_info = primary_task['task_info']
                backpack_item = primary_task['backpack_item']
                self.logger.debug(f"  - primary_task存在: {primary_task is not None}")
                self.logger.debug(f"  - task_info存在: {task_info is not None}")
                self.logger.debug(f"  - backpack_item存在: {backpack_item is not None}")
                # æ ¹æ®action确定状态
                is_pickup_action = task['next_action'] == 'pickup'
                self.logger.debug(f"  - next_action: {task['next_action']}")
                self.logger.debug(f"  - is_pickup_action: {is_pickup_action}")
                self.logger.debug(f"  - target_position: {task['target_position']}")
                # ä¿æŒåŽŸæœ‰çš„æ ‡å‡†JSON格式
                legacy_task = {
                    'agvId': task['agvId'],
                    'taskId': task_info['task_id'],
                    'currentPosition': task['currentPosition'],
                    'backpackIndex': backpack_item.index,
                    'status': 'executing',
                    'agvDirection': task['agvDirection'],
                    'agvVoltage': task['agvVoltage'],
                    'agvError': task['agvError'],
                    'loaded': backpack_item.loaded,
                    'needPickup': is_pickup_action,
                    'targetPosition': task['target_position'],
                    'taskStart': task_info['start_position'],
                    'taskEnd': task_info['end_position']
                }
                self.logger.debug(f"  - ä»»åŠ¡æž„å»ºæˆåŠŸ: {legacy_task['agvId']} -> {legacy_task['targetPosition']}")
                legacy_tasks.append(legacy_task)
            except Exception as e:
                self.logger.error(f"转换任务 #{i+1} å¤±è´¥: {e}")
                self.logger.error(f"任务内容: {task}")
                continue
        self.logger.info(f"使用综合优化策略,为 {len(legacy_tasks)} ä¸ªAGV生成标准格式路径")
        return legacy_tasks
class BatchPathPlanner:
    """
    Batch path planner.

    Plans at most one path per AGV: first for AGVs that have an executing
    task, then (optionally) for idle AGVs, and finally runs conflict detection
    and resolution over the combined set of paths.
    """

    def __init__(self, path_mapping: Dict[str, Dict[str, int]],
                 algorithm_type: str = "A_STAR", agv_manager=None):
        """
        Initialise the batch planner and its helper components.

        Args:
            path_mapping: Mapping from path-point ID to its coordinate data.
            algorithm_type: Name of the base single-path planning algorithm.
            agv_manager: Optional AGV manager; forwarded to the collision resolver.
        """
        self.path_mapping = path_mapping
        self.algorithm_type = algorithm_type
        self.agv_manager = agv_manager
        self.logger = logging.getLogger(__name__)
        # Helper components.
        self.task_extractor = ExecutingTaskExtractor(path_mapping)
        self.collision_detector = FastCollisionDetector(min_distance=3.0)
        self.collision_resolver = FastCollisionResolver(path_mapping, self.collision_detector, agv_manager)
        self.base_planner = PathPlanningFactory.create_path_planner(algorithm_type, path_mapping)

    def plan_all_agv_paths(self, agv_status_list: List[AGVStatus],
                          include_idle_agv: bool = False,
                          constraints: Optional[List[Tuple[int, int, float]]] = None) -> Dict:
        """
        Plan paths for all AGVs (one path per AGV at most).

        Args:
            agv_status_list: AGV status objects.
            include_idle_agv: Whether idle AGVs should also receive a path.
            constraints: Optional constraints forwarded to the base planner.

        Returns:
            Dict: Summary with the keys ``totalAgvs``, ``executingTasksCount``,
            ``plannedPathsCount``, ``totalPlanningTime`` and
            ``pathPlanningTime`` (milliseconds), ``conflictsDetected``,
            ``remainingConflicts``, ``algorithm``, ``fourDirectionCompliant``,
            ``collisionFree``, ``plannedPaths`` and ``executingTasks``.
        """
        # Kept as a function-level import, but hoisted out of the per-task loop.
        from common.utils import normalize_path_id

        start_time = time.time()

        # 1. Executing tasks, in the backward-compatible flat format.
        executing_tasks = self.task_extractor.extract_executing_tasks(agv_status_list)

        # 2. Plan a path for every AGV that has an executing task.
        planned_paths = []
        total_planning_time = 0
        planned_agv_ids = set()
        for task in executing_tasks:
            agv_id = task['agvId']
            current_pos = task['currentPosition']
            target_pos = task['targetPosition']
            task_id = task['taskId']
            self.logger.debug(f"开始路径规划: AGV {agv_id}, 任务 {task_id}")
            self.logger.debug(f"  - current_pos: {current_pos} ({type(current_pos)})")
            self.logger.debug(f"  - target_pos: {target_pos} ({type(target_pos)})")

            # Never plan twice for the same AGV.
            if agv_id in planned_agv_ids:
                self.logger.debug(f"  - 跳过AGV {agv_id}:已规划")
                continue
            # Both endpoints must be present.
            if not current_pos or not target_pos:
                self.logger.error(f"  - AGV {agv_id} 位置信息无效: current={current_pos}, target={target_pos}")
                continue

            # Normalise the path-point ID format before planning.
            normalized_current = normalize_path_id(current_pos)
            normalized_target = normalize_path_id(target_pos)
            self.logger.debug(f"  - 格式标准化: {current_pos} -> {normalized_current}, {target_pos} -> {normalized_target}")

            path_start_time = time.time()
            self.logger.debug(f"  - 调用base_planner.plan_path({normalized_current}, {normalized_target})")
            planned_path = self.base_planner.plan_path(normalized_current, normalized_target, constraints)
            planning_time = (time.time() - path_start_time) * 1000
            total_planning_time += planning_time
            self.logger.debug(f"  - 路径规划结果: {planned_path is not None}")

            if not planned_path:
                self.logger.error(f"  - AGV {agv_id} 路径规划失败: {current_pos} -> {target_pos}")
                continue
            self.logger.debug(f"  - 路径长度: {len(planned_path.codeList) if hasattr(planned_path, 'codeList') else 'N/A'}")

            planned_paths.append(self._build_task_path_dict(agv_id, task_id, task, planned_path))
            planned_agv_ids.add(agv_id)
            self.logger.debug(f"AGV {agv_id} 快速规划: {current_pos} -> {target_pos}")

        # 3. Idle AGVs (optional).
        if include_idle_agv:
            idle_paths = self._fast_plan_idle_agv_paths(agv_status_list, executing_tasks, constraints, planned_agv_ids)
            planned_paths.extend(idle_paths)

        # 4. Safety net: keep only the first path of every AGV.
        final_paths = []
        final_agv_ids = set()
        for path in planned_paths:
            agv_id = path['agvId']
            if agv_id not in final_agv_ids:
                final_paths.append(path)
                final_agv_ids.add(agv_id)
        planned_paths = final_paths

        # 5. Collision detection and resolution.
        conflicts = self.collision_detector.detect_conflicts(planned_paths)
        if conflicts:
            self.logger.info(f"发现 {len(conflicts)} 个碰撞,快速解决中...")
            planned_paths = self.collision_resolver.resolve_conflicts(planned_paths, conflicts, executing_tasks)
            remaining_conflicts = self.collision_detector.detect_conflicts(planned_paths)
        else:
            remaining_conflicts = []

        total_time = (time.time() - start_time) * 1000

        # 6. Assemble the result summary.
        result = {
            'totalAgvs': len(agv_status_list),
            'executingTasksCount': len(executing_tasks),
            'plannedPathsCount': len(planned_paths),
            'totalPlanningTime': round(total_time, 2),
            'pathPlanningTime': round(total_planning_time, 2),
            'conflictsDetected': len(conflicts),
            'remainingConflicts': len(remaining_conflicts),
            'algorithm': f"Optimized_{self.algorithm_type}",
            'fourDirectionCompliant': True,
            'collisionFree': len(remaining_conflicts) == 0,
            'plannedPaths': planned_paths,
            'executingTasks': executing_tasks
        }
        self.logger.info(f"优化批量路径规划完成 - 总AGV: {result['totalAgvs']}, "
                        f"执行中任务: {result['executingTasksCount']}, "
                        f"规划路径: {result['plannedPathsCount']}, "
                        f"总耗时: {result['totalPlanningTime']:.2f}ms, "
                        f"无碰撞: {result['collisionFree']}")
        return result

    def _build_task_path_dict(self, agv_id: str, task_id: str, task: Dict, planned_path) -> Dict:
        """Convert a planned task path into the ``agvId``/``segId``/``codeList`` dict."""
        seg_id = generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2")
        code_list = []
        last_index = len(planned_path.codeList) - 1
        for i, path_code in enumerate(planned_path.codeList):
            if not isinstance(path_code, PathCode):
                # Dict-like path points carry no target/backpack information.
                code_list.append(generate_navigation_code(
                    code=path_code.get('code', ''),
                    direction=path_code.get('direction', '90'),
                    action_type="2",
                    task_id=task_id
                ))
                continue

            # Only the last path point carries a pick/drop action.
            is_target_point = i == last_index
            pos_type = None
            if is_target_point:
                if task.get('loaded', False):
                    # Loaded task: this path leads to the end point to drop off.
                    pos_type = "2"
                    self.logger.debug(f"AGV {agv_id} 在终点 {path_code.code} 设置放货动作")
                else:
                    # Unloaded task: this path leads to the start point to pick up.
                    pos_type = "1"
                    self.logger.debug(f"AGV {agv_id} 在起点 {path_code.code} 设置取货动作")
            code_list.append(generate_navigation_code(
                code=path_code.code,
                direction=path_code.direction,
                action_type="2",  # task movement
                task_id=task_id,
                pos_type=pos_type,
                backpack_level=task.get('backpackIndex', 0),
                is_target_point=is_target_point
            ))
        return {
            'agvId': agv_id,
            'segId': seg_id,
            'codeList': code_list
        }

    def _fast_plan_idle_agv_paths(self, agv_status_list: List[AGVStatus],
                                 executing_tasks: List[Dict],
                                 constraints: Optional[List[Tuple[int, int, float]]] = None,
                                 planned_agv_ids: Optional[Set[str]] = None) -> List[Dict]:
        """
        Plan paths for idle AGVs.

        Args:
            agv_status_list: AGV status objects.
            executing_tasks: Legacy-format executing tasks; their AGV IDs seed
                ``planned_agv_ids`` when that argument is omitted.
            constraints: Optional constraints forwarded to the base planner.
            planned_agv_ids: IDs of AGVs that already have a path. The set is
                updated in place with every AGV planned here.

        Returns:
            List[Dict]: One ``agvId``/``segId``/``codeList`` dict per idle AGV
            for which a path was found.
        """
        idle_paths = []
        if planned_agv_ids is None:
            planned_agv_ids = {task['agvId'] for task in executing_tasks}

        for agv_status in agv_status_list:
            agv_id = agv_status.agvId
            if agv_id in planned_agv_ids or not self._is_agv_idle_fast(agv_status):
                continue

            # NOTE(review): unlike task paths, the position is not passed
            # through normalize_path_id here - confirm whether it should be.
            current_pos = agv_status.position
            target_pos = self._get_idle_agv_target_fast(agv_id, current_pos)
            if not target_pos or target_pos == current_pos:
                continue
            planned_path = self.base_planner.plan_path(current_pos, target_pos, constraints)
            if not planned_path:
                continue

            action_type = "3" if self._is_charging_path(target_pos) else "4"
            seg_id = generate_segment_id(agv_id=agv_id, target_position=target_pos, action_type=action_type)
            # Idle paths need no taskId/posType/lev fields and have no target
            # point. Entries that are not PathCode instances are skipped.
            code_list = [
                generate_navigation_code(
                    code=path_code.code,
                    direction=path_code.direction,
                    action_type=action_type,
                    is_target_point=False
                )
                for path_code in planned_path.codeList
                if isinstance(path_code, PathCode)
            ]
            idle_paths.append({
                'agvId': agv_id,
                'segId': seg_id,
                'codeList': code_list
            })
            planned_agv_ids.add(agv_id)
        return idle_paths

    def _is_agv_idle_fast(self, agv_status: AGVStatus) -> bool:
        """
        Decide whether an AGV counts as idle.

        An AGV is idle when its status is 0, 1 or 2, its error code is 0, and
        neither of its first two backpack items is both loaded and executing.
        """
        if agv_status.status not in [0, 1, 2] or agv_status.error != 0:
            return False
        # Only the first two backpack items are inspected.
        if agv_status.backpack:
            for item in agv_status.backpack[:2]:
                if item.loaded and item.execute:
                    return False
        return True

    def _get_idle_agv_target_fast(self, agv_id: str, current_position: str) -> Optional[str]:
        """
        Pick a destination for an idle AGV.

        Preference order: charging positions, standby positions, any mapped
        position. The current position is never chosen. Within a group the
        choice is derived from a checksum of the AGV ID.

        Returns:
            Optional[str]: Chosen path-point ID, or ``None`` if none exists.
        """
        # Local import keeps this block self-contained. CRC32 replaces the
        # built-in hash(), whose str results are salted per interpreter run and
        # would send the same AGV to a different spot after every restart.
        import zlib

        hash_val = zlib.crc32(str(agv_id).encode("utf-8")) % 1000
        candidate_groups = (
            self.task_extractor.charging_positions,
            self.task_extractor.standby_positions,
            self.path_mapping.keys(),
        )
        for group in candidate_groups:
            candidates = [pos for pos in group if pos != current_position]
            if candidates:
                return candidates[hash_val % len(candidates)]
        return None

    def _is_charging_path(self, target_position: str) -> bool:
        """
        Tell whether the target position is a charging station.

        Heuristic marked "to be revised later" in the original: the ID starts
        with "2" or contains "charge" (case-insensitive).
        """
        if not target_position:
            return False
        return target_position.startswith("2") or "charge" in target_position.lower()
class PathPlanner:
    """Base class for path planners; subclasses supply ``plan_path``."""

    def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
        """
        Store the path mapping and create the module logger.

        Args:
            path_mapping: Mapping from path-point ID to its coordinate data.
        """
        self.logger = logging.getLogger(__name__)
        self.path_mapping = path_mapping

    def plan_path(self, start_code: str, end_code: str,
                 constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]:
        """
        Plan a path from the start point to the end point.

        Args:
            start_code: ID of the starting path point.
            end_code: ID of the target path point.
            constraints: Optional list of constraints.

        Returns:
            Optional[PlannedPath]: The planned path, or ``None`` when no path
            can be planned.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError("子类必须实现此方法")

    def is_valid_path_point(self, code: str) -> bool:
        """
        Check whether a path-point ID is known.

        Args:
            code: Path-point ID.

        Returns:
            bool: ``True`` when ``code`` is a key of the path mapping.
        """
        known_points = self.path_mapping
        return code in known_points
class AStarPathPlanner(PathPlanner):
    """
    A* path planner on a four-connected grid of path points.

    Two path points are neighbours when their coordinates differ by exactly
    one unit along x or y. Every step costs 1 and the heuristic is the
    Manhattan distance to the goal.
    """

    def __init__(self, path_mapping: Dict[str, Dict[str, int]], turn_cost: float = 0.5):
        """
        Initialise the planner and precompute its lookup tables.

        Args:
            path_mapping: Mapping from path-point ID to a dict with 'x' and 'y'.
            turn_cost: Cost of a turn. It is stored on the instance, but the
                search in this class does not read it.
        """
        super().__init__(path_mapping)
        self.turn_cost = turn_cost
        self.logger = logging.getLogger(__name__)
        # Strict four-direction movement: up, right, down, left.
        self.directions = [(0, -1), (1, 0), (0, 1), (-1, 0)]
        self.direction_angles = ["270", "0", "90", "180"]
        # Coordinate -> ID lookup; must exist before the adjacency list is built.
        self.coord_to_code = {}
        for code, data in path_mapping.items():
            if self._has_coordinates(data):
                self.coord_to_code[(data['x'], data['y'])] = code
        self.adjacency = self._build_adjacency_list()
        # Monitoring counters; nothing in this class updates them.
        self.planning_count = 0
        self.total_planning_time = 0.0
        self.logger.debug(f"A*规划器初始化完成,邻接表: {len(self.adjacency)} 个节点")

    @staticmethod
    def _has_coordinates(data) -> bool:
        """Return True when a mapping entry is a dict carrying both 'x' and 'y'."""
        return isinstance(data, dict) and 'x' in data and 'y' in data

    def _build_adjacency_list(self) -> Dict[str, List[Tuple[str, str]]]:
        """
        Precompute the adjacency list.

        Entries without usable coordinates are skipped, matching the guard used
        when ``coord_to_code`` is built (they previously raised here).

        Returns:
            Dict[str, List[Tuple[str, str]]]: ``{node: [(neighbour, heading), ...]}``
        """
        adjacency = defaultdict(list)
        for code, coord_data in self.path_mapping.items():
            if not self._has_coordinates(coord_data):
                continue
            x, y = coord_data['x'], coord_data['y']
            for (dx, dy), angle in zip(self.directions, self.direction_angles):
                neighbor_coord = (x + dx, y + dy)
                if neighbor_coord in self.coord_to_code:
                    adjacency[code].append((self.coord_to_code[neighbor_coord], angle))
        return adjacency

    def plan_path(self, start_code: str, end_code: str,
                 constraints: Optional[List[Tuple[int, int, float]]] = None) -> Optional[PlannedPath]:
        """
        Find a shortest four-direction path between two path points.

        Args:
            start_code: ID of the starting path point.
            end_code: ID of the target path point.
            constraints: Optional ``(x, y, radius)`` circles. Path points inside
                any circle are not entered (the start point is not checked).

        Returns:
            Optional[PlannedPath]: The path with ``agvId=""``, or ``None`` when
            an endpoint is unknown, lacks coordinates, or is unreachable.
        """
        if not self.is_valid_path_point(start_code) or not self.is_valid_path_point(end_code):
            return None
        if start_code == end_code:
            return PlannedPath(agvId="", codeList=[PathCode(code=start_code, direction="90")])
        # Endpoints without coordinates cannot be searched; report "no path"
        # instead of raising KeyError.
        if not (self._has_coordinates(self.path_mapping[start_code])
                and self._has_coordinates(self.path_mapping[end_code])):
            return None

        end_point = self.path_mapping[end_code]
        end_x, end_y = end_point['x'], end_point['y']

        # Heap entries are (f_score, code). Predecessors live in came_from, so
        # no per-entry copy of the path is needed.
        open_heap = [(0, start_code)]
        came_from = {}
        g_scores = {start_code: 0}
        closed = set()
        is_blocked = self._build_constraint_checker(constraints) if constraints else None

        while open_heap:
            _, current = heapq.heappop(open_heap)
            if current in closed:
                continue  # stale entry, superseded by a cheaper one
            closed.add(current)
            if current == end_code:
                # Walk the predecessor chain back to the start.
                path_codes = [current]
                while path_codes[-1] in came_from:
                    path_codes.append(came_from[path_codes[-1]])
                path_codes.reverse()
                return self._build_path_result(path_codes)

            step_cost = g_scores[current] + 1
            # .get() avoids inserting empty entries into the defaultdict.
            for neighbor, _heading in self.adjacency.get(current, ()):
                if neighbor in closed:
                    continue
                if is_blocked and is_blocked(neighbor):
                    continue
                if neighbor not in g_scores or step_cost < g_scores[neighbor]:
                    g_scores[neighbor] = step_cost
                    came_from[neighbor] = current
                    point = self.path_mapping[neighbor]
                    heuristic = abs(point['x'] - end_x) + abs(point['y'] - end_y)
                    heapq.heappush(open_heap, (step_cost + heuristic, neighbor))
        return None

    def _build_constraint_checker(self, constraints: List[Tuple[int, int, float]]):
        """Build a predicate returning True when a path point lies inside any constraint circle."""
        def check_constraints(code: str) -> bool:
            point = self.path_mapping[code]
            px, py = point['x'], point['y']
            return any((px - cx) ** 2 + (py - cy) ** 2 <= radius ** 2
                       for cx, cy, radius in constraints)
        return check_constraints

    def _build_path_result(self, path_codes: List[str]) -> PlannedPath:
        """
        Turn a list of path-point IDs into a ``PlannedPath``.

        Each point's direction is the heading towards the next point; the last
        point, or a point with no matching adjacency entry, gets "90".
        """
        result_codes = []
        for i, code in enumerate(path_codes):
            direction = "90"  # default heading
            if i < len(path_codes) - 1:
                next_code = path_codes[i + 1]
                direction = next(
                    (heading for neighbor, heading in self.adjacency.get(code, ())
                     if neighbor == next_code),
                    direction,
                )
            result_codes.append(PathCode(code=code, direction=direction))
        return PlannedPath(agvId="", codeList=result_codes)
# Collision detector
class FastCollisionDetector:
    """
    Collision detector.

    Reports a conflict whenever two or more paths hold the same position at
    the same index (time step) of their ``codeList``.
    """

    def __init__(self, min_distance: float = 3.0):
        """
        Args:
            min_distance: Stored on the instance; the detection in this class
                does not read it.
        """
        self.min_distance = min_distance
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _position_of(path_code) -> str:
        """Extract the position code from a dict, an object with ``code``, or any other value."""
        if isinstance(path_code, dict):
            return path_code.get('code', '')
        if hasattr(path_code, 'code'):
            return path_code.code
        return str(path_code)

    def detect_conflicts(self, planned_paths: List[Dict]) -> List[Dict]:
        """
        Detect position conflicts between planned paths.

        Args:
            planned_paths: Path dicts with ``agvId`` and an optional
                ``codeList``; a missing or ``None`` list counts as empty.

        Returns:
            List[Dict]: One dict per conflict with the keys ``type``
            (``'position_conflict'``), ``time_step``, ``position``, ``agv_ids``
            and ``severity`` (``'high'``).
        """
        conflicts: List[Dict] = []
        if len(planned_paths) < 2:
            return conflicts

        # time step -> position -> AGV IDs, in first-seen order.
        occupancy = defaultdict(lambda: defaultdict(list))
        for path_data in planned_paths:
            agv_id = path_data['agvId']
            for time_step, path_code in enumerate(path_data.get('codeList') or []):
                position = self._position_of(path_code)
                if position:
                    occupancy[time_step][position].append(agv_id)

        for time_step, positions in occupancy.items():
            for position, agv_list in positions.items():
                if len(agv_list) > 1:
                    conflicts.append({
                        'type': 'position_conflict',
                        'time_step': time_step,
                        'position': position,
                        'agv_ids': agv_list,
                        'severity': 'high'
                    })
        self.logger.debug(f"快速检测到 {len(conflicts)} 个冲突")
        return conflicts
# Collision resolver
class FastCollisionResolver:
    """
    Collision resolver.

    Resolves position conflicts by making all but one of the involved AGVs
    wait at the first point of their path.
    """

    def __init__(self, path_mapping: Dict[str, Dict[str, int]],
                 collision_detector: "FastCollisionDetector", agv_manager=None):
        """
        Args:
            path_mapping: Mapping from path-point ID to its coordinate data.
            collision_detector: Detector instance; stored on the resolver.
            agv_manager: Optional AGV manager; stored on the resolver.
        """
        self.path_mapping = path_mapping
        self.collision_detector = collision_detector
        self.agv_manager = agv_manager
        self.logger = logging.getLogger(__name__)

    def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Dict],
                          executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """
        Resolve conflicts by delaying AGVs.

        Conflicts are handled in ascending ``time_step`` order. For each
        ``position_conflict`` the AGV with the smallest ID keeps its path and
        the i-th remaining AGV (sorted by ID) is delayed by ``i`` steps.
        Other conflict types are ignored.

        Args:
            planned_paths: Path dicts; their ``codeList`` is modified in place.
            conflicts: Conflict dicts as produced by the detector.
            executing_tasks: Accepted for interface compatibility; not used.

        Returns:
            List[Dict]: ``planned_paths`` itself when there are no conflicts,
            otherwise the paths keyed by ``agvId`` as a new list.
        """
        if not conflicts:
            return planned_paths

        paths_dict = {path['agvId']: path for path in planned_paths}
        for conflict in sorted(conflicts, key=lambda c: c.get('time_step', 0)):
            if conflict['type'] != 'position_conflict':
                continue
            # Simple policy: the first AGV (by ID) keeps its schedule.
            waiting_agvs = sorted(conflict['agv_ids'])[1:]
            for delay, agv_id in enumerate(waiting_agvs, 1):
                if agv_id in paths_dict:
                    self._add_delay_to_path(paths_dict[agv_id], delay_steps=delay)
                    self.logger.debug(f"为AGV {agv_id} 添加 {delay} 步延迟")
        return list(paths_dict.values())

    def _add_delay_to_path(self, path_data: Dict, delay_steps: int):
        """
        Prepend ``delay_steps`` waiting steps to a path, in place.

        Each waiting step repeats the first path point. Every inserted step is
        its own dict, so editing one of them later cannot change the others.
        """
        if delay_steps <= 0 or not path_data.get('codeList'):
            return

        first_code = path_data['codeList'][0]
        if isinstance(first_code, dict):
            template = dict(first_code)
        else:
            template = {
                'code': first_code.code if hasattr(first_code, 'code') else str(first_code),
                'direction': first_code.direction if hasattr(first_code, 'direction') else "90"
            }
        # One slice assignment instead of repeated insert(0, ...).
        path_data['codeList'][:0] = [dict(template) for _ in range(delay_steps)]

    def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]:
        """Placeholder for four-direction validation; returns the paths unchanged."""
        return planned_paths
class PathPlanningFactory:
    """Factory for path planner instances."""

    @staticmethod
    def create_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]]) -> PathPlanner:
        """
        Create a single-path planner.

        Every algorithm name currently yields an ``AStarPathPlanner``; names
        outside the recognised set additionally produce a warning.

        Args:
            algorithm_type: Algorithm name (compared case-insensitively).
            path_mapping: Mapping from path-point ID to its coordinate data.

        Returns:
            PathPlanner: The planner instance.
        """
        recognised = ("A_STAR", "ASTAR", "A*", "DIJKSTRA", "GREEDY")
        if algorithm_type.upper() not in recognised:
            # Unknown name: fall back to A* and say so.
            logging.getLogger(__name__).warning(f"未知算法类型 {algorithm_type},使用默认A*算法")
        return AStarPathPlanner(path_mapping)

    @staticmethod
    def create_batch_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]],
                                 agv_manager=None) -> BatchPathPlanner:
        """
        Create a batch path planner.

        Args:
            algorithm_type: Name of the base algorithm.
            path_mapping: Mapping from path-point ID to its coordinate data.
            agv_manager: Optional AGV manager passed through to the planner.

        Returns:
            BatchPathPlanner: The batch planner instance.
        """
        return BatchPathPlanner(
            path_mapping=path_mapping,
            algorithm_type=algorithm_type,
            agv_manager=agv_manager,
        )
# Export the core classes: these are the names made available by
# ``from <module> import *``.
__all__ = [
    'ExecutingTaskExtractor',
    'BatchPathPlanner',
    'PathPlanner',
    'AStarPathPlanner',
    'PathPlanningFactory',
    'FastCollisionDetector',
    'FastCollisionResolver'
]
algorithm_system/algorithms/task_allocation.py
New file
@@ -0,0 +1,687 @@
"""
任务分配算法
"""
import time
import random
import logging
from typing import Dict, List, Tuple, Optional, Set, Any
from collections import defaultdict
from abc import ABC, abstractmethod
from common.data_models import TaskData, AGVStatus, TaskAssignment, BackpackData
from algorithm_system.models.agv_model import AGVModel, AGVModelManager
from common.utils import get_coordinate_from_path_id, calculate_distance, calculate_manhattan_distance
from dataclasses import dataclass
class TaskAllocation(ABC):
    """Abstract base class shared by the task-allocation strategies."""

    def __init__(self, agv_manager: AGVModelManager):
        """
        Keep a reference to the AGV manager and create the logger.

        Args:
            agv_manager: AGV model manager used by the strategy.
        """
        self.logger = logging.getLogger(__name__)
        self.agv_manager = agv_manager

    @abstractmethod
    def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
        """
        Assign tasks to AGVs.

        Args:
            tasks: Tasks waiting to be assigned.

        Returns:
            List[TaskAssignment]: The assignments that were made.
        """
        pass

    def find_available_backpack_slot(self, agv_status: AGVStatus) -> Optional[int]:
        """
        Locate the first usable backpack slot of an AGV.

        A slot is usable when it is not loaded, not executing and carries no
        task id. When the status has no backpack data at all, slot 0 is
        returned and a warning is logged.

        Args:
            agv_status: AGV status record.

        Returns:
            Optional[int]: Index of the usable slot, or None if every slot is taken.
        """
        slots = agv_status.backpack
        if not slots:
            # No backpack data: fall back to the first position.
            self.logger.warning(f"AGV {agv_status.agvId} æ²¡æœ‰èƒŒç¯“信息,分配到位置0")
            return 0

        for slot in slots:
            if slot.loaded or slot.execute or slot.taskId:
                continue
            self.logger.debug(f"AGV {agv_status.agvId} æ‰¾åˆ°å¯ç”¨èƒŒç¯“位置: {slot.index}")
            return slot.index

        # Every slot is occupied.
        self.logger.debug(f"AGV {agv_status.agvId} æ²¡æœ‰å¯ç”¨çš„背篓位置")
        return None

    def get_agv_available_capacity(self, agv_status: AGVStatus) -> int:
        """
        Count the usable backpack slots of an AGV.

        Args:
            agv_status: AGV status record.

        Returns:
            int: Number of usable slots; 1 when there is no backpack data.
        """
        slots = agv_status.backpack
        if not slots:
            return 1  # assume at least one backpack position exists
        return sum(1 for slot in slots if not (slot.loaded or slot.execute or slot.taskId))

    def assign_task_with_backpack(self, agv_model, task: TaskData, lev_id: int) -> bool:
        """
        Hand a task to an AGV model for a given backpack slot.

        The slot number is only used in the log message; the AGV model's
        ``assign_task`` receives the task id, priority, start and end codes.
        Any exception raised on the way is logged and reported as a failure.

        Args:
            agv_model: AGV model that should take the task.
            task: Task data.
            lev_id: Backpack slot number.

        Returns:
            bool: True when the AGV model accepted the task, False otherwise.
        """
        try:
            accepted = agv_model.assign_task(
                task_id=task.taskId,
                priority=task.priority,
                start_code=task.start,
                end_code=task.end
            )
            if not accepted:
                self.logger.warning(f"任务 {task.taskId} åˆ†é…ç»™AGV {agv_model.agvId} å¤±è´¥")
                return False
            self.logger.info(f"任务 {task.taskId} æˆåŠŸåˆ†é…ç»™AGV {agv_model.agvId} çš„背篓位置 {lev_id}")
            return True
        except Exception as exc:
            self.logger.error(f"分配任务时发生异常: {exc}")
            return False
class NearestFirstAllocation(TaskAllocation):
    """Allocation strategy that gives each task to the closest AGV able to take it."""

    # Distance penalty added per task an AGV already carries, so busy AGVs
    # rank behind idle ones at a similar distance.
    BUSY_TASK_PENALTY = 10

    def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
        """
        Assign tasks using the nearest-first strategy.

        Tasks whose id already sits in some AGV backpack are skipped. For each
        remaining task the available AGVs are ranked by Manhattan distance to
        the task start (plus a penalty per task already carried) and the first
        one that accepts the task and still has an unclaimed backpack slot
        gets it. A backpack slot is handed out at most once per call.

        Args:
            tasks: Tasks waiting to be assigned.

        Returns:
            List[TaskAssignment]: The assignments that were made.
        """
        if not tasks:
            return []

        # Copy the list: AGVs are removed from it as they fill up, and the
        # manager's own list must stay untouched.
        available_agvs = list(self.agv_manager.get_available_agvs() or [])
        if not available_agvs:
            self.logger.warning("没有可用的AGV进行任务分配")
            return []

        # 1. Collect the ids of tasks that already sit in a backpack.
        already_assigned_tasks = set()
        for agv in self.agv_manager.get_all_agvs():
            for backpack_item in agv.backpack or ():
                if backpack_item.taskId:
                    already_assigned_tasks.add(backpack_item.taskId)
                    self.logger.info(f"任务 {backpack_item.taskId} å·²åˆ†é…ç»™ AGV {agv.agvId},跳过重复分配")

        # 2. Drop those tasks from the work list.
        unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks]
        if not unassigned_tasks:
            self.logger.info("所有任务都已分配,无需重新分配")
            return []
        self.logger.info(f"总任务数: {len(tasks)}, å·²åˆ†é…: {len(already_assigned_tasks)}, å¾…分配: {len(unassigned_tasks)}")

        path_mapping = self.agv_manager.path_mapping

        # Index the raw status records once; the first record wins for a
        # duplicated AGV id, as a linear search would.
        status_by_id = {}
        for agv_state in self.agv_manager.get_all_agv_status():
            status_by_id.setdefault(agv_state.agvId, agv_state)

        # Slots handed out during this call, per AGV id. The status records
        # are not updated by an assignment, so without this bookkeeping the
        # same slot would be returned for every task given to one AGV.
        claimed_slots = defaultdict(set)
        assignments = []

        for task in unassigned_tasks:
            if not available_agvs:
                break

            task_start_coord = get_coordinate_from_path_id(task.start, path_mapping)
            if not task_start_coord:
                self.logger.warning(f"无法获取任务 {task.taskId} èµ·ç‚¹ {task.start} çš„坐标")
                continue

            # Closest first; the sort is stable, so ties keep list order.
            candidates = sorted(
                (
                    (self._effective_distance(agv, task_start_coord), agv)
                    for agv in available_agvs
                    if agv.coordinates
                ),
                key=lambda pair: pair[0],
            )

            # Walk outwards until an AGV actually takes the task, instead of
            # giving up when the single nearest AGV cannot.
            for distance, agv in candidates:
                if not agv.can_accept_task(task.priority):
                    continue

                agv_status = status_by_id.get(agv.agvId)
                if not agv_status:
                    self.logger.warning(f"无法获取AGV {agv.agvId} çš„状态信息")
                    continue

                taken = claimed_slots[agv.agvId]
                lev_id = self._pick_unclaimed_slot(agv_status, taken)
                if lev_id is None:
                    self.logger.warning(f"AGV {agv.agvId} æ²¡æœ‰å¯ç”¨çš„背篓位置")
                    available_agvs.remove(agv)
                    continue

                if not self.assign_task_with_backpack(agv, task, lev_id):
                    self.logger.warning(f"任务 {task.taskId} åˆ†é…ç»™AGV {agv.agvId} å¤±è´¥")
                    continue

                taken.add(lev_id)
                assignments.append(TaskAssignment(
                    taskId=task.taskId,
                    agvId=agv.agvId,
                    lev_id=lev_id
                ))
                self.logger.info(f"任务 {task.taskId} åˆ†é…ç»™æœ€è¿‘çš„AGV {agv.agvId},背篓位置: {lev_id},距离: {distance}")

                # Retire the AGV once every free slot has been handed out.
                if self.get_agv_available_capacity(agv_status) - len(taken) <= 0:
                    available_agvs.remove(agv)
                break

        return assignments

    def _effective_distance(self, agv, target_coord):
        """Return the Manhattan distance to the target plus the busy-AGV penalty."""
        distance = calculate_manhattan_distance(agv.coordinates, target_coord)
        if agv.current_task_count > 0:
            # Simplification: each task in progress counts as extra distance.
            distance += agv.current_task_count * self.BUSY_TASK_PENALTY
        return distance

    def _pick_unclaimed_slot(self, agv_status: AGVStatus, claimed: Set[int]) -> Optional[int]:
        """
        Return a free backpack slot not yet handed out in this allocation run.

        Args:
            agv_status: Status record whose backpack is inspected.
            claimed: Slot indexes already given out on this AGV during the
                current call.

        Returns:
            Optional[int]: A usable slot index, or None when none is left.
        """
        slot = self.find_available_backpack_slot(agv_status)
        if slot is None or slot not in claimed:
            return slot
        # The first free slot was already handed out in this run; look for
        # another one using the same "free" test as the base class.
        for item in agv_status.backpack or ():
            if item.index in claimed:
                continue
            if not (item.loaded or item.execute or item.taskId):
                return item.index
        return None
class LoadBalancedAllocation(TaskAllocation):
    """Allocation strategy that favours lightly loaded AGVs."""

    def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
        """
        Assign tasks using the load-balancing strategy.

        Tasks already present in a backpack are skipped; the rest are handled
        in descending priority order. Every AGV that accepts the task gets a
        score of 0.4 * load + 0.3 * distance + 0.3 * efficiency, and the
        candidates are tried best-first until one has a status record and an
        unclaimed backpack slot. A slot is handed out at most once per call.

        Args:
            tasks: Tasks waiting to be assigned.

        Returns:
            List[TaskAssignment]: The assignments that were made.
        """
        if not tasks:
            return []

        all_agvs = self.agv_manager.get_all_agvs()
        if not all_agvs:
            self.logger.warning("没有AGV进行任务分配")
            return []

        # 1. Collect the ids of tasks that already sit in a backpack.
        already_assigned_tasks = set()
        for agv in all_agvs:
            for backpack_item in agv.backpack or ():
                if backpack_item.taskId:
                    already_assigned_tasks.add(backpack_item.taskId)
                    self.logger.info(f"任务 {backpack_item.taskId} å·²åˆ†é…ç»™ AGV {agv.agvId},跳过重复分配")

        # 2. Drop those tasks from the work list.
        unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks]
        if not unassigned_tasks:
            self.logger.info("所有任务都已分配,无需重新分配")
            return []
        self.logger.info(f"总任务数: {len(tasks)}, å·²åˆ†é…: {len(already_assigned_tasks)}, å¾…分配: {len(unassigned_tasks)}")

        path_mapping = self.agv_manager.path_mapping
        # Slots handed out during this call, per AGV id. The status records
        # are not updated by an assignment, so without this bookkeeping the
        # same slot would be returned for every task given to one AGV.
        claimed_slots = defaultdict(set)
        assignments = []

        # Highest priority first.
        for task in sorted(unassigned_tasks, key=lambda t: t.priority, reverse=True):
            task_start_coord = get_coordinate_from_path_id(task.start, path_mapping)
            if not task_start_coord:
                self.logger.warning(f"无法获取任务 {task.taskId} èµ·ç‚¹ {task.start} çš„坐标")
                continue

            agv_scores = []
            for agv in all_agvs:
                if not agv.can_accept_task(task.priority):
                    continue
                # Lower load gives a higher score.
                load_score = 1.0 - agv.get_workload_ratio()
                # Shorter distance gives a higher score.
                distance_score = 0.0
                if agv.coordinates:
                    distance = calculate_manhattan_distance(agv.coordinates, task_start_coord)
                    distance_score = 1.0 / (1.0 + distance / 100.0)
                efficiency_score = agv.calculate_efficiency_score(path_mapping)
                total_score = 0.4 * load_score + 0.3 * distance_score + 0.3 * efficiency_score
                agv_scores.append((agv, total_score))

            if not agv_scores:
                self._log_rejection_breakdown(task, all_agvs)
                continue

            # Best score first; the sort is stable, so ties keep AGV order.
            agv_scores.sort(key=lambda pair: pair[1], reverse=True)

            # Fall through to the next-best AGV instead of dropping the task
            # when the top candidate has no status record or no free slot.
            for agv, score in agv_scores:
                agv_status = self.agv_manager.get_agv_status(agv.agvId)
                if not agv_status:
                    self.logger.warning(f"无法获取AGV {agv.agvId} çš„状态信息")
                    continue

                taken = claimed_slots[agv.agvId]
                lev_id = self._pick_unclaimed_slot(agv_status, taken)
                if lev_id is None:
                    self.logger.warning(f"AGV {agv.agvId} æ²¡æœ‰å¯ç”¨çš„背篓位置")
                    continue

                if not self.assign_task_with_backpack(agv, task, lev_id):
                    self.logger.warning(f"任务 {task.taskId} åˆ†é…ç»™AGV {agv.agvId} å¤±è´¥")
                    continue

                taken.add(lev_id)
                assignments.append(TaskAssignment(
                    taskId=task.taskId,
                    agvId=agv.agvId,
                    lev_id=lev_id
                ))
                self.logger.info(f"任务 {task.taskId} åˆ†é…ç»™è´Ÿè½½å‡è¡¡çš„AGV {agv.agvId},背篓位置: {lev_id},得分: {score:.3f}")
                break

        return assignments

    def _log_rejection_breakdown(self, task: TaskData, all_agvs) -> None:
        """Log why no AGV accepted the task, counting each AGV under one reason."""
        overloaded_count = 0
        status_invalid_count = 0
        low_battery_count = 0
        busy_count = 0
        for agv in all_agvs:
            if agv.is_overloaded():
                overloaded_count += 1
            elif str(agv.status) not in ["0", "1", "2"]:
                status_invalid_count += 1
            elif agv.need_charging():
                low_battery_count += 1
            else:
                busy_count += 1
        self.logger.warning(f"没有AGV可以接受任务 {task.taskId} - è¯¦ç»†åˆ†æž:")
        self.logger.warning(f"  æ€»AGV数: {len(all_agvs)}")
        self.logger.warning(f"  ä»»åŠ¡æ»¡è½½: {overloaded_count}")
        self.logger.warning(f"  çŠ¶æ€å¼‚å¸¸: {status_invalid_count}")
        self.logger.warning(f"  ç”µé‡è¿‡ä½Ž: {low_battery_count}")
        self.logger.warning(f"  å…¶ä»–原因: {busy_count}")
        self.logger.warning(f"  ä»»åŠ¡ä¼˜å…ˆçº§: {task.priority}")

    def _pick_unclaimed_slot(self, agv_status: AGVStatus, claimed: Set[int]) -> Optional[int]:
        """
        Return a free backpack slot not yet handed out in this allocation run.

        Args:
            agv_status: Status record whose backpack is inspected.
            claimed: Slot indexes already given out on this AGV during the
                current call.

        Returns:
            Optional[int]: A usable slot index, or None when none is left.
        """
        slot = self.find_available_backpack_slot(agv_status)
        if slot is None or slot not in claimed:
            return slot
        # The first free slot was already handed out in this run; look for
        # another one using the same "free" test as the base class.
        for item in agv_status.backpack or ():
            if item.index in claimed:
                continue
            if not (item.loaded or item.execute or item.taskId):
                return item.index
        return None
class PriorityFirstAllocation(TaskAllocation):
    """Allocation strategy that serves the highest-priority tasks first."""

    def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
        """
        Assign tasks using the priority-first strategy.

        Tasks already present in a backpack are skipped; the rest are handled
        in descending priority order. Every available AGV that accepts the
        task gets a score of 0.5 * distance + 0.3 * efficiency + 0.2 * spare
        capacity, and the candidates are tried best-first until one has a
        status record and an unclaimed backpack slot. A slot is handed out at
        most once per call.

        Args:
            tasks: Tasks waiting to be assigned.

        Returns:
            List[TaskAssignment]: The assignments that were made.
        """
        if not tasks:
            return []

        # Copy the list: AGVs are removed from it as they fill up, and the
        # manager's own list must stay untouched.
        available_agvs = list(self.agv_manager.get_available_agvs() or [])
        if not available_agvs:
            self.logger.warning("没有可用的AGV进行任务分配")
            return []

        # 1. Collect the ids of tasks that already sit in a backpack.
        already_assigned_tasks = set()
        for agv in self.agv_manager.get_all_agvs():
            for backpack_item in agv.backpack or ():
                if backpack_item.taskId:
                    already_assigned_tasks.add(backpack_item.taskId)
                    self.logger.info(f"任务 {backpack_item.taskId} å·²åˆ†é…ç»™ AGV {agv.agvId},跳过重复分配")

        # 2. Drop those tasks from the work list.
        unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks]
        if not unassigned_tasks:
            self.logger.info("所有任务都已分配,无需重新分配")
            return []
        self.logger.info(f"总任务数: {len(tasks)}, å·²åˆ†é…: {len(already_assigned_tasks)}, å¾…分配: {len(unassigned_tasks)}")

        path_mapping = self.agv_manager.path_mapping
        # Slots handed out during this call, per AGV id. The status records
        # are not updated by an assignment, so without this bookkeeping the
        # same slot would be returned for every task given to one AGV.
        claimed_slots = defaultdict(set)
        assignments = []

        # Highest priority first.
        for task in sorted(unassigned_tasks, key=lambda t: t.priority, reverse=True):
            if not available_agvs:
                break

            task_start_coord = get_coordinate_from_path_id(task.start, path_mapping)
            if not task_start_coord:
                self.logger.warning(f"无法获取任务 {task.taskId} èµ·ç‚¹ {task.start} çš„坐标")
                continue

            ranked = []
            for agv in available_agvs:
                if not agv.can_accept_task(task.priority):
                    continue
                distance_score = 0.0
                if agv.coordinates:
                    distance = calculate_manhattan_distance(agv.coordinates, task_start_coord)
                    distance_score = 1.0 / (1.0 + distance / 50.0)
                efficiency_score = agv.calculate_efficiency_score(path_mapping)
                capacity_score = agv.get_task_capacity() / agv.max_capacity
                # Distance and efficiency weigh most for this strategy.
                total_score = 0.5 * distance_score + 0.3 * efficiency_score + 0.2 * capacity_score
                ranked.append((total_score, agv))

            # Best score first; the sort is stable, so ties keep list order.
            ranked.sort(key=lambda pair: pair[0], reverse=True)

            # Fall through to the next-best AGV instead of dropping the task
            # when the top candidate has no status record or no free slot.
            for _, agv in ranked:
                agv_status = self.agv_manager.get_agv_status(agv.agvId)
                if not agv_status:
                    self.logger.warning(f"无法获取AGV {agv.agvId} çš„状态信息")
                    continue

                taken = claimed_slots[agv.agvId]
                lev_id = self._pick_unclaimed_slot(agv_status, taken)
                if lev_id is None:
                    self.logger.warning(f"AGV {agv.agvId} æ²¡æœ‰å¯ç”¨çš„背篓位置")
                    available_agvs.remove(agv)
                    continue

                if not self.assign_task_with_backpack(agv, task, lev_id):
                    self.logger.warning(f"任务 {task.taskId} åˆ†é…ç»™AGV {agv.agvId} å¤±è´¥")
                    continue

                taken.add(lev_id)
                assignments.append(TaskAssignment(
                    taskId=task.taskId,
                    agvId=agv.agvId,
                    lev_id=lev_id
                ))
                self.logger.info(f"高优先级任务 {task.taskId} (优先级: {task.priority}) åˆ†é…ç»™AGV {agv.agvId},背篓位置: {lev_id}")

                # Retire the AGV once every free slot has been handed out.
                if self.get_agv_available_capacity(agv_status) - len(taken) <= 0:
                    available_agvs.remove(agv)
                break

        return assignments

    def _pick_unclaimed_slot(self, agv_status: AGVStatus, claimed: Set[int]) -> Optional[int]:
        """
        Return a free backpack slot not yet handed out in this allocation run.

        Args:
            agv_status: Status record whose backpack is inspected.
            claimed: Slot indexes already given out on this AGV during the
                current call.

        Returns:
            Optional[int]: A usable slot index, or None when none is left.
        """
        slot = self.find_available_backpack_slot(agv_status)
        if slot is None or slot not in claimed:
            return slot
        # The first free slot was already handed out in this run; look for
        # another one using the same "free" test as the base class.
        for item in agv_status.backpack or ():
            if item.index in claimed:
                continue
            if not (item.loaded or item.execute or item.taskId):
                return item.index
        return None
class MultiObjectiveAllocation(TaskAllocation):
    """Allocation strategy that weighs distance, load and efficiency together."""

    def __init__(self, agv_manager: AGVModelManager,
                 distance_weight: float = 0.4,
                 load_weight: float = 0.3,
                 efficiency_weight: float = 0.3):
        """
        Set up the multi-objective allocator.

        Args:
            agv_manager: AGV model manager.
            distance_weight: Weight of the distance score.
            load_weight: Weight of the load score.
            efficiency_weight: Weight of the efficiency score.
        """
        super().__init__(agv_manager)
        self.distance_weight = distance_weight
        self.load_weight = load_weight
        self.efficiency_weight = efficiency_weight

    def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
        """
        Assign tasks using the multi-objective strategy.

        Tasks already present in a backpack are skipped. A weighted score is
        computed for every (task, AGV) pair where the task start has a
        coordinate and the AGV accepts the task; the pairs are then matched
        greedily by ``_greedy_matching``.

        Args:
            tasks: Tasks waiting to be assigned.

        Returns:
            List[TaskAssignment]: The assignments that were made.
        """
        if not tasks:
            return []

        all_agvs = self.agv_manager.get_all_agvs()
        if not all_agvs:
            self.logger.warning("没有AGV进行任务分配")
            return []

        # 1. Collect the ids of tasks that already sit in a backpack.
        already_assigned_tasks = set()
        for agv in all_agvs:
            for backpack_item in agv.backpack or ():
                if backpack_item.taskId:
                    already_assigned_tasks.add(backpack_item.taskId)
                    self.logger.info(f"任务 {backpack_item.taskId} å·²åˆ†é…ç»™ AGV {agv.agvId},跳过重复分配")

        # 2. Drop those tasks from the work list.
        unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks]
        if not unassigned_tasks:
            self.logger.info("所有任务都已分配,无需重新分配")
            return []
        self.logger.info(f"总任务数: {len(tasks)}, å·²åˆ†é…: {len(already_assigned_tasks)}, å¾…分配: {len(unassigned_tasks)}")

        path_mapping = self.agv_manager.path_mapping

        # Score every eligible (task, AGV) pair.
        task_agv_scores = {}
        for task in unassigned_tasks:
            task_start_coord = get_coordinate_from_path_id(task.start, path_mapping)
            if not task_start_coord:
                # Report the skipped task, as the other strategies do.
                self.logger.warning(f"无法获取任务 {task.taskId} èµ·ç‚¹ {task.start} çš„坐标")
                continue
            for agv in all_agvs:
                if not agv.can_accept_task(task.priority):
                    continue
                distance_score = 0.0
                if agv.coordinates:
                    distance = calculate_manhattan_distance(agv.coordinates, task_start_coord)
                    distance_score = 1.0 / (1.0 + distance / 100.0)
                load_score = 1.0 - agv.get_workload_ratio()
                efficiency_score = agv.calculate_efficiency_score(path_mapping)
                task_agv_scores[(task.taskId, agv.agvId)] = (
                    self.distance_weight * distance_score +
                    self.load_weight * load_score +
                    self.efficiency_weight * efficiency_score
                )

        return self._greedy_matching(unassigned_tasks, all_agvs, task_agv_scores)

    def _greedy_matching(self, tasks: List[TaskData], agvs: List[AGVModel],
                        scores: Dict[Tuple[str, str], float]) -> List[TaskAssignment]:
        """
        Match tasks to AGVs greedily, best-scored pair first.

        Only pairs present in ``scores`` are considered, so a task without a
        computed score (no coordinate, or no AGV accepted it) is never handed
        to an arbitrary AGV. A pair that cannot be used (AGV overloaded, no
        status record, no unclaimed slot, or assignment refused) is skipped
        and matching carries on with the next pair. A backpack slot is handed
        out at most once per call.

        Args:
            tasks: Task list.
            agvs: AGV list.
            scores: Score of each (task id, AGV id) pair.

        Returns:
            List[TaskAssignment]: The assignments that were made.
        """
        # First occurrence wins for a duplicated id.
        task_by_id = {}
        for task in tasks:
            task_by_id.setdefault(task.taskId, task)
        agv_by_id = {}
        for agv in agvs:
            agv_by_id.setdefault(agv.agvId, agv)

        # The scores do not change while matching, so one descending walk
        # visits the pairs in the same order as repeatedly picking the best
        # remaining pair; the stable sort keeps insertion order on ties.
        ranked_pairs = sorted(scores.items(), key=lambda entry: entry[1], reverse=True)

        # Slots handed out during this call, per AGV id. The status records
        # are not updated by an assignment, so without this bookkeeping the
        # same slot would be returned for every task given to one AGV.
        claimed_slots = defaultdict(set)
        matched_task_ids = set()
        assignments = []

        for (task_id, agv_id), score in ranked_pairs:
            if len(matched_task_ids) == len(task_by_id):
                break
            if task_id in matched_task_ids:
                continue

            agv = agv_by_id.get(agv_id)
            if agv is None or agv.is_overloaded():
                continue

            task = task_by_id.get(task_id)
            if task is None:
                self.logger.error(f"找不到任务 {task_id} çš„详细信息")
                continue

            agv_status = self.agv_manager.get_agv_status(agv_id)
            if not agv_status:
                self.logger.warning(f"无法获取AGV {agv_id} çš„状态信息")
                continue

            taken = claimed_slots[agv_id]
            lev_id = self._pick_unclaimed_slot(agv_status, taken)
            if lev_id is None:
                self.logger.debug(f"AGV {agv_id} æ²¡æœ‰å¯ç”¨çš„背篓位置,跳过")
                continue

            if not self.assign_task_with_backpack(agv, task, lev_id):
                self.logger.warning(f"任务 {task_id} åˆ†é…ç»™AGV {agv_id} å¤±è´¥")
                continue

            taken.add(lev_id)
            matched_task_ids.add(task_id)
            assignments.append(TaskAssignment(
                taskId=task_id,
                agvId=agv_id,
                lev_id=lev_id
            ))
            self.logger.info(f"多目标优化:任务 {task_id} åˆ†é…ç»™AGV {agv_id},背篓位置: {lev_id},得分: {score:.3f}")

        return assignments

    def _pick_unclaimed_slot(self, agv_status: AGVStatus, claimed: Set[int]) -> Optional[int]:
        """
        Return a free backpack slot not yet handed out in this allocation run.

        Args:
            agv_status: Status record whose backpack is inspected.
            claimed: Slot indexes already given out on this AGV during the
                current call.

        Returns:
            Optional[int]: A usable slot index, or None when none is left.
        """
        slot = self.find_available_backpack_slot(agv_status)
        if slot is None or slot not in claimed:
            return slot
        # The first free slot was already handed out in this run; look for
        # another one using the same "free" test as the base class.
        for item in agv_status.backpack or ():
            if item.index in claimed:
                continue
            if not (item.loaded or item.execute or item.taskId):
                return item.index
        return None
class TaskAllocationFactory:
    """Factory for task-allocation strategy objects."""

    @staticmethod
    def create_allocator(algorithm_type: str, agv_manager: AGVModelManager) -> TaskAllocation:
        """
        Build the allocator that matches an algorithm name.

        The name is matched case-insensitively and ignoring surrounding
        whitespace, in line with ``PathPlanningFactory``. An unknown or
        non-string name falls back to load balancing and logs a warning.

        Args:
            algorithm_type: One of "NEAREST_FIRST", "LOAD_BALANCED",
                "PRIORITY_FIRST" or "MULTI_OBJECTIVE".
            agv_manager: AGV model manager handed to the allocator.

        Returns:
            TaskAllocation: The allocator instance.
        """
        allocator_classes = {
            "NEAREST_FIRST": NearestFirstAllocation,
            "LOAD_BALANCED": LoadBalancedAllocation,
            "PRIORITY_FIRST": PriorityFirstAllocation,
            "MULTI_OBJECTIVE": MultiObjectiveAllocation,
        }
        normalized = algorithm_type.strip().upper() if isinstance(algorithm_type, str) else None
        allocator_cls = allocator_classes.get(normalized)
        if allocator_cls is None:
            # Default to load balancing, but make the fallback visible.
            logging.getLogger(__name__).warning(f"未知任务分配算法类型 {algorithm_type},使用默认负载均衡算法")
            allocator_cls = LoadBalancedAllocation
        return allocator_cls(agv_manager)
algorithm_system/models/__init__.py
New file
@@ -0,0 +1,2 @@
# Algorithm System Models Module
# Version string of this package.
__version__ = "1.0.0"
algorithm_system/models/agv_model.py
New file
@@ -0,0 +1,572 @@
"""
AGV模型 - 用于算法系统的AGV数据建模
"""
import time
import logging
from typing import Dict, List, Optional, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
from common.data_models import AGVStatus, BackpackData
from common.utils import get_coordinate_from_path_id, calculate_manhattan_distance
class AGVTaskStatus(Enum):
    """Status of a task held by an AGV."""
    IDLE = "IDLE"           # idle
    ASSIGNED = "ASSIGNED"   # task assigned but not started yet
    EXECUTING = "EXECUTING" # in progress
    COMPLETED = "COMPLETED" # finished
@dataclass
class TaskAssignment:
    """Record of one task handed to an AGV."""
    # Identifier of the assigned task.
    task_id: str
    # Moment of assignment; AGVModel.assign_task stores time.time() here.
    assigned_time: float
    # Task priority.
    priority: int
    # Current state of the task; a new record starts as ASSIGNED.
    status: AGVTaskStatus = AGVTaskStatus.ASSIGNED
    # Start position code of the task, if known.
    start_code: Optional[str] = None
    # End position code of the task, if known.
    end_code: Optional[str] = None
    # Estimated duration; the unit is not shown here — TODO confirm.
    estimated_duration: Optional[float] = None
class AGVModel:
    """AGV模型类,用于算法计算"""
    def __init__(self, agv_id: str, path_mapping: Dict[str, Dict[str, int]]):
        """
        åˆå§‹åŒ–AGV模型
        Args:
            agv_id: AGV ID
            path_mapping: è·¯å¾„点映射字典
        """
        self.agvId = agv_id
        self.path_mapping = path_mapping
        self.logger = logging.getLogger(__name__)
        # AGV状态信息
        self.status: str = "0"  # AGV状态码
        self.mapCode: str = ""  # å½“前位置码
        self.coordinates: Optional[Tuple[int, int]] = None  # å½“前坐标
        self.backpack: Optional[List[BackpackData]] = None  # èƒŒåŒ…信息
        # å……电相关
        self.voltage: int = 100  # å½“前电量百分比
        self.autoCharge: int = 20  # ä½Žç”µé‡è®¾å®šé˜ˆå€¼
        self.lowVol: int = 10  # æœ€ä½Žç”µé‡é˜ˆå€¼
        # ä»»åŠ¡ç›¸å…³
        self.assigned_tasks: List[TaskAssignment] = []
        self.current_task_count: int = 0
        self.max_capacity: int = 5  # æœ€å¤§ä»»åŠ¡å®¹é‡
        # æ€§èƒ½ç›¸å…³
        self.efficiency_score: float = 1.0  # æ•ˆçŽ‡å¾—åˆ†
        self.last_update_time: float = time.time()
        # ç»Ÿè®¡ä¿¡æ¯
        self.total_completed_tasks: int = 0
        self.total_distance_traveled: float = 0.0
        self.average_completion_time: float = 0.0
    def update_from_agv_status(self, agv_status: AGVStatus):
        """从AGV状态更新模型"""
        self.status = agv_status.status
        if hasattr(agv_status, 'position'):
            self.mapCode = agv_status.position
        elif hasattr(agv_status, 'mapCode'):
            self.mapCode = agv_status.mapCode
        else:
            self.mapCode = ""
        self.backpack = agv_status.backpack
        self.last_update_time = time.time()
        raw_voltage = getattr(agv_status, 'vol', 100)
        raw_auto_charge = getattr(agv_status, 'autoCharge', 20)
        raw_low_vol = getattr(agv_status, 'lowVol', 10)
        # ç”µé‡æ•°æ®æ ‡å‡†åŒ–处理
        # å¦‚果电压值大于100,可能是毫伏值,需要转换为百分比
        if raw_voltage > 100:
            # å‡è®¾æ­£å¸¸ç”µåŽ‹èŒƒå›´æ˜¯3000-5000mV,转换为0-100%
            # è¿™é‡Œä½¿ç”¨ç®€å•的线性映射
            normalized_voltage = max(0, min(100, ((raw_voltage - 3000) / 2000) * 100))
            self.logger.debug(f"AGV {self.agvId} ç”µåŽ‹æ ‡å‡†åŒ–: {raw_voltage}mV -> {normalized_voltage:.1f}%")
            self.voltage = int(normalized_voltage)
        else:
            self.voltage = raw_voltage
        # é˜ˆå€¼æ ‡å‡†åŒ–处理
        if raw_auto_charge > 100:
            # å¦‚果阈值也是毫伏值,同样转换
            self.autoCharge = max(0, min(100, ((raw_auto_charge - 3000) / 2000) * 100))
            self.logger.debug(f"AGV {self.agvId} è‡ªåŠ¨å……ç”µé˜ˆå€¼æ ‡å‡†åŒ–: {raw_auto_charge}mV -> {self.autoCharge:.1f}%")
        else:
            self.autoCharge = raw_auto_charge
        if raw_low_vol > 100:
            # å¦‚果最低电量阈值也是毫伏值,同样转换
            self.lowVol = max(0, min(100, ((raw_low_vol - 3000) / 2000) * 100))
            self.logger.debug(f"AGV {self.agvId} æœ€ä½Žç”µé‡é˜ˆå€¼æ ‡å‡†åŒ–: {raw_low_vol}mV -> {self.lowVol:.1f}%")
        else:
            self.lowVol = raw_low_vol
        # æ›´æ–°åæ ‡
        if self.mapCode:
            self.coordinates = get_coordinate_from_path_id(self.mapCode, self.path_mapping)
        # æ ¹æ®AGV状态更新任务计数
        if self.backpack and self.backpack:
            self.current_task_count = len([bp for bp in self.backpack if bp.execute])
        else:
            self.current_task_count = 0
    def assign_task(self, task_id: str, priority: int = 5,
                   start_code: str = "", end_code: str = "") -> bool:
        """
        åˆ†é…ä»»åŠ¡ç»™AGV
        Args:
            task_id: ä»»åŠ¡ID
            priority: ä»»åŠ¡ä¼˜å…ˆçº§
            start_code: èµ·å§‹ä½ç½®ç 
            end_code: ç»“束位置码
        Returns:
            bool: æ˜¯å¦åˆ†é…æˆåŠŸ
        """
        if self.is_overloaded():
            self.logger.warning(f"AGV {self.agvId} å·²æ»¡è½½ï¼Œæ— æ³•分配更多任务")
            return False
        # åˆ›å»ºä»»åŠ¡åˆ†é…è®°å½•
        task_assignment = TaskAssignment(
            task_id=task_id,
            assigned_time=time.time(),
            priority=priority,
            start_code=start_code,
            end_code=end_code
        )
        self.assigned_tasks.append(task_assignment)
        self.current_task_count += 1
        self.logger.info(f"任务 {task_id} å·²åˆ†é…ç»™AGV {self.agvId}")
        return True
    def complete_task(self, task_id: str) -> bool:
        """
        å®Œæˆä»»åŠ¡
        Args:
            task_id: ä»»åŠ¡ID
        Returns:
            bool: æ˜¯å¦å®ŒæˆæˆåŠŸ
        """
        for task in self.assigned_tasks:
            if task.task_id == task_id:
                task.status = AGVTaskStatus.COMPLETED
                self.current_task_count = max(0, self.current_task_count - 1)
                self.total_completed_tasks += 1
                self.logger.info(f"AGV {self.agvId} å®Œæˆä»»åŠ¡ {task_id}")
                return True
        return False
    def can_accept_task(self, priority: int) -> bool:
        """
        æ£€æŸ¥æ˜¯å¦å¯ä»¥æŽ¥å—新任务
        Args:
            priority: ä»»åŠ¡ä¼˜å…ˆçº§
        Returns:
            bool: æ˜¯å¦å¯ä»¥æŽ¥å—
        """
        # æ£€æŸ¥å®¹é‡
        if self.is_overloaded():
            self.logger.debug(f"AGV {self.agvId} ä»»åŠ¡å·²æ»¡è½½({self.current_task_count}/{self.max_capacity}),拒绝新任务")
            return False
        # æ£€æŸ¥AGV状态(兼容整数和字符串格式)
        # çŠ¶æ€ 0(空闲), 1(忙碌), 2(充电) å¯ä»¥æŽ¥å—任务
        # çŠ¶æ€ 3(故障), 4(维护) ä¸èƒ½æŽ¥å—任务
        status_value = str(self.status)  # ç»Ÿä¸€è½¬æ¢ä¸ºå­—符串进行比较
        if status_value not in ["0", "1", "2"]:
            self.logger.debug(f"AGV {self.agvId} çŠ¶æ€å¼‚å¸¸(status={status_value}),拒绝新任务")
            return False
        # æ£€æŸ¥å……电状态 - å¦‚果电量过低必须充电,不能接受新任务
        if self.need_charging():
            self.logger.debug(f"AGV {self.agvId} ç”µé‡è¿‡ä½Ž({self.voltage}% <= {self.lowVol}%),必须充电,拒绝新任务")
            return False
        # å¦‚果是高优先级任务,即使低电量也可以接受
        if priority >= 9:  # é«˜ä¼˜å…ˆçº§ä»»åŠ¡é˜ˆå€¼
            self.logger.debug(f"AGV {self.agvId} æŽ¥å—高优先级任务(priority={priority})")
            return True
        # å¯¹äºŽæ™®é€šä¼˜å…ˆçº§ä»»åŠ¡ï¼Œå¦‚æžœç”µé‡ä½Žä½†ä¸æ˜¯å¿…é¡»å……ç”µï¼Œå¯ä»¥æ ¹æ®ä¼˜å…ˆçº§å†³å®š
        if self.can_auto_charge():
            # ä¼˜å…ˆçº§è¶Šé«˜ï¼Œè¶Šå€¾å‘于接受任务
            if priority >= 5:  # ä¸­é«˜ä¼˜å…ˆçº§
                self.logger.debug(f"AGV {self.agvId} æŽ¥å—中高优先级任务(priority={priority}, ç”µé‡={self.voltage}%)")
                return True
            elif priority >= 3:  # ä¸­ç­‰ä¼˜å…ˆçº§ï¼Œ75%概率接受
                import random
                accept = random.random() < 0.75
                self.logger.debug(f"AGV {self.agvId} éšæœºå†³å®š{'接受' if accept else '拒绝'}中等优先级任务(priority={priority}, ç”µé‡={self.voltage}%)")
                return accept
            elif priority >= 1:  # ä½Žä¼˜å…ˆçº§ï¼Œ50%概率接受(包括优先级1)
                import random
                accept = random.random() < 0.5
                self.logger.debug(f"AGV {self.agvId} éšæœºå†³å®š{'接受' if accept else '拒绝'}低优先级任务(priority={priority}, ç”µé‡={self.voltage}%)")
                return accept
            else:  # æžä½Žä¼˜å…ˆçº§ï¼ˆpriority=0),拒绝
                self.logger.debug(f"AGV {self.agvId} ç”µé‡åä½Ž({self.voltage}%),拒绝极低优先级任务(priority={priority})")
                return False
        # æ­£å¸¸æƒ…况下可以接受任务
        self.logger.debug(f"AGV {self.agvId} çŠ¶æ€è‰¯å¥½ï¼Œå¯ä»¥æŽ¥å—ä»»åŠ¡(电量={self.voltage}%, priority={priority})")
        return True
    def is_overloaded(self) -> bool:
        """
        æ£€æŸ¥æ˜¯å¦è¿‡è½½
        Returns:
            bool: æ˜¯å¦è¿‡è½½
        """
        return self.current_task_count >= self.max_capacity
    def get_workload_ratio(self) -> float:
        """
        èŽ·å–å·¥ä½œè´Ÿè½½æ¯”ä¾‹
        Returns:
            float: å·¥ä½œè´Ÿè½½æ¯”例 (0.0 - 1.0)
        """
        return min(1.0, self.current_task_count / self.max_capacity)
    def get_task_capacity(self) -> int:
        """
        èŽ·å–å‰©ä½™ä»»åŠ¡å®¹é‡
        Returns:
            int: å‰©ä½™å®¹é‡
        """
        return max(0, self.max_capacity - self.current_task_count)
    def need_charging(self) -> bool:
        """
        æ£€æŸ¥æ˜¯å¦éœ€è¦å……电
        Returns:
            bool: æ˜¯å¦éœ€è¦å……电
        """
        return self.voltage <= self.lowVol
    def can_auto_charge(self) -> bool:
        """
        æ£€æŸ¥æ˜¯å¦å¯ä»¥è‡ªåŠ¨å……ç”µï¼ˆä½ŽäºŽé˜ˆå€¼ä½†ä¸æ˜¯å¿…é¡»å……ç”µï¼‰
        Returns:
            bool: æ˜¯å¦å¯ä»¥è‡ªåŠ¨å……ç”µ
        """
        return self.lowVol < self.voltage <= self.autoCharge
    def is_low_power(self) -> bool:
        """
        æ£€æŸ¥æ˜¯å¦ä¸ºä½Žç”µé‡çŠ¶æ€
        Returns:
            bool: æ˜¯å¦ä¸ºä½Žç”µé‡çŠ¶æ€
        """
        return self.voltage <= self.autoCharge
    def get_charging_priority(self) -> int:
        """
        èŽ·å–å……ç”µä¼˜å…ˆçº§ï¼ˆæ•°å€¼è¶Šå¤§ä¼˜å…ˆçº§è¶Šé«˜ï¼‰
        Returns:
            int: å……电优先级
        """
        if self.need_charging():
            return 100  # å¿…须充电,最高优先级
        elif self.can_auto_charge():
            return 50 + (self.autoCharge - self.voltage)  # æ ¹æ®ç”µé‡å·®è®¡ç®—优先级
        else:
            return 0  # æ— éœ€å……电
    def calculate_efficiency_score(self, path_mapping: Dict[str, Dict[str, int]]) -> float:
        """
        è®¡ç®—AGV效率得分
        Args:
            path_mapping: è·¯å¾„点映射
        Returns:
            float: æ•ˆçŽ‡å¾—åˆ† (0.0 - 1.0)
        """
        # åŸºç¡€æ•ˆçŽ‡åˆ†æ•°
        base_score = 0.7
        # æ ¹æ®å®Œæˆä»»åŠ¡æ•°é‡è°ƒæ•´
        if self.total_completed_tasks > 0:
            completion_bonus = min(0.2, self.total_completed_tasks * 0.01)
            base_score += completion_bonus
        # æ ¹æ®è´Ÿè½½æƒ…况调整
        load_ratio = self.get_workload_ratio()
        if load_ratio < 0.8:  # è´Ÿè½½ä¸å¤ªé«˜æ—¶æ•ˆçŽ‡æ›´é«˜
            load_bonus = (0.8 - load_ratio) * 0.1
            base_score += load_bonus
        # æ ¹æ®AGV状态调整
        if self.status == "0":  # æ­£å¸¸çŠ¶æ€
            base_score += 0.1
        elif self.status in ["3", "4"]:  # å¼‚常状态
            base_score -= 0.2
        # æ—¶é—´å› å­ï¼šæœ€è¿‘æ›´æ–°çš„AGV得分更高
        time_since_update = time.time() - self.last_update_time
        if time_since_update < 60:  # 1分钟内更新
            time_bonus = (60 - time_since_update) / 600  # æœ€å¤šåŠ 0.1
            base_score += time_bonus
        return max(0.0, min(1.0, base_score))
    def estimate_travel_time(self, target_code: str) -> float:
        """
        ä¼°ç®—到目标位置的行驶时间
        Args:
            target_code: ç›®æ ‡ä½ç½®ç 
        Returns:
            float: ä¼°ç®—时间(秒)
        """
        if not self.coordinates:
            return float('inf')
        target_coord = get_coordinate_from_path_id(target_code, self.path_mapping)
        if not target_coord:
            return float('inf')
        # è®¡ç®—曼哈顿距离
        distance = calculate_manhattan_distance(self.coordinates, target_coord)
        # å‡è®¾AGV平均速度为1单位/秒
        estimated_time = distance * 1.0
        # è€ƒè™‘当前负载对速度的影响
        load_factor = 1.0 + (self.get_workload_ratio() * 0.2)
        return estimated_time * load_factor
    def get_task_by_id(self, task_id: str) -> Optional[TaskAssignment]:
        """
        æ ¹æ®ä»»åŠ¡ID获取任务分配信息
        Args:
            task_id: ä»»åŠ¡ID
        Returns:
            Optional[TaskAssignment]: ä»»åŠ¡åˆ†é…ä¿¡æ¯
        """
        for task in self.assigned_tasks:
            if task.task_id == task_id:
                return task
        return None
    def get_pending_tasks(self) -> List[TaskAssignment]:
        """
        èŽ·å–å¾…æ‰§è¡Œçš„ä»»åŠ¡
        Returns:
            List[TaskAssignment]: å¾…执行任务列表
        """
        return [task for task in self.assigned_tasks
                if task.status in [AGVTaskStatus.ASSIGNED, AGVTaskStatus.EXECUTING]]
    def clear_completed_tasks(self):
        """清理已完成的任务"""
        self.assigned_tasks = [task for task in self.assigned_tasks
                              if task.status != AGVTaskStatus.COMPLETED]
    def __str__(self) -> str:
        """字符串表示"""
        return (f"AGV({self.agvId}, status={self.status}, "
                f"pos={self.mapCode}, tasks={self.current_task_count}/{self.max_capacity})")
class AGVModelManager:
    """Registry of AGV models keyed by AGV id.

    Two parallel dictionaries are maintained: ``agv_models`` holds the derived
    AGVModel objects and ``agv_status_data`` holds the raw AGVStatus last
    received for each AGV. Both are kept in step whenever AGVs are removed.
    """

    def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
        """
        Create an empty manager.

        Args:
            path_mapping: Path-point mapping dictionary handed to every
                AGVModel this manager creates.
        """
        self.path_mapping = path_mapping
        self.agv_models: Dict[str, AGVModel] = {}
        self.agv_status_data: Dict[str, AGVStatus] = {}  # raw AGV status data
        self.logger = logging.getLogger(__name__)

    def update_agv_data(self, agv_status_list: List[AGVStatus]):
        """
        Synchronize the registry with a fresh list of AGV statuses.

        AGVs missing from the list are dropped, new ones get a model created,
        and every listed AGV has its model refreshed from the status.

        Args:
            agv_status_list: Latest status of every known AGV.
        """
        current_agv_ids = {agv_status.agvId for agv_status in agv_status_list}

        # Drop models and raw statuses of AGVs that are no longer reported.
        removed_agvs = [agv_id for agv_id in self.agv_models
                        if agv_id not in current_agv_ids]
        for agv_id in removed_agvs:
            del self.agv_models[agv_id]
            self.agv_status_data.pop(agv_id, None)
        if removed_agvs:
            self.logger.info(f"移除不存在的AGV: {removed_agvs}")

        # Create or refresh the model for every reported AGV.
        for agv_status in agv_status_list:
            agv_id = agv_status.agvId
            self.agv_status_data[agv_id] = agv_status
            if agv_id not in self.agv_models:
                self.agv_models[agv_id] = AGVModel(agv_id, self.path_mapping)
                self.logger.info(f"创建新的AGV模型: {agv_id}")
            self.agv_models[agv_id].update_from_agv_status(agv_status)

        self.logger.debug(f"更新了 {len(agv_status_list)} ä¸ªAGV模型,当前总数: {len(self.agv_models)}")

    def get_all_agv_status(self) -> List[AGVStatus]:
        """
        Return the raw status of every AGV.

        Returns:
            List[AGVStatus]: Raw AGV status objects.
        """
        return list(self.agv_status_data.values())

    def get_agv_status(self, agv_id: str) -> Optional[AGVStatus]:
        """
        Return the raw status of one AGV.

        Args:
            agv_id: AGV identifier.

        Returns:
            Optional[AGVStatus]: The raw status, or None for an unknown id.
        """
        return self.agv_status_data.get(agv_id)

    def get_agv_model(self, agv_id: str) -> Optional[AGVModel]:
        """
        Return the model of one AGV.

        Args:
            agv_id: AGV identifier.

        Returns:
            Optional[AGVModel]: The model, or None for an unknown id.
        """
        return self.agv_models.get(agv_id)

    def get_all_agvs(self) -> List[AGVModel]:
        """
        Return every AGV model.

        Returns:
            List[AGVModel]: All registered models.
        """
        return list(self.agv_models.values())

    def get_available_agvs(self) -> List[AGVModel]:
        """
        Return the AGVs that are not full and would accept a priority-5 task.

        Returns:
            List[AGVModel]: Available AGV models.
        """
        return [agv for agv in self.agv_models.values()
                if not agv.is_overloaded() and agv.can_accept_task(5)]

    def get_agvs_by_status(self, status: str) -> List[AGVModel]:
        """
        Return the AGVs whose status equals the given value.

        Both sides are compared as strings, so an integer status stored on a
        model matches its string form and vice versa.

        Args:
            status: AGV status to match.

        Returns:
            List[AGVModel]: AGV models in that status.
        """
        wanted = str(status)
        return [agv for agv in self.agv_models.values()
                if str(agv.status) == wanted]

    def cleanup_old_agvs(self, max_age_seconds: float = 300):
        """
        Remove AGVs that have not been updated for too long.

        The raw status entry is removed together with the model so the two
        dictionaries stay consistent.

        Args:
            max_age_seconds: Maximum allowed time since the last update, in
                seconds.
        """
        current_time = time.time()
        old_agvs = [agv_id for agv_id, agv in self.agv_models.items()
                    if current_time - agv.last_update_time > max_age_seconds]
        for agv_id in old_agvs:
            del self.agv_models[agv_id]
            self.agv_status_data.pop(agv_id, None)
            self.logger.info(f"清理长时间未更新的AGV: {agv_id}")

    def get_statistics(self) -> Dict[str, object]:
        """
        Summarize the fleet.

        Returns:
            Dict: Totals for AGVs, available AGVs, assigned tasks and
                capacity, plus capacity utilization and average efficiency
                (both 0.0 when there is nothing to average over).
        """
        models = list(self.agv_models.values())
        total_agvs = len(models)
        available_agvs = len(self.get_available_agvs())
        total_tasks = sum(agv.current_task_count for agv in models)
        total_capacity = sum(agv.max_capacity for agv in models)
        avg_efficiency = 0.0
        if models:
            avg_efficiency = sum(agv.calculate_efficiency_score(self.path_mapping)
                                 for agv in models) / total_agvs
        return {
            "total_agvs": total_agvs,
            "available_agvs": available_agvs,
            "total_assigned_tasks": total_tasks,
            "total_capacity": total_capacity,
            "capacity_utilization": total_tasks / total_capacity if total_capacity > 0 else 0.0,
            "average_efficiency": avg_efficiency
        }
algorithm_system/path_monitor.py
New file
@@ -0,0 +1,442 @@
"""
监控RCS系统状态并生成路径
"""
import time
import threading
import logging
from typing import Dict, List, Optional, Any
import queue
from common.data_models import AGVStatus, PlannedPath, from_dict
from common.api_client import RCSAPIClient
from algorithm_system.algorithms.path_planning import BatchPathPlanner
from algorithm_system.models.agv_model import AGVModelManager
from common.utils import load_path_mapping, generate_segment_id
try:
    from config.settings import RCS_SERVER_HOST, RCS_SERVER_PORT, MONITOR_POLLING_INTERVAL
except ImportError:
    RCS_SERVER_HOST = "10.10.10.156"
    RCS_SERVER_PORT = 8088
    MONITOR_POLLING_INTERVAL = 5.0
    logging.warning("无法从config.settings导入配置,使用默认值")
class PathMonitorService:
    """Poll RCS for AGV status and plan collision-free paths from it.

    A daemon thread runs one monitoring cycle per ``poll_interval`` seconds:
    fetch the AGV status list, run the batch path planner on it and, when
    ``auto_send_paths`` is enabled, POST every planned path to RCS.
    """

    def __init__(self,
                 rcs_host: str = None,
                 rcs_port: int = None,
                 poll_interval: float = None,
                 path_algorithm: str = "A_STAR",
                 auto_send_paths: bool = True):
        """
        Build the RCS client, the AGV manager and the path planner.

        Args:
            rcs_host: RCS host; RCS_SERVER_HOST is used when falsy.
            rcs_port: RCS port; RCS_SERVER_PORT is used when falsy.
            poll_interval: Seconds between cycles; MONITOR_POLLING_INTERVAL
                is used when falsy.
            path_algorithm: Algorithm name handed to BatchPathPlanner.
            auto_send_paths: Whether planned paths are sent to RCS.
        """
        self.logger = logging.getLogger(__name__)
        self.rcs_host = rcs_host or RCS_SERVER_HOST
        self.rcs_port = rcs_port or RCS_SERVER_PORT
        self.poll_interval = poll_interval or MONITOR_POLLING_INTERVAL
        self.path_algorithm = path_algorithm
        self.auto_send_paths = auto_send_paths
        self.rcs_client = RCSAPIClient(self.rcs_host, self.rcs_port, timeout=10)
        self.path_mapping = load_path_mapping()
        self.agv_manager = AGVModelManager(self.path_mapping)
        self.path_planner = BatchPathPlanner(
            self.path_mapping,
            self.path_algorithm,
            self.agv_manager
        )
        self.is_running = False
        self.monitor_thread = None
        self._stop_event = threading.Event()
        self.stats = self._empty_stats()
        # Log the effective interval; the raw argument is None when defaulted.
        self.logger.info(f"路径监控服务初始化完成 - è½®è¯¢é—´éš”: {self.poll_interval}s, ç®—法: {path_algorithm}")

    @staticmethod
    def _empty_stats() -> Dict[str, Any]:
        """Return a fresh, zeroed statistics dictionary."""
        return {
            'poll_count': 0,           # monitoring cycles started
            'successful_polls': 0,     # cycles that ran to completion
            'path_generations': 0,     # cycles that produced at least one path
            'last_poll_time': None,    # start time of the latest cycle
            'last_generation_time': None  # time paths were last produced
        }

    def start_monitoring(self):
        """Start the background monitoring thread; no-op if already running."""
        if self.is_running:
            self.logger.warning("监控服务已在运行中")
            return
        self.is_running = True
        self._stop_event.clear()
        self.monitor_thread = threading.Thread(
            target=self._monitoring_loop,
            name="PathMonitorThread",
            daemon=True
        )
        self.monitor_thread.start()
        self.logger.info("监控服务已启动")

    def stop_monitoring(self):
        """Signal the monitoring thread to stop and wait up to 5 s for it."""
        if not self.is_running:
            self.logger.warning("监控服务未在运行")
            return
        self.is_running = False
        self._stop_event.set()
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=5.0)
        self.logger.info("监控服务已停止")

    def _monitoring_loop(self):
        """Run monitoring cycles until the service is stopped."""
        self.logger.info("开始监控循环...")
        while self.is_running and not self._stop_event.is_set():
            try:
                self._perform_monitoring_cycle()
                # wait() returns True only when the stop event was set.
                if self._stop_event.wait(self.poll_interval):
                    break
            except Exception as e:
                self.logger.error(f"监控循环异常: {e}")
                # Back off after a failure, but never longer than 10 s.
                self._stop_event.wait(min(self.poll_interval * 2, 10.0))
        self.logger.info("监控循环已结束")

    def _perform_monitoring_cycle(self):
        """Run one cycle: fetch status, plan paths, optionally send them.

        Exceptions are logged and swallowed so one bad cycle does not end
        the monitoring loop.
        """
        start_time = time.time()
        self.stats['poll_count'] += 1
        self.stats['last_poll_time'] = start_time
        try:
            self.logger.info(f"开始路径监控周期 #{self.stats['poll_count']}")
            # 1. Fetch the current AGV status (task status is not used here).
            current_agv_status, _ = self._fetch_rcs_status()
            if not current_agv_status:
                self.logger.debug("未获取到AGV状态数据,跳过路径生成")
                return
            # 2. Plan collision-free paths directly from the current status.
            self.logger.info(f"[路径生成] å¼€å§‹ä¸º {len(current_agv_status)} ä¸ªAGV生成路径")
            generated_paths = self._generate_collision_free_paths(current_agv_status)
            # 3. Send the paths to RCS when configured to do so.
            if generated_paths:
                if self.auto_send_paths:
                    self._send_paths_to_rcs(generated_paths)
                else:
                    self.logger.debug(f"路径生成完成但未发送到RCS - AGV数量: {len(current_agv_status)}, è·¯å¾„æ•°: {len(generated_paths)}")
                self.logger.info(f"路径生成完成 - AGV数量: {len(current_agv_status)}, è·¯å¾„æ•°: {len(generated_paths)}, è‡ªåŠ¨å‘é€: {self.auto_send_paths}")
                # Count a generation only when paths were actually produced.
                self.stats['path_generations'] += 1
                self.stats['last_generation_time'] = time.time()
            else:
                self.logger.debug("未生成任何路径(可能所有AGV都处于空闲状态)")
            self.stats['successful_polls'] += 1
            generation_time = (time.time() - start_time) * 1000
            self.logger.debug(f"监控周期完成 - è€—æ—¶: {generation_time:.2f}ms")
        except Exception as e:
            self.logger.error(f"监控周期执行失败: {e}")

    def _generate_unique_seg_id(self, agv_id: str, task_id: str = '') -> str:
        """Build the segId for a path via generate_segment_id.

        Args:
            agv_id: AGV the path belongs to.
            task_id: Task carried by the path; empty when there is none.

        Returns:
            str: The generated segment id.
        """
        try:
            if task_id:
                # With a task, the id is derived from the task id.
                return generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2")
            # Without a task, use an hour bucket as the target so ids differ
            # from one hour to the next.
            time_target = f"IDLE_{int(time.time() / 3600)}"
            return generate_segment_id(agv_id=agv_id, target_position=time_target, action_type="4")
        except Exception as e:
            self.logger.error(f"生成segId失败: {e}")
            # Fallback: retry with a per-second target.
            fallback_target = f"FALLBACK_{int(time.time())}"
            return generate_segment_id(agv_id=agv_id, target_position=fallback_target, action_type="1")

    def _fetch_rcs_status(self) -> tuple[List[AGVStatus], Dict]:
        """Fetch the status of all AGVs from RCS and log it in detail.

        Returns:
            tuple: (list of AGVStatus, dict keyed by task id with the
                agvId/loaded/execute/index of the backpack slot holding it).
                Both are empty when the request fails.
        """
        agv_status_list = []
        task_status = {}
        try:
            # Empty parameters ask RCS for every AGV.
            self.logger.info(" è½®è¯¢ç›®æ ‡: ä½¿ç”¨ç©ºå‚æ•°(agvId=None, mapId=None)获取RCS所有AGV状态和任务状态")
            agv_response = self.rcs_client.get_agv_status(agv_id=None, map_id=None)
            if agv_response.code == 200 and agv_response.data:
                self.logger.info(f"成功获取AGV状态 - æ•°é‡: {len(agv_response.data)}")
                # Running totals for the summary logged below.
                total_tasks = 0
                executing_tasks = 0
                loaded_tasks = 0
                self.logger.info("=" * 80)
                self.logger.info("[详细AGV状态信息]")
                self.logger.info("=" * 80)
                for i, agv_data in enumerate(agv_response.data, 1):
                    try:
                        # Convert the raw entry into an AGVStatus object.
                        agv_status = from_dict(AGVStatus, agv_data)
                        agv_status_list.append(agv_status)
                        # Basic AGV information.
                        self.logger.info(f"[AGV #{i}] {agv_status.agvId}")
                        self.logger.info(f"   çŠ¶æ€: {agv_status.status} | ä½ç½®: {agv_status.position} | æ–¹å‘: {agv_status.direction}")
                        self.logger.info(f"   ç”µåŽ‹: {agv_status.vol} | é”™è¯¯ç : {agv_status.error} | ç©ºèƒŒç¯“: {agv_status.empty}")
                        # Backpack slots.
                        if agv_status.backpack:
                            self.logger.info(f"   [背篓信息] ({len(agv_status.backpack)} ä¸ª):")
                            for backpack_item in agv_status.backpack:
                                status_text = "执行中" if backpack_item.execute else ("已装载" if backpack_item.loaded else "空闲")
                                task_info = f"任务: {backpack_item.taskId}" if backpack_item.taskId else "无任务"
                                self.logger.info(f"     [位置{backpack_item.index}] {task_info} | "
                                                f"状态: {status_text} | å·²è£…è½½: {backpack_item.loaded} | æ‰§è¡Œä¸­: {backpack_item.execute}")
                                # Count the task and record where it sits.
                                if backpack_item.taskId:
                                    total_tasks += 1
                                    if backpack_item.execute:
                                        executing_tasks += 1
                                    if backpack_item.loaded:
                                        loaded_tasks += 1
                                    task_status[backpack_item.taskId] = {
                                        'agvId': agv_status.agvId,
                                        'loaded': backpack_item.loaded,
                                        'execute': backpack_item.execute,
                                        'index': backpack_item.index
                                    }
                        else:
                            self.logger.info(f"   [背篓信息] æ— èƒŒç¯“信息")
                        self.logger.info("-" * 60)
                    except Exception as e:
                        # One malformed entry must not discard the others.
                        self.logger.warning(f"解析AGV状态数据失败: {agv_data} - {e}")
                # Summary.
                self.logger.info("[任务统计摘要]")
                self.logger.info(f"   æ€»AGV数量: {len(agv_status_list)}")
                self.logger.info(f"   æ€»ä»»åŠ¡æ•°é‡: {total_tasks}")
                self.logger.info(f"   æ‰§è¡Œä¸­ä»»åŠ¡: {executing_tasks}")
                self.logger.info(f"   å·²è£…载任务: {loaded_tasks}")
                self.logger.info(f"   æœªè£…载任务: {total_tasks - loaded_tasks}")
                self.logger.info("=" * 80)
            else:
                self.logger.warning(f"获取AGV状态失败: {agv_response.msg if agv_response else '无响应'}")
        except Exception as e:
            self.logger.error(f"从RCS获取状态失败: {e}")
        return agv_status_list, task_status

    def _generate_collision_free_paths(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
        """Plan paths for the given AGVs with the batch path planner.

        Args:
            agv_status_list: Current status of the AGVs to plan for.

        Returns:
            List[Dict]: Planned paths; empty when none were produced or
                planning raised.
        """
        try:
            self.logger.debug(f"开始为 {len(agv_status_list)} ä¸ªAGV生成免碰撞路径")
            result = self.path_planner.plan_all_agv_paths(
                agv_status_list=agv_status_list,
                include_idle_agv=False,  # plan only for AGVs executing tasks
                constraints=None
            )
            # The planner result may use either key spelling.
            planned_paths = result.get('plannedPaths', result.get('planned_paths', []))
            # Diagnostics: show which fields the planner returned.
            self.logger.debug(f"路径规划器返回的字段: {list(result.keys())}")
            self.logger.debug(f"planned_paths字段存在: {'planned_paths' in result}")
            self.logger.debug(f"plannedPaths字段存在: {'plannedPaths' in result}")
            self.logger.debug(f"最终获取到的路径数量: {len(planned_paths)}")
            if planned_paths:
                self.logger.info(f"成功生成 {len(planned_paths)} æ¡å…ç¢°æ’žè·¯å¾„")
                # Log the path length of every AGV.
                for path_info in planned_paths:
                    agv_id = path_info.get('agvId', 'unknown')
                    code_list = path_info.get('codeList', [])
                    self.logger.debug(f"AGV {agv_id} è·¯å¾„长度: {len(code_list)}")
            else:
                self.logger.info("未生成任何路径(可能所有AGV都处于空闲状态)")
            return planned_paths
        except Exception as e:
            self.logger.error(f"生成免碰撞路径失败: {e}")
            return []

    def _send_paths_to_rcs(self, generated_paths: List[Dict]):
        """POST every planned path to the RCS navigation endpoint.

        Paths without an agvId or with an empty codeList are skipped. Each
        AGV is sent in its own request; a failed request is logged and the
        remaining ones are still attempted.

        Args:
            generated_paths: Planned paths, each with 'agvId' and 'codeList'.
        """
        try:
            self.logger.info(f"开始批量发送 {len(generated_paths)} æ¡è·¯å¾„到RCS系统")
            # Build the per-AGV payloads.
            agv_paths = []
            for path_info in generated_paths:
                agv_id = path_info.get('agvId')
                code_list = path_info.get('codeList', [])
                if agv_id and code_list:
                    # The first path point carrying a taskId names the task.
                    path_task_id = ''
                    for path_code in code_list:
                        if path_code.get('taskId'):
                            path_task_id = path_code.get('taskId')
                            break
                    navigation_data = []
                    for path_code in code_list:
                        nav_item = {
                            'code': path_code.get('code', ''),
                            'direction': path_code.get('direction', '90'),
                            'type': path_code.get('type')
                        }
                        # Optional fields are forwarded only when present.
                        if path_code.get('taskId'):
                            nav_item['taskId'] = path_code.get('taskId')
                        if path_code.get('posType'):
                            nav_item['posType'] = path_code.get('posType')
                        if path_code.get('lev') is not None:
                            nav_item['lev'] = path_code.get('lev')
                        navigation_data.append(nav_item)
                    auto_seg_id = self._generate_unique_seg_id(agv_id, path_task_id)
                    agv_path_data = {
                        'agvId': agv_id,
                        'segId': auto_seg_id,
                        'codeList': navigation_data
                    }
                    agv_paths.append(agv_path_data)
            if not agv_paths:
                self.logger.warning("没有有效的路径数据需要发送")
                return
            try:
                import requests
                import json
                rcs_url = f"http://{self.rcs_host}:{self.rcs_port}/api/open/algorithm/zkd/navigation/v1"
                self.logger.info(f"算法系统发送路径规划结果到RCS:")
                self.logger.info(f"   AGV数量: {len(agv_paths)}")
                self.logger.info(f"   ç›®æ ‡URL: {rcs_url}")
                successful_sends = 0
                for agv_path in agv_paths:
                    agv_id = agv_path['agvId']
                    seg_id = agv_path['segId']
                    code_list = agv_path['codeList']
                    # Standard navigation request: agvId, segId, codeList.
                    navigation_data = {
                        'agvId': agv_id,
                        'segId': seg_id,
                        'codeList': code_list
                    }
                    # Path summary for the log.
                    start_code = code_list[0].get('code', '') if code_list else ''
                    end_code = code_list[-1].get('code', '') if code_list else ''
                    task_codes = [nc for nc in code_list if nc.get('taskId')]
                    task_id = task_codes[0].get('taskId', '') if task_codes else ''
                    self.logger.info(f"    AGV: {agv_id}, ä»»åŠ¡: {task_id}, è·¯å¾„: {start_code} -> {end_code} ({len(code_list)}点)")
                    # Full JSON payload.
                    self.logger.info(f"发送路径JSON数据:")
                    self.logger.info(json.dumps(navigation_data, ensure_ascii=False, indent=2))
                    try:
                        response = requests.post(
                            rcs_url,
                            json=navigation_data,
                            timeout=10,
                            headers={'Content-Type': 'application/json'}
                        )
                        if response.status_code == 200:
                            response_data = response.json()
                            self.logger.info(f" æˆåŠŸå‘é€è·¯å¾„åˆ°RCS - AGV: {agv_id}, å“åº”: {response_data.get('msg', 'OK')}")
                            successful_sends += 1
                        else:
                            self.logger.warning(f" å‘送路径到RCS失败 - AGV: {agv_id}, HTTP状态: {response.status_code}")
                            self.logger.warning(f"   å“åº”内容: {response.text}")
                    except Exception as e:
                        self.logger.warning(f" å‘送导航指令异常 - AGV: {agv_id}, é”™è¯¯: {e}")
                self.logger.info(f" è·¯å¾„发送完成 - æˆåŠŸ: {successful_sends}/{len(agv_paths)}")
            except Exception as e:
                self.logger.error(f" å‘送导航指令异常: {e}")
        except Exception as e:
            self.logger.error(f"发送路径到RCS异常: {e}")

    def get_monitoring_status(self) -> Dict[str, Any]:
        """Return the service configuration plus a copy of the statistics."""
        return {
            'is_running': self.is_running,
            'rcs_host': self.rcs_host,
            'rcs_port': self.rcs_port,
            'poll_interval': self.poll_interval,
            'path_algorithm': self.path_algorithm,
            'auto_send_paths': self.auto_send_paths,
            'stats': self.stats.copy()
        }

    def reset_stats(self):
        """Reset all statistics counters and timestamps."""
        self.stats = self._empty_stats()
        self.logger.info("监控统计信息已重置")
common/__init__.py
New file
@@ -0,0 +1,2 @@
# Common Components Module
__version__ = "1.0.0"
common/api_client.py
New file
@@ -0,0 +1,124 @@
"""
与RCS系统通信
"""
import requests
import json
import logging
import time
from typing import Dict, Optional, Any
from .data_models import (
    APIResponse, create_error_response, ResponseCode
)
try:
    from config.settings import (
        RCS_SERVER_HOST, RCS_SERVER_PORT,
        REQUEST_TIMEOUT, AGV_STATUS_API_ENDPOINT
    )
except ImportError:
    RCS_SERVER_HOST = "10.10.10.156"
    RCS_SERVER_PORT = 8088
    REQUEST_TIMEOUT = 30
    AGV_STATUS_API_ENDPOINT = "/api/open/algorithm/getAgv"
    logging.warning("无法从config.settings导入配置,使用默认值")
class APIClient:
    """Base HTTP client that sends JSON requests through one shared session."""

    def __init__(self, base_url: str, timeout: int = 30):
        """
        Set up the session and its default JSON headers.

        Args:
            base_url: Server base URL; a trailing slash is stripped.
            timeout: Per-request timeout passed to every request.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)
        self.session = requests.Session()
        default_headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        self.session.headers.update(default_headers)

    def _make_request(self, method: str, endpoint: str, data: Any = None,
                     params: Dict = None) -> APIResponse:
        """
        Send one HTTP request and wrap the outcome in an APIResponse.

        Args:
            method: HTTP method name.
            endpoint: Path appended to the base URL.
            data: Optional body; dicts and lists go out as JSON, anything
                else is serialized with json.dumps and sent as raw data.
            params: Optional query parameters.

        Returns:
            APIResponse: On HTTP 200, the body's own code/msg/data when the
                body is a dict with a 'code' key, otherwise a success
                response wrapping the body. Any other status, a timeout, a
                connection error or any other exception yields a
                SERVER_ERROR response.
        """
        url = f"{self.base_url}{endpoint}"
        try:
            started = time.time()
            request_kwargs = {'timeout': self.timeout, 'params': params}
            if data is not None:
                if isinstance(data, (dict, list)):
                    request_kwargs['json'] = data
                else:
                    request_kwargs['data'] = json.dumps(data)
            response = self.session.request(method, url, **request_kwargs)
            finished = time.time()
            duration = (finished - started) * 1000  # convert to milliseconds
            self.logger.info(f"{method} {url} - {response.status_code} - {duration:.2f}ms")

            # Non-JSON bodies are kept as plain text under the "text" key.
            try:
                payload = response.json()
            except ValueError:
                payload = {"text": response.text}

            if response.status_code != 200:
                return create_error_response(
                    ResponseCode.SERVER_ERROR,
                    f"HTTP {response.status_code}: {response.text}"
                )

            has_envelope = isinstance(payload, dict) and 'code' in payload
            if not has_envelope:
                return APIResponse(
                    code=ResponseCode.SUCCESS,
                    msg='操作成功',
                    data=payload
                )
            return APIResponse(
                code=payload.get('code', ResponseCode.SUCCESS),
                msg=payload.get('msg', '操作成功'),
                data=payload.get('data')
            )
        except requests.exceptions.Timeout:
            self.logger.error(f"请求超时: {url}")
            return create_error_response(ResponseCode.SERVER_ERROR, "请求超时")
        except requests.exceptions.ConnectionError:
            self.logger.error(f"连接错误: {url}")
            return create_error_response(ResponseCode.SERVER_ERROR, "连接错误")
        except Exception as e:
            self.logger.error(f"请求异常: {url} - {str(e)}")
            return create_error_response(ResponseCode.SERVER_ERROR, f"请求异常: {str(e)}")
class RCSAPIClient(APIClient):
    """APIClient bound to the RCS server."""

    def __init__(self, rcs_host: str = None, rcs_port: int = None, timeout: int = None):
        """
        Build the RCS base URL and initialize the underlying client.

        Args:
            rcs_host: RCS host; RCS_SERVER_HOST is used when falsy.
            rcs_port: RCS port; RCS_SERVER_PORT is used when falsy.
            timeout: Request timeout; REQUEST_TIMEOUT is used when falsy.
        """
        host = rcs_host or RCS_SERVER_HOST
        port = rcs_port or RCS_SERVER_PORT
        effective_timeout = timeout or REQUEST_TIMEOUT
        super().__init__(f"http://{host}:{port}", effective_timeout)

    def get_agv_status(self, agv_id: Optional[str] = None,
                      map_id: Optional[str] = None) -> APIResponse:
        """
        Query AGV status from RCS.

        Args:
            agv_id: Optional AGV filter; left out of the body when falsy.
            map_id: Optional map filter; left out of the body when falsy.

        Returns:
            APIResponse: Result of the POST to AGV_STATUS_API_ENDPOINT.
        """
        filters = (('agvId', agv_id), ('mapId', map_id))
        data = {key: value for key, value in filters if value}
        return self._make_request('POST', AGV_STATUS_API_ENDPOINT, data=data)
common/data_models.py
New file
@@ -0,0 +1,184 @@
"""
数据模型定义
"""
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from enum import Enum
import json
class AGVStatusEnum(Enum):
    # AGV status
    IDLE = 0        # idle
    BUSY = 1        # busy
    CHARGING = 2    # charging
    ERROR = 3       # fault
    MAINTENANCE = 4 # maintenance
class TaskTypeEnum(Enum):
    # Task type (values are strings)
    PICKUP = "1"    # pick up goods
    DELIVERY = "2"  # deliver goods
    TRANSPORT = "3" # transport
class AGVActionTypeEnum(Enum):
    # AGV action type (values are strings)
    AVOIDANCE = "1"    # avoidance
    TASK = "2"         # task
    CHARGING = "3"     # charging
    STANDBY = "4"      # go to the standby position
@dataclass
class BackpackData:
    # Backpack (carrying basket) data
    index: int                    # backpack number
    loaded: bool                  # whether it is carrying goods
    execute: bool                 # whether it is currently executing
    taskId: Optional[str] = None  # id of the task being executed
@dataclass
class AGVStatus:
    # AGV status
    agvId: str                          # vehicle id
    status: int                         # status
    position: str                       # current map point of the vehicle
    empty: int                          # number of empty backpacks
    direction: str                      # vehicle heading angle
    vol: int                           # voltage
    error: int                         # error code, 0 means normal
    backpack: List[BackpackData] = field(default_factory=list)  # backpack data
    autoCharge: int = 20               # low-battery threshold; below it the AGV may go auto-charge or keep doing tasks
    lowVol: int = 10                   # minimum battery level; below it the AGV must go charge
@dataclass
class TaskData:
    # Task data
    taskId: str         # task id
    start: str          # start point
    end: str            # end point
    type: str          # task type
    priority: int      # priority
@dataclass
class PathCode:
    # Path point
    code: str                          # map point id
    direction: str                     # direction
    type: Optional[str] = None         # AGV action type (avoidance, task, charging, go to standby position)
    taskId: Optional[str] = None       # task id; required when the point belongs to a task being executed
    posType: Optional[str] = None      # action performed on arrival at the point, e.g. pick or place
    lev: Optional[int] = None          # which backpack the posType action operates on
@dataclass
class PlannedPath:
    # Planned path
    agvId: str                      # vehicle id
    codeList: List[PathCode]        # list of path points
    segId: Optional[str] = None     # de-duplication id used when navigation is sent repeatedly
@dataclass
class TaskAssignment:
    # Task assignment result
    taskId: str        # task id
    agvId: str         # id of the AGV the task is assigned to
    lev_id: int = 0    # backpack position number
@dataclass
class APIResponse:
    """API response envelope; field order is fixed as code, msg, data."""
    code: int                       # status code
    msg: str                        # message
    data: Optional[Any] = None      # payload

    def to_ordered_dict(self) -> Dict:
        """Return the response as an OrderedDict keyed code, msg, data (in that order).

        ``data`` is stored as-is; it is not converted recursively.
        """
        from collections import OrderedDict
        ordered = OrderedDict()
        for key in ('code', 'msg', 'data'):
            ordered[key] = getattr(self, key)
        return ordered
class ResponseCode:
    # Response status codes
    SUCCESS = 200           # operation succeeded
    NO_DATA = 201          # no data available yet
    PARAM_EMPTY = 401      # parameter is empty
    PERMISSION_DENIED = 403 # insufficient permission
    DUPLICATE_SUBMIT = 407  # duplicate submission, do not resubmit
    SERVER_ERROR = 500     # server error
def create_success_response(data: Any = None, msg: str = "操作成功") -> APIResponse:
    """Build a success response: code is ResponseCode.SUCCESS, with the given message and payload."""
    response = APIResponse(ResponseCode.SUCCESS, msg, data)
    return response
def create_error_response(code: int, msg: str) -> APIResponse:
    """Build an error response carrying ``code`` and ``msg``; its ``data`` is always None."""
    response = APIResponse(code, msg, None)
    return response
def to_dict(obj) -> Dict:
    """Convert a data-class style object into a plain dictionary.

    Objects without a ``__dict__`` are returned unchanged. An APIResponse is
    converted through its ``to_ordered_dict`` method. For any other object each
    attribute is copied; attribute values that have a ``__dict__`` (and list
    items that have one) are converted recursively.
    """
    if not hasattr(obj, '__dict__'):
        return obj
    if isinstance(obj, APIResponse):
        return obj.to_ordered_dict()

    def _convert(value):
        # Lists are converted element by element; other containers are kept as-is.
        if isinstance(value, list):
            return [to_dict(item) if hasattr(item, '__dict__') else item for item in value]
        if hasattr(value, '__dict__'):
            return to_dict(value)
        return value

    return {key: _convert(value) for key, value in obj.__dict__.items()}
def from_dict(data_class, data: Dict):
    """Create an instance of ``data_class`` from a dictionary.

    Only keys that match an annotation declared on ``data_class`` are used;
    other keys are ignored. Keys that are absent are simply not passed to the
    constructor, so dataclass defaults apply.

    Args:
        data_class: Class whose ``__annotations__`` describe the fields.
        data: Source dictionary. Any non-dict value is returned unchanged.

    Returns:
        A ``data_class`` instance, or ``data`` itself when it is not a dict.

    Raises:
        TypeError: From the constructor when a required field is missing.
    """
    if not isinstance(data, dict):
        return data

    kwargs = {}
    for field_name, field_type in data_class.__annotations__.items():
        if field_name not in data:
            # Let the constructor apply the default (or complain if required).
            continue
        value = data[field_name]
        origin = getattr(field_type, '__origin__', None)
        if origin is list:
            # List[X]: rebuild each item when X is itself an annotated class.
            # A None / non-sequence value is passed through instead of being iterated.
            args = getattr(field_type, '__args__', None) or ()
            inner_type = args[0] if args else None
            if isinstance(value, (list, tuple)) and hasattr(inner_type, '__annotations__'):
                value = [from_dict(inner_type, item) for item in value]
        elif origin is None and hasattr(field_type, '__annotations__'):
            # Plain nested annotated class.
            value = from_dict(field_type, value)
        # Every other generic (Optional/Union, Dict, ...) is passed through as-is.
        kwargs[field_name] = value
    return data_class(**kwargs)
common/utils.py
New file
@@ -0,0 +1,286 @@
import json
import logging
import os
import random
import time
import uuid
import string
from typing import Dict, List, Tuple, Optional, Any
from datetime import datetime
def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None) -> None:
    """Configure the root logger.

    All handlers currently attached to the root logger are removed and closed,
    then a console handler (and, if ``log_file`` is given, a UTF-8 file
    handler) is installed with a common format.

    Args:
        log_level: Name of a ``logging`` level, case-insensitive (e.g. "info").
        log_file: Optional path of a log file to write to as well.

    Raises:
        AttributeError: If ``log_level`` does not name an attribute of ``logging``.
    """
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(log_format)
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, log_level.upper()))
    # Iterate over a copy because removeHandler mutates the list. Close each
    # removed handler so repeated calls do not leak open log files.
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
        handler.close()
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)
    if log_file:
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)
def load_path_mapping(mapping_file: str = "path_mapping.json") -> Dict[str, Dict[str, int]]:
    """Load the path mapping file.

    Reads the JSON document at ``mapping_file`` and, for every entry of its
    ``"path_id_to_coordinates"`` object that has a non-empty coordinate list,
    keeps the x/y of the first coordinate.

    Args:
        mapping_file: Path of the JSON mapping file.

    Returns:
        ``{path_id: {"x": ..., "y": ...}}``. An empty dict is returned (and the
        problem is logged) when the file is missing, is not valid JSON, or any
        other error occurs while loading.
    """
    logger = logging.getLogger(__name__)
    try:
        with open(mapping_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        path_mapping = {}
        if "path_id_to_coordinates" in data:
            for path_id, coordinates in data["path_id_to_coordinates"].items():
                if coordinates:
                    # Only the first coordinate of each path id is used.
                    coord = coordinates[0]
                    path_mapping[path_id] = {
                        "x": coord["x"],
                        "y": coord["y"]
                    }
        # The tail of this message was mis-encoded; restored to readable text.
        logger.info(f"成功加载路径映射,共 {len(path_mapping)} 个路径点")
        return path_mapping
    except FileNotFoundError:
        logger.error(f"路径映射文件不存在: {mapping_file}")
        return {}
    except json.JSONDecodeError as e:
        logger.error(f"路径映射文件格式错误: {e}")
        return {}
    except Exception as e:
        logger.error(f"加载路径映射文件失败: {e}")
        return {}
def get_coordinate_from_path_id(path_id: str, path_mapping: Dict[str, Dict[str, int]]) -> Optional[Tuple[int, int]]:
    """
    Get the coordinate for a path point id.

    The id is first looked up verbatim. If that fails it is normalized with
    ``str(int(path_id))`` (which drops leading zeros, e.g. '00000206' -> '206')
    and looked up again.

    Args:
        path_id: Path point id (zero-padded 8-character ids are supported)
        path_mapping: Path mapping dictionary
    Returns:
        Optional[Tuple[int, int]]: Coordinate (x, y), or None if it cannot be found
    """
    logger = logging.getLogger(__name__)
    # NOTE: several log messages below were mis-encoded; restored to readable text.
    if path_id in path_mapping:
        coord = path_mapping[path_id]
        logger.debug(f"路径ID {path_id} 匹配成功,坐标: ({coord['x']}, {coord['y']})")
        return (coord["x"], coord["y"])
    # Exact match failed: retry with the leading zeros stripped.
    try:
        normalized_path_id = str(int(path_id))
        if normalized_path_id in path_mapping:
            coord = path_mapping[normalized_path_id]
            logger.debug(f"路径ID {path_id} 规范化为 {normalized_path_id} 后匹配成功,坐标: ({coord['x']}, {coord['y']})")
            return (coord["x"], coord["y"])
        else:
            logger.warning(f"规范化后的路径ID {normalized_path_id}(原始: {path_id})在路径映射中未找到")
    except (ValueError, TypeError):
        # Not an integer-like id (or None), so nothing to normalize.
        logger.warning(f"路径ID {path_id} 不是有效的数字格式")
    logger.warning(f"无法找到路径ID {path_id} 对应的坐标")
    return None
def get_path_id_from_coordinate(x: int, y: int, path_mapping: Dict[str, Dict[str, int]]) -> Optional[str]:
    """Return the first path point id whose coordinate equals (x, y), or None if there is none."""
    matching_ids = (
        candidate_id
        for candidate_id, point in path_mapping.items()
        if point["x"] == x and point["y"] == y
    )
    return next(matching_ids, None)
def calculate_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> float:
    """Return the Euclidean distance between two points."""
    delta_x = pos1[0] - pos2[0]
    delta_y = pos1[1] - pos2[1]
    squared = delta_x ** 2 + delta_y ** 2
    return squared ** 0.5
def calculate_manhattan_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> int:
    """Return the Manhattan distance between two points."""
    horizontal = abs(pos1[0] - pos2[0])
    vertical = abs(pos1[1] - pos2[1])
    return horizontal + vertical
def generate_random_agv_id() -> str:
    """Generate a random AGV id of the form ``AGV_<n>`` with n in 10001..99999."""
    number = random.randint(10001, 99999)
    return "AGV_" + str(number)
def generate_random_task_id() -> str:
    """Generate a random task id of the form ``TASK_<millisecond timestamp>_<3-digit random>``."""
    millis = int(time.time() * 1000)  # millisecond timestamp
    suffix = random.randint(100, 999)
    return "_".join(("TASK", str(millis), str(suffix)))
def get_random_path_ids(path_mapping: Dict[str, Dict[str, int]], count: int = 2) -> List[str]:
    """Pick ``count`` distinct path point ids at random.

    Returns an empty list for an empty mapping, and every id (in mapping order)
    when the mapping holds fewer than ``count`` ids.
    """
    if not path_mapping:
        return []
    candidates = list(path_mapping.keys())
    enough = len(candidates) >= count
    return random.sample(candidates, count) if enough else candidates
def format_timestamp(timestamp: Optional[float] = None) -> str:
    """Format a POSIX timestamp as local time ``YYYY-MM-DD HH:MM:SS``; the current time is used when None."""
    moment = time.time() if timestamp is None else timestamp
    local_dt = datetime.fromtimestamp(moment)
    return local_dt.strftime("%Y-%m-%d %H:%M:%S")
def normalize_path_id(path_id: str) -> str:
    """Normalize a path point id by round-tripping it through ``int`` (drops leading zeros).

    Falsy ids and ids that cannot be parsed as an integer are returned unchanged.
    """
    if path_id:
        try:
            return str(int(path_id))
        except (ValueError, TypeError):
            pass
    return path_id
def validate_path_id(path_id: str, min_path_id: int = 1, max_path_id: int = 1696) -> bool:
    """Validate a path point id.

    The id must be convertible with ``int()`` and fall inside the inclusive
    range ``[min_path_id, max_path_id]``.

    Args:
        path_id: Path point id, normally a numeric string.
        min_path_id: Smallest accepted id (default 1).
        max_path_id: Largest accepted id (default 1696, the previously
            hard-coded upper bound).

    Returns:
        bool: True when the id is an integer inside the range, otherwise False.
    """
    try:
        path_num = int(path_id)
    except (ValueError, TypeError, OverflowError):
        # Non-numeric text, None, or an infinite float: not a valid id.
        return False
    return min_path_id <= path_num <= max_path_id
def validate_agv_id(agv_id: str) -> bool:
    """Validate an AGV id: a string that is non-blank and at most 50 characters long."""
    if not agv_id:
        return False
    if not isinstance(agv_id, str):
        return False
    # Must contain something besides whitespace, and stay within the length limit.
    has_content = len(agv_id.strip()) > 0
    return has_content and len(agv_id) <= 50
def save_json_file(data: Any, file_path: str) -> bool:
    """Save data to a JSON file (UTF-8, non-ASCII kept as-is, 2-space indent).

    Returns True on success; any exception is logged and reported as False.
    """
    log = logging.getLogger(__name__)
    succeeded = False
    try:
        with open(file_path, 'w', encoding='utf-8') as out:
            json.dump(data, out, ensure_ascii=False, indent=2)
        log.info(f"数据已保存到文件: {file_path}")
        succeeded = True
    except Exception as exc:
        log.error(f"保存文件失败: {file_path} - {exc}")
    return succeeded
def load_json_file(file_path: str) -> Optional[Any]:
    """Load data from a JSON file (read as UTF-8).

    Returns the parsed data, or None when the file is missing, is not valid
    JSON, or any other error occurs; each failure is logged.
    """
    log = logging.getLogger(__name__)
    try:
        with open(file_path, 'r', encoding='utf-8') as src:
            parsed = json.load(src)
        log.info(f"数据已从文件加载: {file_path}")
        return parsed
    except FileNotFoundError:
        # A missing file is only worth a warning.
        log.warning(f"文件不存在: {file_path}")
    except json.JSONDecodeError as exc:
        log.error(f"JSON格式错误: {file_path} - {exc}")
    except Exception as exc:
        log.error(f"加载文件失败: {file_path} - {exc}")
    return None
def ensure_directory_exists(directory: str) -> bool:
    """Make sure a directory exists, creating it (and its parents) if needed.

    Returns True on success; any exception is logged and reported as False.
    """
    try:
        os.makedirs(directory, exist_ok=True)
    except Exception as exc:
        logging.getLogger(__name__).error(f"创建目录失败: {directory} - {exc}")
        return False
    return True
def generate_segment_id(agv_id: str, task_id: Optional[str] = None,
                       target_position: Optional[str] = None,
                       action_type: str = "2") -> str:
    """
    Generate a navigation segment id used as a de-duplication marker.

    The id is a deterministic 11-character decimal string derived from the
    MD5 hash of the AGV id, the task id (or, without a task, the target
    position) and the action type. Only the first 8 hex digits of the digest
    are used, so the number has at most 10 digits and is left-padded with zeros.

    Args:
        agv_id: AGV ID
        task_id: Task ID; when truthy it is used instead of the target position
        target_position: Target position; "unknown" is used when it is falsy
        action_type: Action type
    Returns:
        str: 11-character navigation segment id
    """
    import hashlib
    # The middle component is the task id when present, otherwise the target.
    if task_id:
        middle = task_id
    else:
        middle = target_position or "unknown"
    seed = f"{agv_id}_{middle}_{action_type}"
    digest = hashlib.md5(seed.encode()).hexdigest()
    # Turn the leading 32 bits of the digest into a zero-padded decimal string.
    decimal_text = str(int(digest[:8], 16))
    return decimal_text[-11:].zfill(11)
def generate_navigation_code(code: str, direction: str, action_type: str = "2",
                           task_id: Optional[str] = None, pos_type: Optional[str] = None,
                           backpack_level: Optional[int] = None, is_target_point: bool = False) -> Dict:
    """Build the dictionary describing one navigation path point.

    ``code``, ``direction`` and ``type`` are always present. ``taskId`` is added
    only for action type "2" with a truthy task id. ``posType`` and ``lev`` are
    added only for the target point, and only when they carry a value.
    """
    entry = {'code': code, 'direction': direction, 'type': action_type}
    if task_id and action_type == "2":
        entry['taskId'] = task_id
    if not is_target_point:
        return entry
    # Target point: attach the action and the backpack level when provided.
    if pos_type:
        entry['posType'] = pos_type
    if backpack_level is not None:
        entry['lev'] = backpack_level
    return entry
config/environment.py
New file
@@ -0,0 +1,360 @@
"""
Environment configuration module - defines and loads the warehouse environment configuration
"""
import json
from enum import Enum, auto
import numpy as np
from typing import Dict, List, Tuple, Optional, Union
class CellType(Enum):
    """Enumeration of cell types"""
    EMPTY = auto()  # empty cell
    STATION = auto()  # station
    PATH = auto()  # path
    STORAGE = auto()  # storage area
    OBSTACLE = auto()  # obstacle
    LOADING = auto()  # loading area
    UNLOADING = auto()  # unloading area
class EnvironmentConfig:
    """Environment configuration: a grid of CellType values plus per-type position sets and a path graph."""
    def __init__(self, width: int, height: int):
        """Initialize an empty environment of the given width and height."""
        self.width = width
        self.height = height
        # Initialize every cell as empty; the grid is indexed as grid[y, x]
        self.grid = np.full((height, width), CellType.EMPTY)
        # Station positions and their configuration
        self.stations = {}  # format: {(x, y): {"capacity": 4, "load_position": (x-1, y), "unload_position": (x+1, y)}}
        # Path point positions
        self.paths = set()  # format: {(x1, y1), (x2, y2), ...}
        # Storage area positions
        self.storage_areas = set()  # format: {(x1, y1), (x2, y2), ...}
        # Loading / unloading area positions
        self.loading_areas = set()  # format: {(x1, y1), (x2, y2), ...}
        self.unloading_areas = set()  # format: {(x1, y1), (x2, y2), ...}
        # Obstacle positions
        self.obstacles = set()  # format: {(x1, y1), (x2, y2), ...}
        # Adjacency list of grid points (used for path planning)
        self.adjacency_list = {}  # format: {(x1, y1): {(x2, y2): distance, ...}, ...}
    def set_cell_type(self, x: int, y: int, cell_type: CellType) -> bool:
        """
        Set the type of a cell
        Args:
            x: x coordinate
            y: y coordinate
            cell_type: cell type
        Returns:
            bool: whether the cell was set (False when (x, y) is outside the grid)
        """
        if 0 <= x < self.width and 0 <= y < self.height:
            self.grid[y, x] = cell_type
            # Update the collection that corresponds to the cell type
            # NOTE(review): the position is only ever added; it is not removed
            # from the collection of the type the cell had before.
            pos = (x, y)
            if cell_type == CellType.PATH:
                self.paths.add(pos)
            elif cell_type == CellType.STATION:
                if pos not in self.stations:
                    self.stations[pos] = {
                        "capacity": 4,  # default capacity
                        "load_position": (x-1, y),  # default load position
                        "unload_position": (x+1, y)  # default unload position
                    }
            elif cell_type == CellType.STORAGE:
                self.storage_areas.add(pos)
            elif cell_type == CellType.LOADING:
                self.loading_areas.add(pos)
            elif cell_type == CellType.UNLOADING:
                self.unloading_areas.add(pos)
            elif cell_type == CellType.OBSTACLE:
                self.obstacles.add(pos)
            return True
        return False
    def add_station(self, x: int, y: int, capacity: int = 4,
                   load_position: Tuple[int, int] = None,
                   unload_position: Tuple[int, int] = None) -> bool:
        """
        Add a station
        Args:
            x: station x coordinate
            y: station y coordinate
            capacity: station capacity
            load_position: load position (defaults to the cell on the left, (x-1, y))
            unload_position: unload position (defaults to the cell on the right, (x+1, y))
        Returns:
            bool: whether the station was added (False when (x, y) is outside the grid)
        """
        if 0 <= x < self.width and 0 <= y < self.height:
            # Fill in the default load / unload positions
            if load_position is None:
                load_position = (x-1, y)  # left side is the drop-off bin position
            if unload_position is None:
                unload_position = (x+1, y)  # right side is the pick-up bin position
            # Update the grid and the station information
            self.grid[y, x] = CellType.STATION
            self.stations[(x, y)] = {
                "capacity": capacity,
                "load_position": load_position,
                "unload_position": unload_position
            }
            # Make sure the load / unload positions are marked as well
            # NOTE(review): the results of these calls are ignored, so an
            # out-of-grid load/unload position stays recorded on the station
            # without being marked on the grid.
            self.set_cell_type(load_position[0], load_position[1], CellType.LOADING)
            self.set_cell_type(unload_position[0], unload_position[1], CellType.UNLOADING)
            return True
        return False
    def add_path(self, x: int, y: int) -> bool:
        """
        Add a path point
        Args:
            x: x coordinate
            y: y coordinate
        Returns:
            bool: whether the point was added
        """
        return self.set_cell_type(x, y, CellType.PATH)
    def add_storage_area(self, x: int, y: int) -> bool:
        """
        Add a storage area cell
        Args:
            x: x coordinate
            y: y coordinate
        Returns:
            bool: whether the cell was added
        """
        return self.set_cell_type(x, y, CellType.STORAGE)
    def add_obstacle(self, x: int, y: int) -> bool:
        """
        Add an obstacle
        Args:
            x: x coordinate
            y: y coordinate
        Returns:
            bool: whether the obstacle was added
        """
        return self.set_cell_type(x, y, CellType.OBSTACLE)
    def is_valid_position(self, x: int, y: int) -> bool:
        """
        Check whether a position is valid (inside the grid and not an obstacle)
        Args:
            x: x coordinate
            y: y coordinate
        Returns:
            bool: whether the position is valid
        """
        return (0 <= x < self.width and
                0 <= y < self.height and
                self.grid[y, x] != CellType.OBSTACLE)
    def build_adjacency_list(self):
        """Build the adjacency list of grid points (used for path planning)"""
        # Clear the existing adjacency list
        self.adjacency_list = {}
        # AGVs move in four directions
        directions = [
            (0, 1),   # up
            (1, 0),   # right
            (0, -1),  # down
            (-1, 0)   # left
        ]
        # Build the adjacency entries of every path point
        for x, y in self.paths:
            if (x, y) not in self.adjacency_list:
                self.adjacency_list[(x, y)] = {}
            # Check the neighbours in the four directions
            for dx, dy in directions:
                nx, ny = x + dx, y + dy
                # Check whether the adjacent point is a path point
                if (nx, ny) in self.paths:
                    # Every move has a uniform distance of 1.0
                    distance = 1.0
                    self.adjacency_list[(x, y)][(nx, ny)] = distance
        # Add the load / unload points of the stations to the adjacency list
        for station_pos, station_info in self.stations.items():
            load_pos = station_info["load_position"]
            unload_pos = station_info["unload_position"]
            # Make sure the load / unload points have an adjacency entry
            if load_pos not in self.adjacency_list:
                self.adjacency_list[load_pos] = {}
            if unload_pos not in self.adjacency_list:
                self.adjacency_list[unload_pos] = {}
            # Connect each load / unload point to its nearest path point
            for pos in [load_pos, unload_pos]:
                # Find the nearest path point (Euclidean distance) and connect to it
                min_dist = float('inf')
                nearest_path = None
                for path_pos in self.paths:
                    dist = ((pos[0] - path_pos[0]) ** 2 + (pos[1] - path_pos[1]) ** 2) ** 0.5
                    if dist < min_dist:
                        min_dist = dist
                        nearest_path = path_pos
                # nearest_path stays None only when there are no path points at all
                if nearest_path:
                    # The edge is stored in both directions; nearest_path already
                    # has an entry because every path point got one above.
                    self.adjacency_list[pos][nearest_path] = min_dist
                    self.adjacency_list[nearest_path][pos] = min_dist
    def save_to_file(self, filepath: str):
        """
        Save the environment configuration to a JSON file
        Args:
            filepath: file path
        """
        # Station keys are serialized as "x,y" strings; positions as {"x", "y"} objects
        config_data = {
            "width": self.width,
            "height": self.height,
            "stations": {f"{x},{y}": info for (x, y), info in self.stations.items()},
            "paths": [{"x": x, "y": y} for x, y in self.paths],
            "storage_areas": [{"x": x, "y": y} for x, y in self.storage_areas],
            "loading_areas": [{"x": x, "y": y} for x, y in self.loading_areas],
            "unloading_areas": [{"x": x, "y": y} for x, y in self.unloading_areas],
            "obstacles": [{"x": x, "y": y} for x, y in self.obstacles]
        }
        with open(filepath, 'w') as f:
            json.dump(config_data, f, indent=2)
    @classmethod
    def load_from_file(cls, filepath: str) -> 'EnvironmentConfig':
        """
        Load the environment configuration from a JSON file
        Args:
            filepath: file path
        Returns:
            EnvironmentConfig: environment configuration object
        """
        with open(filepath, 'r') as f:
            config_data = json.load(f)
        env_config = cls(config_data["width"], config_data["height"])
        # Load stations
        # NOTE(review): the "loading_areas" / "unloading_areas" keys written by
        # save_to_file are not read here; those sets are only repopulated
        # through add_station.
        for pos_str, info in config_data["stations"].items():
            x, y = map(int, pos_str.split(','))
            load_x, load_y = info["load_position"]
            unload_x, unload_y = info["unload_position"]
            env_config.add_station(x, y, info["capacity"],
                                  (load_x, load_y), (unload_x, unload_y))
        # Load paths
        for path in config_data["paths"]:
            env_config.add_path(path["x"], path["y"])
        # Load storage areas
        for area in config_data["storage_areas"]:
            env_config.add_storage_area(area["x"], area["y"])
        # Load obstacles
        for obstacle in config_data["obstacles"]:
            env_config.add_obstacle(obstacle["x"], obstacle["y"])
        # Build the adjacency list
        env_config.build_adjacency_list()
        return env_config
    def create_default_environment(self) -> 'EnvironmentConfig':
        """
        Create the default environment configuration (stations and paths laid out as described in the requirements)
        Returns:
            EnvironmentConfig: environment configuration object (this same instance)
        """
        # Clear the current configuration
        self.grid = np.full((self.height, self.width), CellType.EMPTY)
        self.stations = {}
        self.paths = set()
        self.storage_areas = set()
        self.loading_areas = set()
        self.unloading_areas = set()
        self.obstacles = set()
        # Configure 20 stations
        station_y = self.height - 5
        station_spacing = self.width // 25  # spacing between stations
        for i in range(20):
            station_x = (i + 1) * station_spacing
            self.add_station(station_x, station_y)
        # Storage areas: 5 columns of 3x3 blocks, one block every 5 rows
        storage_start_y = 5
        storage_end_y = station_y - 10
        for col in range(5):
            col_x = (col + 1) * (self.width // 6)
            for row_y in range(storage_start_y, storage_end_y, 5):
                for dx in range(-1, 2):
                    for dy in range(-1, 2):
                        self.add_storage_area(col_x + dx, row_y + dy)
        # Configure paths
        # Horizontal main paths
        for x in range(5, self.width - 5):
            # Horizontal path in front of the stations
            self.add_path(x, station_y - 5)
            # Horizontal paths through the shelf area
            for path_y in range(10, storage_end_y, 10):
                self.add_path(x, path_y)
        # Vertical connecting paths
        for col in range(7):
            path_x = col * (self.width // 7)
            for y in range(5, station_y):
                self.add_path(path_x, y)
        # Build the adjacency list
        self.build_adjacency_list()
        return self
# åˆ›å»ºå’Œåˆå§‹åŒ–默认环境的辅助函数
def create_default_environment(width=100, height=100) -> EnvironmentConfig:
    """
    Create the default environment configuration
    Args:
        width: environment width
        height: environment height
    Returns:
        EnvironmentConfig: a new configuration populated by its own
        create_default_environment() method
    """
    return EnvironmentConfig(width, height).create_default_environment()
config/settings.py
New file
@@ -0,0 +1,27 @@
"""
Global settings and parameter configuration - only the configuration items actually in use are kept
"""
# RCS server configuration
RCS_SERVER_HOST = "10.10.10.156"
RCS_SERVER_PORT = 8088
# Algorithm server configuration
ALGORITHM_SERVER_HOST = "10.10.10.239"
ALGORITHM_SERVER_PORT = 8002
# Path monitor service configuration
MONITOR_POLLING_INTERVAL = 5.0  # polling interval of the path monitor
# API configuration
AGV_STATUS_API_ENDPOINT = "/api/open/algorithm/getAgv"  # URL for querying AGV status
# AGV configuration
DEFAULT_AGV_COUNT = 50  # default number of AGVs
# Network configuration
REQUEST_TIMEOUT = 30  # HTTP request timeout (seconds)
# Logging configuration
LOG_LEVEL = "INFO"
LOG_FILE = "warehouse_system.log"  # log file path
environment.json
New file
Diff too large
path_mapping.json
New file
Diff too large
run_algorithm.py
New file
@@ -0,0 +1,216 @@
#!/usr/bin/env python3
import sys
import os
import signal
import logging
from typing import Optional
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common.utils import setup_logging
from config.settings import (
    ALGORITHM_SERVER_HOST, ALGORITHM_SERVER_PORT, LOG_LEVEL, LOG_FILE,
    RCS_SERVER_HOST, RCS_SERVER_PORT, MONITOR_POLLING_INTERVAL
)
from algorithm_system.algorithm_server import AlgorithmServer
class AlgorithmApplication:
    """Launcher that owns the lifecycle of an ``AlgorithmServer``.

    On construction it resolves the listen address, RCS address and
    path-monitor options (falling back to the values imported from
    ``config.settings``), configures logging and installs SIGINT/SIGTERM
    handlers that shut the server down and exit the process.
    """

    # Guards shutdown() against running twice. A signal-triggered shutdown is
    # followed by sys.exit(), whose SystemExit unwinds through the ``finally``
    # clause of start_server(); without this flag the server would be stopped
    # a second time. Declared at class level so it always has a value.
    _shutdown_done = False

    def __init__(self, host: Optional[str] = None, port: Optional[int] = None,
                 log_level: Optional[str] = None,
                 enable_path_monitor: bool = True,
                 monitor_interval: Optional[float] = None,
                 rcs_host: Optional[str] = None, rcs_port: Optional[int] = None):
        """Resolve configuration, set up logging and install signal handlers.

        Args:
            host: Address the algorithm server listens on.
            port: Port the algorithm server listens on.
            log_level: Logging level name handed to ``setup_logging``.
            enable_path_monitor: Forwarded to ``AlgorithmServer``.
            monitor_interval: Path-monitor polling interval, forwarded to
                ``AlgorithmServer``.
            rcs_host: RCS server address.
            rcs_port: RCS server port.

        Any falsy argument (``None``, ``""``, ``0``) is replaced by the
        corresponding default from ``config.settings``.
        """
        self.host = host or ALGORITHM_SERVER_HOST
        self.port = port or ALGORITHM_SERVER_PORT
        self.log_level = log_level or LOG_LEVEL
        self.enable_path_monitor = enable_path_monitor
        self.monitor_interval = monitor_interval or MONITOR_POLLING_INTERVAL
        self.rcs_host = rcs_host or RCS_SERVER_HOST
        self.rcs_port = rcs_port or RCS_SERVER_PORT

        setup_logging(self.log_level, LOG_FILE)
        self.logger = logging.getLogger(__name__)

        # Created lazily by start_server().
        self.server: Optional[AlgorithmServer] = None
        self._shutdown_done = False

        # Route Ctrl+C and termination requests through an orderly shutdown.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM: shut the system down, then exit with status 0.

        Args:
            signum: Number of the received signal.
            frame: Current stack frame (unused).
        """
        self.logger.info(f"Received signal {signum}, shutting down algorithm system...")
        self.shutdown()
        sys.exit(0)

    def start_server(self):
        """Create the ``AlgorithmServer``, log its status and start it.

        ``shutdown()`` is always invoked when this method exits, whether the
        server returned normally, was interrupted or failed.

        Raises:
            Exception: Any error raised while creating or running the server
                is logged and re-raised.
        """
        try:
            # A fresh run needs its own shutdown pass.
            self._shutdown_done = False
            self.logger.info("Initializing algorithm system server...")

            # Publish the RCS connection settings through the process
            # environment. NOTE(review): presumably read by components inside
            # algorithm_system; the consumer is not visible here.
            os.environ['RCS_SERVER_HOST'] = self.rcs_host
            os.environ['RCS_SERVER_PORT'] = str(self.rcs_port)

            self.server = AlgorithmServer(self.host, self.port, self.enable_path_monitor, self.monitor_interval)

            status = self.server.get_server_status()
            self.logger.info("Algorithm system server status:")
            self.logger.info(f"  - Host: {status['host']}")
            self.logger.info(f"  - Port: {status['port']}")
            self.logger.info(f"  - Path mapping loaded: {status['path_mapping_loaded']}")
            self.logger.info(f"  - AGV count: {status['agv_count']}")
            self.logger.info(f"  - Task allocation algorithm: {status['algorithms']['task_allocation']}")
            self.logger.info(f"  - Path planning algorithm: {status['algorithms']['path_planning']}")
            self.logger.info("Algorithm system server initialization completed")

            self.server.start_server()
        except KeyboardInterrupt:
            self.logger.info("Received interrupt signal, shutting down algorithm server...")
        except Exception as e:
            self.logger.error(f"Algorithm server startup failed: {e}")
            raise
        finally:
            self.shutdown()

    def shutdown(self):
        """Stop the server if one was created; safe to call more than once.

        Only the first call after a (re)start does any work. Errors raised by
        ``stop_server()`` are logged and not propagated.
        """
        if self._shutdown_done:
            return
        # Set the flag before doing any work so a signal arriving mid-shutdown
        # cannot re-enter this method.
        self._shutdown_done = True

        self.logger.info("Shutting down algorithm system...")
        if self.server:
            try:
                self.server.stop_server()
            except Exception as e:
                self.logger.error(f"Failed to shutdown server: {e}")
        self.logger.info("Algorithm system shutdown completed")
def _build_arg_parser():
    """Create the command-line parser for the algorithm system launcher."""
    import argparse

    parser = argparse.ArgumentParser(description="Algorithm System - Provides task allocation and path planning services")
    parser.add_argument(
        "--host",
        default=ALGORITHM_SERVER_HOST,
        help=f"Algorithm server host address (default: {ALGORITHM_SERVER_HOST})"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=ALGORITHM_SERVER_PORT,
        help=f"Algorithm server port (default: {ALGORITHM_SERVER_PORT})"
    )
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default="DEBUG",
        help="Log level (default: DEBUG)"
    )
    # NOTE(review): --task-algorithm and --path-algorithm are only echoed in
    # the startup banner; main() does not forward them to AlgorithmApplication.
    parser.add_argument(
        "--task-algorithm",
        choices=["NEAREST_FIRST", "LOAD_BALANCED", "PRIORITY_FIRST", "MULTI_OBJECTIVE"],
        default="LOAD_BALANCED",
        help="Task allocation algorithm (default: LOAD_BALANCED)"
    )
    parser.add_argument(
        "--path-algorithm",
        choices=["A_STAR", "DIJKSTRA", "GREEDY"],
        default="DIJKSTRA",
        help="Path planning algorithm (default: DIJKSTRA)"
    )
    # This flag already defaults to True, so passing it changes nothing; path
    # monitoring is switched off with --disable-path-monitor.
    parser.add_argument(
        "--enable-path-monitor",
        action="store_true",
        default=True,
        help="Enable path monitoring service (default: True)"
    )
    parser.add_argument(
        "--disable-path-monitor",
        action="store_true",
        help="Disable path monitoring service"
    )
    parser.add_argument(
        "--monitor-interval",
        type=float,
        default=MONITOR_POLLING_INTERVAL,
        help=f"Path monitoring polling interval in seconds (default: {MONITOR_POLLING_INTERVAL})"
    )
    parser.add_argument(
        "--rcs-host",
        default=RCS_SERVER_HOST,
        help=f"RCS server host address (default: {RCS_SERVER_HOST})"
    )
    parser.add_argument(
        "--rcs-port",
        type=int,
        default=RCS_SERVER_PORT,
        help=f"RCS server port (default: {RCS_SERVER_PORT})"
    )
    return parser


def _print_startup_banner(args, enable_path_monitor):
    """Print the effective configuration and the list of HTTP endpoints.

    Args:
        args: Parsed command-line namespace produced by ``_build_arg_parser``.
        enable_path_monitor: Whether the path monitoring service is enabled;
            controls the monitor-specific lines.
    """
    base_url = f"http://{args.host}:{args.port}"
    rule = "=" * 60

    print(rule)
    print("CTU Warehouse Management System - Algorithm System")
    print(rule)
    print(f"Server Address: {base_url}")
    print(f"Task Allocation Algorithm: {args.task_algorithm}")
    print(f"Path Planning Algorithm: {args.path_algorithm}")
    print(f"Log Level: {args.log_level}")
    print(f"Path Monitoring Service: {'Enabled' if enable_path_monitor else 'Disabled'}")
    if enable_path_monitor:
        print(f"Monitor Polling Interval: {args.monitor_interval}s")
        print(f"RCS Server Address: {args.rcs_host}:{args.rcs_port}")
    print(rule)

    # (HTTP method, path, description lines) in display order.
    endpoints = [
        ("POST", "/open/task/send/v1", [
            "Task allocation API",
            'Supports new format: {"tasks": [...], "agvStatus": [...]}',
            "Compatible with old format: [task1, task2, ...]",
        ]),
        ("POST", "/open/path/plan/v1", ["Path planning API"]),
        ("POST", "/open/path/batch/plan/v1", ["Batch path planning API"]),
        ("POST", "/open/algorithm/config/v1", ["Algorithm configuration API"]),
    ]
    if enable_path_monitor:
        endpoints += [
            ("POST", "/monitor/path/start/v1", ["Start path monitoring service"]),
            ("POST", "/monitor/path/stop/v1", ["Stop path monitoring service"]),
            ("GET", "/monitor/path/status/v1", ["Path monitoring service status query"]),
        ]
    endpoints.append(("GET", "/health", ["Health check API"]))

    print("Available APIs:")
    for method, path, notes in endpoints:
        # Pad the method to four characters so the URLs line up.
        print(f"  {method:<4} {base_url}{path}")
        for note in notes:
            print(f"       - {note}")
    print(rule)
    print("Press Ctrl+C to stop server")
    print()


def main():
    """Command-line entry point: parse options, print the banner, run the server.

    Exits with status 1 (after reporting the error on stderr) if startup or
    the server run raises an exception.
    """
    args = _build_arg_parser().parse_args()

    # --disable-path-monitor overrides --enable-path-monitor.
    enable_path_monitor = args.enable_path_monitor and not args.disable_path_monitor

    app = AlgorithmApplication(
        host=args.host,
        port=args.port,
        log_level=args.log_level,
        enable_path_monitor=enable_path_monitor,
        monitor_interval=args.monitor_interval,
        rcs_host=args.rcs_host,
        rcs_port=args.rcs_port
    )

    try:
        _print_startup_banner(args, enable_path_monitor)
        app.start_server()
    except KeyboardInterrupt:
        print("\nReceived interrupt signal, shutting down...")
    except Exception as e:
        # Diagnostics belong on stderr so they are not mixed into stdout.
        print(f"Startup failed: {e}", file=sys.stderr)
        sys.exit(1)
# Run the launcher only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()