"""
|
AGV路径碰撞检测和解决模块
|
"""
|
import math
|
import logging
|
from typing import Dict, List, Tuple, Optional, Set
|
from dataclasses import dataclass
|
from collections import defaultdict
|
|
from common.data_models import PlannedPath, PathCode, AGVActionTypeEnum
|
from common.utils import get_coordinate_from_path_id
|
|
|
@dataclass
class SpaceTimeNode:
    """Space-time node: the state of one AGV at one time step of its path."""

    agv_id: str
    position: str  # position code of the path point
    coordinates: Tuple[int, int]  # coordinates resolved for the position code
    time_step: int  # index of this step within the AGV's code list
    direction: str  # heading recorded for this step
|
|
|
@dataclass
class Conflict:
    """Description of one conflict between two AGVs."""

    type: str  # conflict type: "vertex", "edge" or "follow"
    agv1: str
    agv2: str
    time_step: int
    position1: str
    position2: str
    description: str
|
|
|
@dataclass
class AGVPriority:
    """Result of evaluating how strongly an AGV should keep its right of way."""

    agv_id: str
    priority_score: float  # higher score means higher priority
    task_status: str  # task status
    action_type: str  # action type
    task_priority: int  # task priority
    voltage: int  # battery level
    explanation: str  # human-readable breakdown of the score
