"""
|
路径规划算法
|
"""
|
import heapq
|
import random
|
import math
|
import time
|
import logging
|
from typing import Dict, List, Tuple, Optional, Set, Any
|
from collections import defaultdict, deque
|
|
from common.data_models import PlannedPath, PathCode, AGVStatus, BackpackData
|
from common.utils import get_coordinate_from_path_id, calculate_distance, calculate_manhattan_distance, generate_segment_id, generate_navigation_code
|
from algorithm_system.algorithms.collision_detection import CollisionDetector, CollisionResolver
|
|
|
class ExecutingTaskExtractor:
|
"""执行中任务提取器"""
|
|
def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
|
"""
|
初始化任务提取器
|
|
Args:
|
path_mapping: 路径点映射字典
|
"""
|
self.path_mapping = path_mapping
|
self.logger = logging.getLogger(__name__)
|
|
# 预计算坐标映射
|
self.coord_to_code = {(data['x'], data['y']): code for code, data in path_mapping.items()}
|
|
# 预分类位置类型
|
self.pickup_positions = [pos for pos in path_mapping.keys() if pos.startswith("1")]
|
self.charging_positions = [pos for pos in path_mapping.keys() if pos.startswith("2")]
|
self.delivery_positions = [pos for pos in path_mapping.keys() if pos.startswith("3")]
|
self.standby_positions = [pos for pos in path_mapping.keys() if pos.startswith("4")]
|
|
def extract_optimized_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
|
"""智能提取AGV执行任务"""
|
executing_tasks = []
|
|
for agv_status in agv_status_list:
|
agv_id = agv_status.agvId
|
current_position = agv_status.position
|
|
if not agv_status.backpack:
|
continue
|
|
optimized_task = self._analyze_all_backpack_tasks(
|
agv_id, current_position, agv_status.backpack, agv_status
|
)
|
|
if optimized_task:
|
executing_tasks.append(optimized_task)
|
|
self.logger.info(f"智能分析 {len(executing_tasks)} 个AGV的背篓任务并生成最优路径")
|
return executing_tasks
|
|
def _analyze_all_backpack_tasks(self, agv_id: str, current_position: str,
|
backpack: List[BackpackData], agv_status: AGVStatus) -> Optional[Dict]:
|
"""
|
分析所有背篓任务,确定最优执行策略
|
"""
|
# 1. 提取并分类所有有效任务
|
loaded_tasks = [] # 已装载的任务
|
unloaded_tasks = [] # 未装载的任务
|
all_tasks = [] # 所有任务
|
|
self.logger.debug(f"[AGV {agv_id}] 开始分析背篓任务,位置: {current_position}")
|
|
for i, bp in enumerate(backpack):
|
self.logger.debug(f"[AGV {agv_id}] 背篓位置{i+1}: taskId={bp.taskId}, loaded={bp.loaded}, execute={bp.execute}")
|
|
if not bp.taskId:
|
self.logger.debug(f"[AGV {agv_id}] 背篓位置{i+1}: 跳过(无任务ID)")
|
continue
|
|
# 尝试解析任务详情
|
task_info = self._parse_task_details(bp.taskId, current_position)
|
if not task_info:
|
self.logger.warning(f"[AGV {agv_id}] 任务 {bp.taskId} 解析失败,跳过")
|
continue
|
|
task_data = {
|
'backpack_item': bp,
|
'task_info': task_info,
|
'distance_to_start': task_info.get('distance_to_start', float('inf')),
|
'distance_to_end': task_info.get('distance_to_end', float('inf'))
|
}
|
|
all_tasks.append(task_data)
|
|
if bp.loaded:
|
loaded_tasks.append(task_data)
|
self.logger.debug(f"[AGV {agv_id}] 任务 {bp.taskId} 已装载,距离终点: {task_info.get('distance_to_end', 'N/A')}")
|
else:
|
unloaded_tasks.append(task_data)
|
self.logger.debug(f"[AGV {agv_id}] 任务 {bp.taskId} 未装载,距离起点: {task_info.get('distance_to_start', 'N/A')}")
|
|
self.logger.debug(f"[AGV {agv_id}] 任务分析结果: 总任务={len(all_tasks)}, 已装载={len(loaded_tasks)}, 未装载={len(unloaded_tasks)}")
|
|
# 2. 如果没有任何任务,返回None
|
if not all_tasks:
|
self.logger.debug(f"[AGV {agv_id}] 没有有效任务,返回None")
|
return None
|
|
# 3. 决策最优路径策略
|
optimal_strategy = self._determine_next_best_action(
|
loaded_tasks, unloaded_tasks, current_position
|
)
|
|
if not optimal_strategy:
|
self.logger.debug(f"[AGV {agv_id}] 决策返回None,没有最优策略")
|
return None
|
|
self.logger.debug(f"[AGV {agv_id}] 决策完成: {optimal_strategy['strategy']} - {optimal_strategy['action']}")
|
self.logger.debug(f"[AGV {agv_id}] 目标位置: {optimal_strategy['target_position']}")
|
self.logger.debug(f"[AGV {agv_id}] 决策原因: {optimal_strategy.get('reason', 'N/A')}")
|
|
# 4. 构建任务信息
|
result = {
|
'agvId': agv_id,
|
'currentPosition': current_position,
|
'strategy': optimal_strategy['strategy'],
|
'next_action': optimal_strategy['action'],
|
'target_position': optimal_strategy['target_position'],
|
'primary_task': optimal_strategy['selected_task'],
|
'all_loaded_tasks': loaded_tasks,
|
'all_unloaded_tasks': unloaded_tasks,
|
'total_tasks_count': len(all_tasks),
|
'estimated_efficiency': optimal_strategy.get('efficiency_score', 0),
|
'decision_reason': optimal_strategy.get('reason', ''),
|
'agvDirection': agv_status.direction,
|
'agvVoltage': agv_status.vol,
|
'agvError': agv_status.error
|
}
|
|
self.logger.debug(f"[AGV {agv_id}] 最终任务信息构建完成,返回结果")
|
return result
|
|
def _determine_next_best_action(self, loaded_tasks: List[Dict],
|
unloaded_tasks: List[Dict], current_position: str) -> Optional[Dict]:
|
"""
|
确定AGV下一步最优行动
|
|
Args:
|
loaded_tasks: 已装载的任务
|
unloaded_tasks: 未装载的任务
|
current_position: 当前位置
|
|
Returns:
|
Optional[Dict]: 最优行动策略
|
"""
|
# 默认策略:综合优化决策
|
# 1. 如果有已装载任务,优先送货
|
if loaded_tasks:
|
nearest_delivery = min(loaded_tasks, key=lambda t: t['distance_to_end'])
|
|
# 2. 如果也有未装载任务,比较距离决策
|
if unloaded_tasks:
|
nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start'])
|
|
# 综合优化策略:如果取货点显著更近,优先取货;否则优先送货
|
if nearest_pickup['distance_to_start'] * 1.5 < nearest_delivery['distance_to_end']:
|
return {
|
'strategy': 'comprehensive_optimization',
|
'action': 'pickup',
|
'selected_task': nearest_pickup,
|
'target_position': nearest_pickup['task_info']['start_position'],
|
'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position),
|
'reason': f"综合优化:取货点({nearest_pickup['distance_to_start']})比送货点({nearest_delivery['distance_to_end']})显著更近"
|
}
|
else:
|
return {
|
'strategy': 'comprehensive_optimization',
|
'action': 'deliver',
|
'selected_task': nearest_delivery,
|
'target_position': nearest_delivery['task_info']['end_position'],
|
'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position),
|
'reason': f"综合优化:优先送达已装载货物,送货距离{nearest_delivery['distance_to_end']}"
|
}
|
else:
|
# 只有已装载任务,直接送货
|
return {
|
'strategy': 'comprehensive_optimization',
|
'action': 'deliver',
|
'selected_task': nearest_delivery,
|
'target_position': nearest_delivery['task_info']['end_position'],
|
'efficiency_score': self._calculate_delivery_efficiency(nearest_delivery, current_position),
|
'reason': f"综合优化:送达唯一已装载货物,距离{nearest_delivery['distance_to_end']}"
|
}
|
|
# 3. 如果只有未装载任务,取货
|
elif unloaded_tasks:
|
nearest_pickup = min(unloaded_tasks, key=lambda t: t['distance_to_start'])
|
return {
|
'strategy': 'comprehensive_optimization',
|
'action': 'pickup',
|
'selected_task': nearest_pickup,
|
'target_position': nearest_pickup['task_info']['start_position'],
|
'efficiency_score': self._calculate_pickup_efficiency(nearest_pickup, current_position),
|
'reason': f"综合优化:取货唯一未装载任务,距离{nearest_pickup['distance_to_start']}"
|
}
|
|
# 4. 没有任务
|
return None
|
|
def _parse_task_details(self, task_id: str, current_position: str) -> Optional[Dict]:
|
"""
|
解析任务详细信息,包括起终点和距离
|
|
Args:
|
task_id: 任务ID
|
current_position: 当前位置
|
|
Returns:
|
Optional[Dict]: 任务详细信息
|
"""
|
self.logger.debug(f"开始解析任务 {task_id},当前位置: {current_position}")
|
|
start_pos, end_pos = self._fast_parse_task_start_end(task_id)
|
|
if not start_pos or not end_pos:
|
self.logger.warning(f"任务 {task_id} 解析失败: start_pos={start_pos}, end_pos={end_pos}")
|
return None
|
|
self.logger.debug(f"任务 {task_id} 解析成功: start={start_pos}, end={end_pos}")
|
|
# 计算距离
|
current_coord = get_coordinate_from_path_id(current_position, self.path_mapping)
|
start_coord = get_coordinate_from_path_id(start_pos, self.path_mapping)
|
end_coord = get_coordinate_from_path_id(end_pos, self.path_mapping)
|
|
if not all([current_coord, start_coord, end_coord]):
|
return None
|
|
distance_to_start = calculate_manhattan_distance(current_coord, start_coord)
|
distance_to_end = calculate_manhattan_distance(current_coord, end_coord)
|
|
return {
|
'task_id': task_id,
|
'start_position': start_pos,
|
'end_position': end_pos,
|
'distance_to_start': distance_to_start,
|
'distance_to_end': distance_to_end,
|
'start_coord': start_coord,
|
'end_coord': end_coord
|
}
|
|
def _calculate_delivery_efficiency(self, task_data: Dict, current_position: str) -> float:
|
"""
|
计算送货效率分数
|
|
Args:
|
task_data: 任务数据
|
current_position: 当前位置
|
|
Returns:
|
float: 效率分数 (0-10)
|
"""
|
distance = task_data['distance_to_end']
|
|
# 距离越近效率越高
|
distance_score = max(0, 10 - distance / 5)
|
|
# 已装载的任务有额外奖励
|
loaded_bonus = 3.0 if task_data['backpack_item'].loaded else 0
|
|
return distance_score + loaded_bonus
|
|
def _calculate_pickup_efficiency(self, task_data: Dict, current_position: str) -> float:
|
"""
|
计算取货效率分数
|
|
Args:
|
task_data: 任务数据
|
current_position: 当前位置
|
|
Returns:
|
float: 效率分数 (0-10)
|
"""
|
distance_to_start = task_data['distance_to_start']
|
|
# 距离越近效率越高
|
distance_score = max(0, 10 - distance_to_start / 3)
|
|
# 很近的取货点有高奖励
|
if distance_to_start <= 3:
|
distance_score += 5.0
|
elif distance_to_start <= 5:
|
distance_score += 2.0
|
|
return distance_score
|
|
def _fast_parse_task_start_end(self, task_id: str) -> Tuple[Optional[str], Optional[str]]:
|
"""
|
解析任务起点和终点
|
支持两种格式:
|
1. 标准格式:xxx_start_end
|
2. 简单格式:数字ID(通过hash分配位置)
|
|
Args:
|
task_id: 任务ID
|
|
Returns:
|
Tuple[Optional[str], Optional[str]]: (起点, 终点)
|
"""
|
self.logger.debug(f"解析任务ID: {task_id}")
|
|
# 方法1:尝试标准格式解析(包含下划线)
|
if "_" in task_id:
|
parts = task_id.split("_")
|
self.logger.debug(f"任务ID包含下划线,分割结果: {parts}")
|
|
if len(parts) >= 3:
|
potential_start = parts[-2]
|
potential_end = parts[-1]
|
|
start_valid = potential_start in self.path_mapping
|
end_valid = potential_end in self.path_mapping
|
|
self.logger.debug(f"标准格式解析结果: start={potential_start}(valid={start_valid}), end={potential_end}(valid={end_valid})")
|
|
if start_valid and end_valid:
|
return potential_start, potential_end
|
|
self.logger.debug(f"任务ID分割后部分数量不足: {len(parts)} < 3")
|
|
# 方法2:简单数字ID的备选机制(通过hash分配位置)
|
self.logger.debug(f"尝试简单数字ID备选解析机制")
|
|
# 获取所有可用位置
|
all_positions = list(self.path_mapping.keys())
|
if len(all_positions) < 2:
|
self.logger.warning(f"路径映射位置不足,无法为任务分配起终点")
|
return None, None
|
|
# 使用任务ID的hash值来分配起点和终点
|
try:
|
task_hash = abs(hash(task_id))
|
|
# 分配起点(使用hash的前一半)
|
start_index = task_hash % len(all_positions)
|
start_pos = all_positions[start_index]
|
|
# 分配终点(使用hash的后一半,确保与起点不同)
|
end_index = (task_hash // len(all_positions)) % len(all_positions)
|
if end_index == start_index:
|
end_index = (end_index + 1) % len(all_positions)
|
end_pos = all_positions[end_index]
|
|
self.logger.debug(f"简单ID备选解析成功: 任务{task_id} -> start={start_pos}, end={end_pos}")
|
return start_pos, end_pos
|
|
except Exception as e:
|
self.logger.error(f"简单ID备选解析失败: {e}")
|
|
self.logger.warning(f"任务ID {task_id} 所有解析方法都失败,返回 None, None")
|
return None, None
|
|
def _fast_parse_target(self, task_id: str, current_position: str, loaded: bool) -> Optional[str]:
|
"""
|
解析任务目标位置
|
|
Args:
|
task_id: 任务ID
|
current_position: 当前位置
|
loaded: 是否已装载
|
|
Returns:
|
Optional[str]: 目标位置码
|
"""
|
_, task_end = self._fast_parse_task_start_end(task_id)
|
if task_end:
|
return task_end
|
|
# 基于hash值和装载状态快速选择
|
hash_val = hash(task_id) % 1000
|
|
if loaded:
|
# 已装载,选择出库区域
|
target_positions = [pos for pos in self.delivery_positions if pos != current_position]
|
if not target_positions:
|
target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position]
|
else:
|
# 未装载,选择仓库区域
|
target_positions = [pos for pos in self.pickup_positions if pos != current_position]
|
if not target_positions:
|
target_positions = [pos for pos in self.path_mapping.keys() if pos != current_position]
|
|
if target_positions:
|
return target_positions[hash_val % len(target_positions)]
|
|
return None
|
|
def extract_executing_tasks(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
|
"""提取所有AGV正在执行的任务"""
|
self.logger.debug(f"开始转换任务为标准格式,AGV数量: {len(agv_status_list)}")
|
|
optimized_tasks = self.extract_optimized_executing_tasks(agv_status_list)
|
|
self.logger.debug(f"提取结果数量: {len(optimized_tasks)}")
|
|
legacy_tasks = []
|
for i, task in enumerate(optimized_tasks):
|
try:
|
self.logger.debug(f"转换任务 #{i+1}: AGV {task['agvId']}")
|
|
primary_task = task['primary_task']
|
task_info = primary_task['task_info']
|
backpack_item = primary_task['backpack_item']
|
|
self.logger.debug(f" - primary_task存在: {primary_task is not None}")
|
self.logger.debug(f" - task_info存在: {task_info is not None}")
|
self.logger.debug(f" - backpack_item存在: {backpack_item is not None}")
|
|
# 根据action确定状态
|
is_pickup_action = task['next_action'] == 'pickup'
|
|
self.logger.debug(f" - next_action: {task['next_action']}")
|
self.logger.debug(f" - is_pickup_action: {is_pickup_action}")
|
self.logger.debug(f" - target_position: {task['target_position']}")
|
|
# 保持原有的标准JSON格式
|
legacy_task = {
|
'agvId': task['agvId'],
|
'taskId': task_info['task_id'],
|
'currentPosition': task['currentPosition'],
|
'backpackIndex': backpack_item.index,
|
'status': 'executing',
|
'agvDirection': task['agvDirection'],
|
'agvVoltage': task['agvVoltage'],
|
'agvError': task['agvError'],
|
'loaded': backpack_item.loaded,
|
'needPickup': is_pickup_action,
|
'targetPosition': task['target_position'],
|
'taskStart': task_info['start_position'],
|
'taskEnd': task_info['end_position']
|
}
|
|
self.logger.debug(f" - 任务构建成功: {legacy_task['agvId']} -> {legacy_task['targetPosition']}")
|
legacy_tasks.append(legacy_task)
|
|
except Exception as e:
|
self.logger.error(f"转换任务 #{i+1} 失败: {e}")
|
self.logger.error(f"任务内容: {task}")
|
continue
|
|
self.logger.info(f"使用综合优化策略,为 {len(legacy_tasks)} 个AGV生成标准格式路径")
|
return legacy_tasks
|
|
|
class BatchPathPlanner:
|
"""批量路径规划器"""
|
|
def __init__(self, path_mapping: Dict[str, Dict[str, int]],
|
algorithm_type: str = "A_STAR", agv_manager=None):
|
"""
|
初始化批量路径规划器
|
|
Args:
|
path_mapping: 路径点映射字典
|
algorithm_type: 基础路径规划算法类型
|
agv_manager: 用于获取AGV状态信息
|
"""
|
self.path_mapping = path_mapping
|
self.algorithm_type = algorithm_type
|
self.agv_manager = agv_manager
|
self.logger = logging.getLogger(__name__)
|
|
# 初始化优化组件
|
self.task_extractor = ExecutingTaskExtractor(path_mapping)
|
self.collision_detector = FastCollisionDetector(min_distance=3.0)
|
self.collision_resolver = FastCollisionResolver(path_mapping, self.collision_detector, agv_manager)
|
self.base_planner = PathPlanningFactory.create_path_planner(algorithm_type, path_mapping)
|
|
def plan_all_agv_paths(self, agv_status_list: List[AGVStatus],
|
include_idle_agv: bool = False,
|
constraints: List[Tuple[int, int, float]] = None) -> Dict:
|
"""
|
为所有AGV规划路径(每个AGV只生成一个路径)
|
|
Args:
|
agv_status_list: AGV状态列表
|
include_idle_agv: 是否包含空闲AGV
|
constraints: 路径约束条件
|
|
Returns:
|
Dict: 包含所有路径信息的结果
|
"""
|
start_time = time.time()
|
|
# 1. 提取执行中的任务(使用向后兼容的方法)
|
executing_tasks = self.task_extractor.extract_executing_tasks(agv_status_list)
|
|
# 2. 路径规划
|
planned_paths = []
|
total_planning_time = 0
|
planned_agv_ids = set()
|
|
for task in executing_tasks:
|
agv_id = task['agvId']
|
current_pos = task['currentPosition']
|
target_pos = task['targetPosition']
|
task_id = task['taskId']
|
|
self.logger.debug(f"开始路径规划: AGV {agv_id}, 任务 {task_id}")
|
self.logger.debug(f" - current_pos: {current_pos} ({type(current_pos)})")
|
self.logger.debug(f" - target_pos: {target_pos} ({type(target_pos)})")
|
|
# 避免重复规划
|
if agv_id in planned_agv_ids:
|
self.logger.debug(f" - 跳过AGV {agv_id}:已规划")
|
continue
|
|
# 验证位置格式
|
if not current_pos or not target_pos:
|
self.logger.error(f" - AGV {agv_id} 位置信息无效: current={current_pos}, target={target_pos}")
|
continue
|
|
# 路径格式标准化
|
from common.utils import normalize_path_id
|
normalized_current = normalize_path_id(current_pos)
|
normalized_target = normalize_path_id(target_pos)
|
|
self.logger.debug(f" - 格式标准化: {current_pos} -> {normalized_current}, {target_pos} -> {normalized_target}")
|
|
# 路径规划
|
path_start_time = time.time()
|
self.logger.debug(f" - 调用base_planner.plan_path({normalized_current}, {normalized_target})")
|
planned_path = self.base_planner.plan_path(normalized_current, normalized_target, constraints)
|
path_end_time = time.time()
|
|
planning_time = (path_end_time - path_start_time) * 1000
|
total_planning_time += planning_time
|
|
self.logger.debug(f" - 路径规划结果: {planned_path is not None}")
|
if planned_path:
|
self.logger.debug(f" - 路径长度: {len(planned_path.codeList) if hasattr(planned_path, 'codeList') else 'N/A'}")
|
else:
|
self.logger.error(f" - AGV {agv_id} 路径规划失败: {current_pos} -> {target_pos}")
|
|
if planned_path:
|
# 生成路径字典
|
seg_id = generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2")
|
|
path_dict = {
|
'agvId': agv_id,
|
'segId': seg_id,
|
'codeList': []
|
}
|
|
# 转换路径点格式
|
for i, path_code in enumerate(planned_path.codeList):
|
if isinstance(path_code, PathCode):
|
action_type = "2" # 任务类型
|
pos_type = None
|
backpack_level = task.get('backpackIndex', 0)
|
is_target_point = False
|
|
# 确定位置类型:最后一个点的动作类型
|
if i == len(planned_path.codeList) - 1: # 最后一个路径点
|
is_target_point = True
|
if task.get('loaded', False):
|
# 已装载任务:当前路径是去终点放货
|
pos_type = "2" # 放货
|
self.logger.debug(f"AGV {agv_id} 在终点 {path_code.code} 设置放货动作")
|
else:
|
# 未装载任务:当前路径是去起点取货
|
pos_type = "1" # 取货
|
self.logger.debug(f"AGV {agv_id} 在起点 {path_code.code} 设置取货动作")
|
|
enhanced_code = generate_navigation_code(
|
code=path_code.code,
|
direction=path_code.direction,
|
action_type=action_type,
|
task_id=task_id,
|
pos_type=pos_type,
|
backpack_level=backpack_level,
|
is_target_point=is_target_point
|
)
|
path_dict['codeList'].append(enhanced_code)
|
else:
|
enhanced_code = generate_navigation_code(
|
code=path_code.get('code', ''),
|
direction=path_code.get('direction', '90'),
|
action_type="2",
|
task_id=task_id
|
)
|
path_dict['codeList'].append(enhanced_code)
|
|
planned_paths.append(path_dict)
|
planned_agv_ids.add(agv_id)
|
self.logger.debug(f"AGV {agv_id} 快速规划: {current_pos} -> {target_pos}")
|
|
# 3. 空闲AGV处理
|
if include_idle_agv:
|
idle_paths = self._fast_plan_idle_agv_paths(agv_status_list, executing_tasks, constraints, planned_agv_ids)
|
planned_paths.extend(idle_paths)
|
|
# 4. 去重检查
|
final_paths = []
|
final_agv_ids = set()
|
for path in planned_paths:
|
agv_id = path['agvId']
|
if agv_id not in final_agv_ids:
|
final_paths.append(path)
|
final_agv_ids.add(agv_id)
|
planned_paths = final_paths
|
|
# 5. 碰撞检测和解决
|
conflicts = self.collision_detector.detect_conflicts(planned_paths)
|
|
if conflicts:
|
self.logger.info(f"发现 {len(conflicts)} 个碰撞,快速解决中...")
|
planned_paths = self.collision_resolver.resolve_conflicts(planned_paths, conflicts, executing_tasks)
|
remaining_conflicts = self.collision_detector.detect_conflicts(planned_paths)
|
else:
|
remaining_conflicts = []
|
|
end_time = time.time()
|
total_time = (end_time - start_time) * 1000
|
|
# 6. 构建优化结果
|
result = {
|
'totalAgvs': len(agv_status_list),
|
'executingTasksCount': len(executing_tasks),
|
'plannedPathsCount': len(planned_paths),
|
'totalPlanningTime': round(total_time, 2),
|
'pathPlanningTime': round(total_planning_time, 2),
|
'conflictsDetected': len(conflicts),
|
'remainingConflicts': len(remaining_conflicts),
|
'algorithm': f"Optimized_{self.algorithm_type}",
|
'fourDirectionCompliant': True,
|
'collisionFree': len(remaining_conflicts) == 0,
|
'plannedPaths': planned_paths,
|
'executingTasks': executing_tasks
|
}
|
|
self.logger.info(f"优化批量路径规划完成 - 总AGV: {result['totalAgvs']}, "
|
f"执行中任务: {result['executingTasksCount']}, "
|
f"规划路径: {result['plannedPathsCount']}, "
|
f"总耗时: {result['totalPlanningTime']:.2f}ms, "
|
f"无碰撞: {result['collisionFree']}")
|
|
return result
|
|
def _fast_plan_idle_agv_paths(self, agv_status_list: List[AGVStatus],
|
executing_tasks: List[Dict],
|
constraints: List[Tuple[int, int, float]] = None,
|
planned_agv_ids: Set[str] = None) -> List[Dict]:
|
"""
|
为空闲AGV规划路径
|
"""
|
idle_paths = []
|
executing_agv_ids = {task['agvId'] for task in executing_tasks}
|
|
if planned_agv_ids is None:
|
planned_agv_ids = executing_agv_ids.copy()
|
|
for agv_status in agv_status_list:
|
agv_id = agv_status.agvId
|
|
if agv_id in planned_agv_ids:
|
continue
|
|
if self._is_agv_idle_fast(agv_status):
|
current_pos = agv_status.position
|
target_pos = self._get_idle_agv_target_fast(agv_id, current_pos)
|
|
if target_pos and target_pos != current_pos:
|
planned_path = self.base_planner.plan_path(current_pos, target_pos, constraints)
|
|
if planned_path:
|
action_type = "3" if self._is_charging_path(target_pos) else "4"
|
seg_id = generate_segment_id(agv_id=agv_id, target_position=target_pos, action_type=action_type)
|
|
path_dict = {
|
'agvId': agv_id,
|
'segId': seg_id,
|
'codeList': []
|
}
|
|
for i, path_code in enumerate(planned_path.codeList):
|
if isinstance(path_code, PathCode):
|
# 空闲AGV不需要taskId、posType和lev字段
|
enhanced_code = generate_navigation_code(
|
code=path_code.code,
|
direction=path_code.direction,
|
action_type=action_type,
|
is_target_point=False # 空闲AGV路径无目标点概念
|
)
|
path_dict['codeList'].append(enhanced_code)
|
|
idle_paths.append(path_dict)
|
planned_agv_ids.add(agv_id)
|
|
return idle_paths
|
|
def _is_agv_idle_fast(self, agv_status: AGVStatus) -> bool:
|
"""判断AGV是否空闲"""
|
if agv_status.status not in [0, 1, 2] or agv_status.error != 0:
|
return False
|
|
# 检查背篓
|
if agv_status.backpack:
|
for item in agv_status.backpack[:2]: # 只检查前两个
|
if item.loaded and item.execute:
|
return False
|
|
return True
|
|
def _get_idle_agv_target_fast(self, agv_id: str, current_position: str) -> Optional[str]:
|
"""获取空闲AGV目标位置"""
|
hash_val = hash(agv_id) % 1000
|
|
# 选择逻辑
|
charging_positions = [pos for pos in self.task_extractor.charging_positions if pos != current_position]
|
if charging_positions:
|
return charging_positions[hash_val % len(charging_positions)]
|
|
standby_positions = [pos for pos in self.task_extractor.standby_positions if pos != current_position]
|
if standby_positions:
|
return standby_positions[hash_val % len(standby_positions)]
|
|
all_positions = [pos for pos in self.path_mapping.keys() if pos != current_position]
|
if all_positions:
|
return all_positions[hash_val % len(all_positions)]
|
|
return None
|
|
def _is_charging_path(self, target_position: str) -> bool:
|
"""
|
判断目标位置是否为充电站(后期修改)
|
"""
|
if not target_position:
|
return False
|
|
# 充电站识别
|
return target_position.startswith("2") or "charge" in target_position.lower()
|
|
|
class PathPlanner:
|
"""路径规划器基类"""
|
|
def __init__(self, path_mapping: Dict[str, Dict[str, int]]):
|
"""
|
初始化路径规划器
|
|
Args:
|
path_mapping: 路径点映射字典
|
"""
|
self.path_mapping = path_mapping
|
self.logger = logging.getLogger(__name__)
|
|
def plan_path(self, start_code: str, end_code: str,
|
constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]:
|
"""
|
规划从起点到终点的路径
|
|
Args:
|
start_code: 起始路径点ID
|
end_code: 目标路径点ID
|
constraints: 约束条件列表
|
|
Returns:
|
Optional[PlannedPath]: 规划的路径,如果无法规划则返回None
|
"""
|
raise NotImplementedError("子类必须实现此方法")
|
|
def is_valid_path_point(self, code: str) -> bool:
|
"""
|
检查路径点ID是否有效
|
|
Args:
|
code: 路径点ID
|
|
Returns:
|
bool: 是否有效
|
"""
|
return code in self.path_mapping
|
|
|
class AStarPathPlanner(PathPlanner):
|
"""A*路径规划器"""
|
|
def __init__(self, path_mapping: Dict[str, Dict[str, int]], turn_cost: float = 0.5):
|
"""
|
初始化A*路径规划器
|
|
Args:
|
path_mapping: 路径点映射字典
|
turn_cost: 转向代价
|
"""
|
super().__init__(path_mapping)
|
self.turn_cost = turn_cost
|
self.logger = logging.getLogger(__name__)
|
|
# 四个方向:上、右、下、左 (严格四方向移动)
|
self.directions = [(0, -1), (1, 0), (0, 1), (-1, 0)]
|
self.direction_angles = ["270", "0", "90", "180"]
|
|
# 预计算坐标映射(必须在 _build_adjacency_list 之前初始化)
|
self.coord_to_code = {}
|
for code, data in path_mapping.items():
|
if isinstance(data, dict) and 'x' in data and 'y' in data:
|
self.coord_to_code[(data['x'], data['y'])] = code
|
|
# 预计算邻接表
|
self.adjacency = self._build_adjacency_list()
|
|
# 性能监控
|
self.planning_count = 0
|
self.total_planning_time = 0.0
|
|
self.logger.debug(f"A*规划器初始化完成,邻接表: {len(self.adjacency)} 个节点")
|
|
def _build_adjacency_list(self) -> Dict[str, List[Tuple[str, str]]]:
|
"""
|
预计算邻接表
|
|
Returns:
|
Dict[str, List[Tuple[str, str]]]: {节点: [(邻居节点, 移动方向), ...]}
|
"""
|
adjacency = defaultdict(list)
|
|
for code, coord_data in self.path_mapping.items():
|
x, y = coord_data['x'], coord_data['y']
|
|
# 检查四个方向的邻居
|
for i, (dx, dy) in enumerate(self.directions):
|
next_x, next_y = x + dx, y + dy
|
|
# 查找邻居代码
|
neighbor_coord = (next_x, next_y)
|
if neighbor_coord in self.coord_to_code:
|
neighbor_code = self.coord_to_code[neighbor_coord]
|
direction = self.direction_angles[i]
|
adjacency[code].append((neighbor_code, direction))
|
|
return adjacency
|
|
def plan_path(self, start_code: str, end_code: str,
|
constraints: List[Tuple[int, int, float]] = None) -> Optional[PlannedPath]:
|
"""
|
Args:
|
start_code: 起始路径点ID
|
end_code: 目标路径点ID
|
constraints: 约束条件列表
|
|
Returns:
|
Optional[PlannedPath]: 规划的路径
|
"""
|
# 验证
|
if not self.is_valid_path_point(start_code) or not self.is_valid_path_point(end_code):
|
return None
|
|
if start_code == end_code:
|
return PlannedPath(agvId="", codeList=[PathCode(code=start_code, direction="90")])
|
|
# 获取坐标(使用预计算映射)
|
start_coord = (self.path_mapping[start_code]['x'], self.path_mapping[start_code]['y'])
|
end_coord = (self.path_mapping[end_code]['x'], self.path_mapping[end_code]['y'])
|
|
# A*实现
|
# 使用元组堆,减少对象创建开销
|
open_set = [(0, start_code, None, [])] # (f_score, code, parent, path)
|
closed_set = set()
|
g_scores = {start_code: 0}
|
|
# 预计算约束检查函数
|
constraint_check = self._build_constraint_checker(constraints) if constraints else None
|
|
while open_set:
|
f_score, current_code, parent_code, path = heapq.heappop(open_set)
|
|
if current_code in closed_set:
|
continue
|
|
closed_set.add(current_code)
|
current_path = path + [current_code]
|
|
if current_code == end_code:
|
# 重建路径
|
return self._build_path_result(current_path)
|
|
# 使用预计算的邻接表
|
for neighbor_code, direction in self.adjacency[current_code]:
|
if neighbor_code in closed_set:
|
continue
|
|
# 约束检查
|
if constraint_check and constraint_check(neighbor_code):
|
continue
|
|
# 代价计算
|
g_cost = g_scores[current_code] + 1
|
|
if neighbor_code not in g_scores or g_cost < g_scores[neighbor_code]:
|
g_scores[neighbor_code] = g_cost
|
|
# 启发式计算
|
neighbor_coord = (self.path_mapping[neighbor_code]['x'],
|
self.path_mapping[neighbor_code]['y'])
|
h_cost = abs(neighbor_coord[0] - end_coord[0]) + abs(neighbor_coord[1] - end_coord[1])
|
f_cost = g_cost + h_cost
|
|
heapq.heappush(open_set, (f_cost, neighbor_code, current_code, current_path))
|
|
return None
|
|
def _build_constraint_checker(self, constraints: List[Tuple[int, int, float]]):
|
"""构建约束检查函数"""
|
def check_constraints(code: str) -> bool:
|
coord = (self.path_mapping[code]['x'], self.path_mapping[code]['y'])
|
for cx, cy, radius in constraints:
|
if (coord[0] - cx) ** 2 + (coord[1] - cy) ** 2 <= radius ** 2:
|
return True
|
return False
|
return check_constraints
|
|
def _build_path_result(self, path_codes: List[str]) -> PlannedPath:
|
"""构建路径结果"""
|
result_codes = []
|
|
for i, code in enumerate(path_codes):
|
direction = "90" # 默认方向
|
|
if i < len(path_codes) - 1:
|
# 使用预计算的邻接表查找方向
|
next_code = path_codes[i + 1]
|
for neighbor_code, neighbor_direction in self.adjacency[code]:
|
if neighbor_code == next_code:
|
direction = neighbor_direction
|
break
|
|
result_codes.append(PathCode(code=code, direction=direction))
|
|
return PlannedPath(agvId="", codeList=result_codes)
|
|
|
# 碰撞检测器
|
class FastCollisionDetector:
|
"""碰撞检测器"""
|
|
def __init__(self, min_distance: float = 3.0):
|
self.min_distance = min_distance
|
self.logger = logging.getLogger(__name__)
|
|
def detect_conflicts(self, planned_paths: List[Dict]) -> List[Dict]:
|
"""
|
碰撞检测 - 基于时间步的优化检测
|
|
Args:
|
planned_paths: 规划路径列表
|
|
Returns:
|
List[Dict]: 冲突列表
|
"""
|
conflicts = []
|
if len(planned_paths) < 2:
|
return conflicts
|
|
# 构建时间-位置映射
|
time_positions = defaultdict(list)
|
|
for path_data in planned_paths:
|
agv_id = path_data['agvId']
|
code_list = path_data.get('codeList', [])
|
|
for time_step, path_code in enumerate(code_list):
|
# 快速提取位置码
|
if isinstance(path_code, dict):
|
position = path_code.get('code', '')
|
elif hasattr(path_code, 'code'):
|
position = path_code.code
|
else:
|
position = str(path_code)
|
|
if position:
|
time_positions[time_step].append((agv_id, position))
|
|
# 快速冲突检测
|
for time_step, agv_positions in time_positions.items():
|
if len(agv_positions) < 2:
|
continue
|
|
# 位置分组检查
|
position_groups = defaultdict(list)
|
for agv_id, position in agv_positions:
|
position_groups[position].append(agv_id)
|
|
# 发现冲突
|
for position, agv_list in position_groups.items():
|
if len(agv_list) > 1:
|
conflicts.append({
|
'type': 'position_conflict',
|
'time_step': time_step,
|
'position': position,
|
'agv_ids': agv_list,
|
'severity': 'high'
|
})
|
|
self.logger.debug(f"快速检测到 {len(conflicts)} 个冲突")
|
return conflicts
|
|
|
# 碰撞解决器
|
class FastCollisionResolver:
|
"""碰撞解决器"""
|
|
def __init__(self, path_mapping: Dict[str, Dict[str, int]],
|
collision_detector: FastCollisionDetector, agv_manager=None):
|
self.path_mapping = path_mapping
|
self.collision_detector = collision_detector
|
self.agv_manager = agv_manager
|
self.logger = logging.getLogger(__name__)
|
|
def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List[Dict], executing_tasks: List[Dict] = None) -> List[Dict]:
|
"""
|
解决冲突
|
|
Args:
|
planned_paths: 规划路径列表
|
conflicts: 冲突列表
|
executing_tasks: 执行任务列表
|
|
Returns:
|
List[Dict]: 解决冲突后的路径列表
|
"""
|
if not conflicts:
|
return planned_paths
|
|
# 构建路径字典便于快速访问
|
paths_dict = {path['agvId']: path for path in planned_paths}
|
|
# 按时间步排序处理冲突
|
sorted_conflicts = sorted(conflicts, key=lambda x: x.get('time_step', 0))
|
|
for conflict in sorted_conflicts:
|
if conflict['type'] == 'position_conflict':
|
agv_ids = conflict['agv_ids']
|
time_step = conflict['time_step']
|
|
# 简单策略:保留第一个AGV,其他延迟
|
sorted_agv_ids = sorted(agv_ids)
|
|
for i, agv_id in enumerate(sorted_agv_ids[1:], 1):
|
if agv_id in paths_dict:
|
self._add_delay_to_path(paths_dict[agv_id], delay_steps=i)
|
self.logger.debug(f"为AGV {agv_id} 添加 {i} 步延迟")
|
|
return list(paths_dict.values())
|
|
def _add_delay_to_path(self, path_data: Dict, delay_steps: int):
|
"""为路径添加延迟步骤"""
|
if delay_steps <= 0 or not path_data.get('codeList'):
|
return
|
|
# 在路径开始处重复第一个位置
|
first_code = path_data['codeList'][0]
|
|
# 创建延迟代码
|
if isinstance(first_code, dict):
|
delay_code = first_code.copy()
|
else:
|
delay_code = {
|
'code': first_code.code if hasattr(first_code, 'code') else str(first_code),
|
'direction': first_code.direction if hasattr(first_code, 'direction') else "90"
|
}
|
|
# 添加延迟步骤
|
for _ in range(delay_steps):
|
path_data['codeList'].insert(0, delay_code)
|
|
def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]:
|
"""验证四向移动约束"""
|
return planned_paths
|
|
|
class PathPlanningFactory:
|
"""路径规划器工厂类"""
|
|
@staticmethod
|
def create_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]]) -> PathPlanner:
|
"""
|
创建路径规划器实例
|
|
Args:
|
algorithm_type: 算法类型
|
path_mapping: 路径点映射字典
|
|
Returns:
|
PathPlanner: 路径规划器实例
|
"""
|
algorithm_upper = algorithm_type.upper()
|
|
if algorithm_upper in ["A_STAR", "ASTAR", "A*", "DIJKSTRA", "GREEDY"]:
|
return AStarPathPlanner(path_mapping)
|
else:
|
# 默认A*算法
|
logging.getLogger(__name__).warning(f"未知算法类型 {algorithm_type},使用默认A*算法")
|
return AStarPathPlanner(path_mapping)
|
|
@staticmethod
|
def create_batch_path_planner(algorithm_type: str, path_mapping: Dict[str, Dict[str, int]],
|
agv_manager=None) -> BatchPathPlanner:
|
"""
|
创建批量路径规划器实例
|
|
Args:
|
algorithm_type: 基础算法类型
|
path_mapping: 路径点映射字典
|
agv_manager: AGV管理器,用于获取AGV状态信息
|
|
Returns:
|
BatchPathPlanner: 批量路径规划器实例
|
"""
|
return BatchPathPlanner(path_mapping, algorithm_type, agv_manager)
|
|
# 导出核心类和函数
|
__all__ = [
|
'ExecutingTaskExtractor',
|
'BatchPathPlanner',
|
'PathPlanner',
|
'AStarPathPlanner',
|
'PathPlanningFactory',
|
'FastCollisionDetector',
|
'FastCollisionResolver'
|
]
|