|
|
|
class CollisionDetector:
    """Detect conflicts between planned AGV paths.

    Every entry of a path's ``codeList`` is treated as the AGV's position at
    the time step equal to its index. Three kinds of conflict are reported:
    ``"vertex"`` (same position code at the same step), ``"edge"`` (two AGVs
    exchange positions between consecutive steps) and ``"follow"`` (two AGVs
    closer than ``min_distance`` at the same step).
    """

    def __init__(self, path_mapping: Dict[str, Dict[str, int]],
                 min_distance: float = 3.0, time_buffer: int = 1):
        """Initialise the detector.

        Args:
            path_mapping: Path-point mapping passed to
                ``get_coordinate_from_path_id`` to resolve coordinates.
            min_distance: Minimum safe distance between two AGVs.
            time_buffer: Time buffer in steps. It is stored on the instance
                but not read by any detection method of this class.
        """
        self.path_mapping = path_mapping
        self.min_distance = min_distance
        self.time_buffer = time_buffer
        self.logger = logging.getLogger(__name__)

    def detect_conflicts(self, planned_paths: List[Dict]) -> List["Conflict"]:
        """Detect all conflicts between the given planned paths.

        Args:
            planned_paths: Path dicts; ``agvId`` and ``codeList`` are read.

        Returns:
            Vertex conflicts, then edge conflicts, then follow conflicts.
        """
        space_time_table = self._build_space_time_table(planned_paths)

        conflicts = []
        conflicts.extend(self._detect_vertex_conflicts(space_time_table))
        conflicts.extend(self._detect_edge_conflicts(space_time_table))
        conflicts.extend(self._detect_following_conflicts(space_time_table))

        self.logger.info(f"检测到 {len(conflicts)} 个路径冲突")
        return conflicts

    def _build_space_time_table(self, planned_paths: List[Dict]) -> Dict[int, List["SpaceTimeNode"]]:
        """Build the mapping ``time step -> nodes present at that step``.

        Path codes may be dicts (keys ``code`` / ``direction``) or objects
        exposing ``code`` / ``direction`` attributes. Steps whose position
        cannot be resolved to coordinates are left out of the table.

        Args:
            planned_paths: Path dicts; ``agvId`` and ``codeList`` are read.

        Returns:
            A ``defaultdict(list)`` keyed by time step.
        """
        space_time_table = defaultdict(list)

        for path_data in planned_paths:
            agv_id = path_data.get('agvId', '')
            code_list = path_data.get('codeList', [])

            for time_step, path_code in enumerate(code_list):
                if isinstance(path_code, dict):
                    position = path_code.get('code', '')
                    direction = path_code.get('direction', '90')
                else:
                    position = path_code.code
                    direction = path_code.direction

                coordinates = get_coordinate_from_path_id(position, self.path_mapping)
                if not coordinates:
                    continue

                space_time_table[time_step].append(SpaceTimeNode(
                    agv_id=agv_id,
                    position=position,
                    coordinates=coordinates,
                    time_step=time_step,
                    direction=direction,
                ))

        return space_time_table

    def _detect_vertex_conflicts(self, space_time_table: Dict[int, List["SpaceTimeNode"]]) -> List["Conflict"]:
        """Detect vertex conflicts: several AGVs on one position at one step."""
        conflicts = []

        for time_step, nodes in space_time_table.items():
            # Group the nodes of this step by position code.
            position_groups = defaultdict(list)
            for node in nodes:
                position_groups[node.position].append(node)

            # Every unordered pair sharing a position is one conflict.
            for position, occupants in position_groups.items():
                for index, first in enumerate(occupants):
                    for second in occupants[index + 1:]:
                        conflicts.append(Conflict(
                            type="vertex",
                            agv1=first.agv_id,
                            agv2=second.agv_id,
                            time_step=time_step,
                            position1=position,
                            position2=position,
                            description=f"AGV {first.agv_id} 和 {second.agv_id} 在时间 {time_step} 占用同一位置 {position}"
                        ))

        return conflicts

    def _detect_edge_conflicts(self, space_time_table: Dict[int, List["SpaceTimeNode"]]) -> List["Conflict"]:
        """Detect edge conflicts: two AGVs exchanging positions in one step.

        Two AGVs that both stay on the same position are not reported here;
        that situation is a vertex conflict, not a swap.
        """
        conflicts = []

        max_time = max(space_time_table.keys()) if space_time_table else 0

        for time_step in range(max_time):
            current_nodes = space_time_table.get(time_step, [])
            next_nodes = space_time_table.get(time_step + 1, [])

            # Position of every AGV now and at the following step.
            current_positions = {node.agv_id: node.position for node in current_nodes}
            next_positions = {node.agv_id: node.position for node in next_nodes}

            for agv1, pos1_current in current_positions.items():
                for agv2, pos2_current in current_positions.items():
                    if agv1 >= agv2:  # visit each unordered pair once
                        continue
                    if pos1_current == pos2_current:
                        # Same position for both: nothing is being swapped.
                        continue

                    pos1_next = next_positions.get(agv1)
                    pos2_next = next_positions.get(agv2)

                    if (pos1_next and pos2_next and
                            pos1_current == pos2_next and pos2_current == pos1_next):
                        conflicts.append(Conflict(
                            type="edge",
                            agv1=agv1,
                            agv2=agv2,
                            time_step=time_step,
                            position1=pos1_current,
                            position2=pos2_current,
                            description=f"AGV {agv1} 和 {agv2} 在时间 {time_step}-{time_step+1} 交换位置 {pos1_current}<->{pos2_current}"
                        ))

        return conflicts

    def _detect_following_conflicts(self, space_time_table: Dict[int, List["SpaceTimeNode"]]) -> List["Conflict"]:
        """Detect follow conflicts: two AGVs closer than ``min_distance``.

        A distance of exactly zero is skipped, so identical coordinates are
        never reported by this check.
        """
        conflicts = []

        for time_step, nodes in space_time_table.items():
            for index, node1 in enumerate(nodes):
                for node2 in nodes[index + 1:]:
                    # Euclidean distance between the two nodes.
                    distance = math.sqrt(
                        (node1.coordinates[0] - node2.coordinates[0]) ** 2 +
                        (node1.coordinates[1] - node2.coordinates[1]) ** 2
                    )

                    if 0 < distance < self.min_distance:
                        conflicts.append(Conflict(
                            type="follow",
                            agv1=node1.agv_id,
                            agv2=node2.agv_id,
                            time_step=time_step,
                            position1=node1.position,
                            position2=node2.position,
                            description=f"AGV {node1.agv_id} 和 {node2.agv_id} 在时间 {time_step} 距离过近 ({distance:.2f} < {self.min_distance})"
                        ))

        return conflicts
|
|
|
class CollisionResolver:
    """Resolve AGV path conflicts by making the lower-priority AGV wait.

    For every conflict the two AGVs are scored with
    ``evaluate_agv_priority``; the one with the lower (or equal) score gets a
    wait step inserted into its ``codeList``.
    """

    def __init__(self, path_mapping: Dict[str, Dict[str, int]],
                 detector: "CollisionDetector", agv_manager=None):
        """Initialise the resolver.

        Args:
            path_mapping: Path-point mapping passed to
                ``get_coordinate_from_path_id`` to resolve coordinates.
            detector: Collision detector; stored on the instance.
            agv_manager: Optional AGV manager; its ``get_agv_by_id`` is used
                to read an AGV's ``voltage``.
        """
        self.path_mapping = path_mapping
        self.detector = detector
        self.agv_manager = agv_manager
        self.logger = logging.getLogger(__name__)

    def evaluate_agv_priority(self, agv_id: str, path: Dict,
                              executing_tasks: Optional[List[Dict]] = None) -> "AGVPriority":
        """Score how strongly an AGV should keep its right of way.

        The score is the sum of four parts: task status (up to 40), action
        type (up to 30), task priority (up to 20) and battery level (up to
        10). A higher score means the AGV is less likely to yield.

        Args:
            agv_id: AGV ID.
            path: Path dict of the AGV; the ``type`` of the first entry of
                ``codeList`` is read when that entry is a dict.
            executing_tasks: Tasks in execution; the first one whose
                ``agvId`` matches supplies the status and ``taskId``.

        Returns:
            AGVPriority: Score, the inputs it was derived from and an
            explanation string.
        """
        # Defaults used when no information is available.
        task_status = "idle"
        action_type = "unknown"
        task_priority = 1
        voltage = 100
        explanation_parts = []

        # Battery level from the AGV manager, if there is one.
        if self.agv_manager:
            agv_model = self.agv_manager.get_agv_by_id(agv_id)
            if agv_model:
                reported_voltage = getattr(agv_model, 'voltage', None)
                # Keep the default when the model reports no voltage, so the
                # numeric comparisons below cannot fail on None.
                if reported_voltage is not None:
                    voltage = reported_voltage

        # Action type from the first path code.
        code_list = path.get('codeList', [])
        if code_list:
            first_code = code_list[0]
            if isinstance(first_code, dict):
                action_type = first_code.get('type', 'unknown')

        # Task status and priority from the first matching executing task.
        for task in executing_tasks or []:
            if task.get('agvId') != agv_id:
                continue
            task_status = task.get('status', 'idle')
            # Simplified handling: the priority is derived from the task ID.
            # str() guards against a missing or non-string taskId.
            task_id = str(task.get('taskId') or '')
            if 'HIGH_PRIORITY' in task_id:
                task_priority = 10
            elif 'PRIORITY' in task_id:
                task_priority = 8
            else:
                task_priority = 5
            break

        priority_score = 0.0

        # 1. Task status (40% weight).
        if task_status == "executing":
            priority_score += 40.0
            explanation_parts.append("执行任务中(+40)")
        elif task_status == "assigned":
            priority_score += 20.0
            explanation_parts.append("已分配任务(+20)")
        else:
            priority_score += 5.0
            explanation_parts.append("空闲状态(+5)")

        # 2. Action type (30% weight).
        if action_type == AGVActionTypeEnum.TASK.value:
            priority_score += 30.0
            explanation_parts.append("任务行为(+30)")
        elif action_type == AGVActionTypeEnum.CHARGING.value:
            priority_score += 25.0
            explanation_parts.append("充电行为(+25)")
        elif action_type == AGVActionTypeEnum.AVOIDANCE.value:
            priority_score += 5.0
            explanation_parts.append("避让行为(+5)")
        elif action_type == AGVActionTypeEnum.STANDBY.value:
            priority_score += 10.0
            explanation_parts.append("待机行为(+10)")
        else:
            priority_score += 15.0
            explanation_parts.append("未知行为(+15)")

        # 3. Task priority (20% weight).
        task_priority_score = (task_priority / 10.0) * 20.0
        priority_score += task_priority_score
        explanation_parts.append(f"任务优先级{task_priority}(+{task_priority_score:.1f})")

        # 4. Battery level (10% weight); a lower battery scores higher.
        if voltage <= 10:
            priority_score += 10.0
            explanation_parts.append("电量危险(+10)")
        elif voltage <= 20:
            priority_score += 8.0
            explanation_parts.append("电量偏低(+8)")
        elif voltage <= 50:
            priority_score += 5.0
            explanation_parts.append("电量一般(+5)")
        else:
            priority_score += 2.0
            explanation_parts.append("电量充足(+2)")

        explanation = f"总分{priority_score:.1f}: " + ", ".join(explanation_parts)

        return AGVPriority(
            agv_id=agv_id,
            priority_score=priority_score,
            task_status=task_status,
            action_type=action_type,
            task_priority=task_priority,
            voltage=voltage,
            explanation=explanation
        )

    def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List["Conflict"],
                          executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Resolve the given conflicts, earliest time step first.

        The input paths are not modified: each path dict and its ``codeList``
        are copied before wait steps are inserted.

        Args:
            planned_paths: Originally planned paths.
            conflicts: Conflicts to resolve.
            executing_tasks: Tasks in execution, used for priority scoring.

        Returns:
            ``planned_paths`` itself when there are no conflicts, otherwise a
            new list of path dicts with wait steps inserted.
        """
        if not conflicts:
            return planned_paths

        resolved_paths = []
        for path in planned_paths:
            path_copy = path.copy()
            # Wait steps are inserted into codeList in place, so the list
            # needs its own copy to keep the caller's data untouched.
            if isinstance(path_copy.get('codeList'), list):
                path_copy['codeList'] = list(path_copy['codeList'])
            resolved_paths.append(path_copy)

        # NOTE(review): conflict time steps refer to the paths as detected.
        # Once a wait step is inserted, later steps of that AGV shift by one,
        # so later conflicts involving it may be stale; re-running detection
        # on the result is advisable.
        for conflict in sorted(conflicts, key=lambda c: c.time_step):
            resolved_paths = self._resolve_single_conflict(resolved_paths, conflict, executing_tasks)

        self.logger.info(f"解决了 {len(conflicts)} 个路径冲突")
        return resolved_paths

    def _resolve_single_conflict(self, paths: List[Dict], conflict: "Conflict",
                                 executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Dispatch one conflict to the resolver for its type.

        Args:
            paths: Current path list.
            conflict: Conflict to resolve.
            executing_tasks: Tasks in execution.

        Returns:
            The updated path list; ``paths`` unchanged for an unknown type.
        """
        handlers = {
            "vertex": self._resolve_vertex_conflict,
            "edge": self._resolve_edge_conflict,
            "follow": self._resolve_following_conflict,
        }
        handler = handlers.get(conflict.type)
        if handler is None:
            return paths
        return handler(paths, conflict, executing_tasks)

    def _resolve_vertex_conflict(self, paths: List[Dict], conflict: "Conflict",
                                 executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Resolve a vertex conflict: the lower-priority AGV waits."""
        return self._make_lower_priority_wait(
            paths, conflict, executing_tasks,
            f"顶点冲突避让决策 - 位置: {conflict.position1}, 时间: {conflict.time_step}",
        )

    def _resolve_edge_conflict(self, paths: List[Dict], conflict: "Conflict",
                               executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Resolve an edge conflict: the lower-priority AGV waits."""
        return self._make_lower_priority_wait(
            paths, conflict, executing_tasks,
            f"边冲突避让决策 - 位置交换: {conflict.position1}<->{conflict.position2}, 时间: {conflict.time_step}",
        )

    def _resolve_following_conflict(self, paths: List[Dict], conflict: "Conflict",
                                    executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Resolve a follow conflict: the lower-priority AGV waits."""
        return self._make_lower_priority_wait(
            paths, conflict, executing_tasks,
            f"跟随冲突避让决策 - 距离过近, 时间: {conflict.time_step}",
            show_positions=True,
        )

    def _make_lower_priority_wait(self, paths: List[Dict], conflict: "Conflict",
                                  executing_tasks: Optional[List[Dict]],
                                  headline: str, show_positions: bool = False) -> List[Dict]:
        """Score both AGVs of a conflict and delay the lower-priority one.

        Args:
            paths: Current path list (the list itself is copied shallowly).
            conflict: Conflict to resolve.
            executing_tasks: Tasks in execution, used for priority scoring.
            headline: First log line describing the decision.
            show_positions: Whether the per-AGV log lines include positions.

        Returns:
            A new list holding the same path dicts; the waiting AGV's
            ``codeList`` gets the wait step inserted in place.
        """
        updated_paths = paths.copy()

        # Locate the paths of both AGVs; the last match wins.
        agv1_path = agv2_path = None
        agv1_index = agv2_index = -1
        for index, path in enumerate(updated_paths):
            owner = path.get('agvId')
            if owner == conflict.agv1:
                agv1_path, agv1_index = path, index
            elif owner == conflict.agv2:
                agv2_path, agv2_index = path, index

        if not agv1_path or not agv2_path:
            return updated_paths

        agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks)
        agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks)

        # The AGV with the lower score waits; on a tie agv1 waits.
        if agv1_priority.priority_score <= agv2_priority.priority_score:
            waiting_agv_path, waiting_agv_index = agv1_path, agv1_index
            waiting_agv_id, higher_priority_agv_id = conflict.agv1, conflict.agv2
        else:
            waiting_agv_path, waiting_agv_index = agv2_path, agv2_index
            waiting_agv_id, higher_priority_agv_id = conflict.agv2, conflict.agv1

        # Log the details of the decision.
        self.logger.info(headline)
        if show_positions:
            self.logger.info(f" AGV {conflict.agv1} (位置: {conflict.position1}): {agv1_priority.explanation}")
            self.logger.info(f" AGV {conflict.agv2} (位置: {conflict.position2}): {agv2_priority.explanation}")
        else:
            self.logger.info(f" AGV {conflict.agv1}: {agv1_priority.explanation}")
            self.logger.info(f" AGV {conflict.agv2}: {agv2_priority.explanation}")
        self.logger.info(f" 决策: AGV {waiting_agv_id} 让步给 AGV {higher_priority_agv_id}")

        # Delay the waiting AGV just before the conflicting step.
        self._insert_wait_step(waiting_agv_path, conflict.time_step)
        updated_paths[waiting_agv_index] = waiting_agv_path

        return updated_paths

    def _insert_wait_step(self, path: Dict, time_step: int):
        """Insert a wait step into ``path['codeList']`` at ``time_step``.

        The wait step repeats the previous path code, so nothing is inserted
        for ``time_step`` 0 (no previous code) or beyond the end of the list.

        Args:
            path: AGV path dict; its ``codeList`` is modified in place.
            time_step: Index at which the wait step is inserted.
        """
        code_list = path.get('codeList', [])

        if not 0 < time_step < len(code_list):
            return

        prev_code = code_list[time_step - 1]

        # Copy every field of a dict code; object codes are reduced to the
        # two basic fields.
        if isinstance(prev_code, dict):
            wait_code = prev_code.copy()
        else:
            wait_code = {
                'code': prev_code.code,
                'direction': prev_code.direction
            }

        code_list.insert(time_step, wait_code)
        path['codeList'] = code_list

    def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]:
        """Validate paths so that every heading is 0, 90, 180 or 270 degrees.

        Args:
            planned_paths: Planned paths.

        Returns:
            List[Dict]: One corrected path dict per input path.
        """
        return [self._validate_single_path_directions(path_data)
                for path_data in planned_paths]

    def _validate_single_path_directions(self, path_data: Dict) -> Dict:
        """Recompute and normalise the heading of every step of one path.

        For each step after the first, the heading is derived from the move
        out of the previous position. A wait step (same position code as the
        previous step) keeps the previous heading instead of being reset to
        a default.

        Args:
            path_data: Path dict of a single AGV.

        Returns:
            Dict: Shallow copy of ``path_data`` with a new ``codeList`` of
            dict codes. Paths with fewer than two codes are returned as a
            plain shallow copy.
        """
        validated_path = path_data.copy()
        code_list = path_data.get('codeList', [])

        if len(code_list) < 2:
            return validated_path

        validated_code_list = []
        prev_position = None
        prev_direction = None

        for index, current_code in enumerate(code_list):
            if isinstance(current_code, dict):
                position = current_code.get('code', '')
                direction = current_code.get('direction', '90')
                # Keep every original field.
                validated_code = current_code.copy()
            else:
                position = current_code.code
                direction = current_code.direction
                # Convert to dict form with the basic fields only.
                validated_code = {
                    'code': position,
                    'direction': direction
                }

            if index > 0:
                if position == prev_position:
                    # Waiting in place does not turn the AGV.
                    direction = prev_direction
                else:
                    correct_direction = self._calculate_movement_direction(prev_position, position)
                    if correct_direction:
                        direction = correct_direction

            # Make sure the heading is one of the four allowed values.
            direction = self._normalize_direction(direction)

            validated_code['direction'] = direction
            validated_code_list.append(validated_code)
            prev_position, prev_direction = position, direction

        validated_path['codeList'] = validated_code_list
        return validated_path

    def _calculate_movement_direction(self, from_position: str, to_position: str) -> Optional[str]:
        """Compute the heading of a move between two position codes.

        Args:
            from_position: Start position code.
            to_position: Target position code.

        Returns:
            Optional[str]: "0", "90", "180" or "270"; "90" as the default
            for a move that is not axis-aligned; ``None`` when either
            coordinate cannot be resolved or the two coordinates are equal.
        """
        from_coord = get_coordinate_from_path_id(from_position, self.path_mapping)
        to_coord = get_coordinate_from_path_id(to_position, self.path_mapping)

        if not from_coord or not to_coord:
            return None

        dx = to_coord[0] - from_coord[0]
        dy = to_coord[1] - from_coord[1]

        if dx == 0 and dy == 0:
            # No displacement, so there is no heading to derive.
            return None

        # Compass names follow the original comments in this file.
        if dy == 0:
            return "0" if dx > 0 else "180"  # east / west
        if dx == 0:
            return "90" if dy > 0 else "270"  # south / north

        # Not an axis-aligned move: fall back to the default heading.
        return "90"

    def _normalize_direction(self, direction: str) -> str:
        """Map a heading to one of "0", "90", "180" or "270".

        Integer values, integer strings and float strings ("180.0") are
        accepted; fractional parts are truncated before mapping.

        Args:
            direction: Original heading value.

        Returns:
            str: The normalised heading; "90" when the value cannot be
            interpreted as a number.
        """
        try:
            angle = int(direction) % 360
        except (TypeError, ValueError, OverflowError):
            # int() rejects strings such as "180.0"; retry through float().
            try:
                angle = int(float(direction)) % 360
            except (TypeError, ValueError, OverflowError):
                return "90"  # default heading

        # Map the angle onto the nearest of the four headings.
        if angle <= 45 or angle > 315:
            return "0"
        if angle <= 135:
            return "90"
        if angle <= 225:
            return "180"
        return "270"
|