"""
|
任务分配算法
|
"""
|
import time
|
import random
|
import logging
|
from typing import Dict, List, Tuple, Optional, Set, Any
|
from collections import defaultdict
|
from abc import ABC, abstractmethod
|
|
from common.data_models import TaskData, AGVStatus, TaskAssignment, BackpackData
|
from algorithm_system.models.agv_model import AGVModel, AGVModelManager
|
from common.utils import get_coordinate_from_path_id, calculate_distance, calculate_manhattan_distance
|
from dataclasses import dataclass
|
|
|
class TaskAllocation(ABC):
|
"""任务分配算法基类"""
|
|
def __init__(self, agv_manager: AGVModelManager):
|
"""
|
初始化任务分配算法
|
|
Args:
|
agv_manager: AGV模型管理器
|
"""
|
self.agv_manager = agv_manager
|
self.logger = logging.getLogger(__name__)
|
|
@abstractmethod
|
def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
|
"""
|
分配任务给AGV
|
|
Args:
|
tasks: 待分配的任务列表
|
|
Returns:
|
List[TaskAssignment]: 分配结果列表
|
"""
|
pass
|
|
def find_available_backpack_slot(self, agv_status: AGVStatus) -> Optional[int]:
|
"""
|
查找AGV的可用背篓位置
|
|
Args:
|
agv_status: AGV状态信息
|
|
Returns:
|
Optional[int]: 可用的背篓位置编号,如果没有可用位置则返回None
|
"""
|
if not agv_status.backpack:
|
# 如果没有背篓信息,假设从第一个位置开始
|
self.logger.warning(f"AGV {agv_status.agvId} 没有背篓信息,分配到位置0")
|
return 0
|
|
# 查找空闲且未执行任务的背篓位置
|
for backpack_item in agv_status.backpack:
|
if not backpack_item.loaded and not backpack_item.execute and not backpack_item.taskId:
|
self.logger.debug(f"AGV {agv_status.agvId} 找到可用背篓位置: {backpack_item.index}")
|
return backpack_item.index
|
|
# 如果所有位置都被占用,返回None
|
self.logger.debug(f"AGV {agv_status.agvId} 没有可用的背篓位置")
|
return None
|
|
def get_agv_available_capacity(self, agv_status: AGVStatus) -> int:
|
"""
|
获取AGV的可用背篓容量
|
|
Args:
|
agv_status: AGV状态信息
|
|
Returns:
|
int: 可用背篓数量
|
"""
|
if not agv_status.backpack:
|
return 1 # 假设至少有一个背篓位置
|
|
available_count = 0
|
for backpack_item in agv_status.backpack:
|
if not backpack_item.loaded and not backpack_item.execute and not backpack_item.taskId:
|
available_count += 1
|
|
return available_count
|
|
def assign_task_with_backpack(self, agv_model, task: TaskData, lev_id: int) -> bool:
|
"""
|
将任务分配给AGV的指定背篓位置
|
|
Args:
|
agv_model: AGV模型
|
task: 任务数据
|
lev_id: 背篓位置编号
|
|
Returns:
|
bool: 分配是否成功
|
"""
|
try:
|
# 使用AGV模型的assign_task方法
|
success = agv_model.assign_task(
|
task_id=task.taskId,
|
priority=task.priority,
|
start_code=task.start,
|
end_code=task.end
|
)
|
|
if success:
|
self.logger.info(f"任务 {task.taskId} 成功分配给AGV {agv_model.agvId} 的背篓位置 {lev_id}")
|
return True
|
else:
|
self.logger.warning(f"任务 {task.taskId} 分配给AGV {agv_model.agvId} 失败")
|
return False
|
|
except Exception as e:
|
self.logger.error(f"分配任务时发生异常: {e}")
|
return False
|
|
|
class NearestFirstAllocation(TaskAllocation):
|
"""最近优先分配算法"""
|
|
def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
|
"""
|
使用最近优先策略分配任务
|
|
Args:
|
tasks: 待分配的任务列表
|
|
Returns:
|
List[TaskAssignment]: 分配结果列表
|
"""
|
if not tasks:
|
return []
|
|
# 获取可用的AGV
|
available_agvs = self.agv_manager.get_available_agvs()
|
|
if not available_agvs:
|
self.logger.warning("没有可用的AGV进行任务分配")
|
return []
|
|
# 1. 首先检查任务是否已经分配,避免重复分配
|
already_assigned_tasks = set()
|
all_agvs = self.agv_manager.get_all_agvs()
|
for agv in all_agvs:
|
if agv.backpack:
|
for backpack_item in agv.backpack:
|
if backpack_item.taskId:
|
already_assigned_tasks.add(backpack_item.taskId)
|
self.logger.info(f"任务 {backpack_item.taskId} 已分配给 AGV {agv.agvId},跳过重复分配")
|
|
# 2. 过滤掉已分配的任务
|
unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks]
|
|
if not unassigned_tasks:
|
self.logger.info("所有任务都已分配,无需重新分配")
|
return []
|
|
self.logger.info(f"总任务数: {len(tasks)}, 已分配: {len(already_assigned_tasks)}, 待分配: {len(unassigned_tasks)}")
|
|
assignments = []
|
path_mapping = self.agv_manager.path_mapping
|
|
# 对每个任务找到最近的AGV
|
for task in unassigned_tasks:
|
if not available_agvs:
|
break
|
|
# 获取任务起点坐标
|
task_start_coord = get_coordinate_from_path_id(task.start, path_mapping)
|
if not task_start_coord:
|
self.logger.warning(f"无法获取任务 {task.taskId} 起点 {task.start} 的坐标")
|
continue
|
|
# 找到距离最近的AGV
|
nearest_agv = None
|
min_distance = float('inf')
|
|
for agv in available_agvs:
|
if agv.coordinates:
|
distance = calculate_manhattan_distance(agv.coordinates, task_start_coord)
|
|
# 如果AGV已有任务,计算完成当前任务后到新任务起点的距离
|
if agv.current_task_count > 0:
|
# 简化:假设AGV需要额外时间完成当前任务
|
distance += agv.current_task_count * 10
|
|
if distance < min_distance:
|
min_distance = distance
|
nearest_agv = agv
|
|
if nearest_agv and nearest_agv.can_accept_task(task.priority):
|
# 获取AGV的原始状态数据来查找可用背篓位置
|
agv_status = None
|
for agv_state in self.agv_manager.get_all_agv_status():
|
if agv_state.agvId == nearest_agv.agvId:
|
agv_status = agv_state
|
break
|
|
if agv_status:
|
# 查找可用的背篓位置
|
available_lev_id = self.find_available_backpack_slot(agv_status)
|
|
if available_lev_id is not None:
|
# 分配任务到指定背篓位置
|
success = self.assign_task_with_backpack(nearest_agv, task, available_lev_id)
|
if success:
|
assignments.append(TaskAssignment(
|
taskId=task.taskId,
|
agvId=nearest_agv.agvId,
|
lev_id=available_lev_id
|
))
|
|
self.logger.info(f"任务 {task.taskId} 分配给最近的AGV {nearest_agv.agvId},背篓位置: {available_lev_id},距离: {min_distance}")
|
|
# 检查AGV是否还有可用背篓位置
|
remaining_capacity = self.get_agv_available_capacity(agv_status) - 1
|
if remaining_capacity <= 0:
|
available_agvs.remove(nearest_agv)
|
else:
|
self.logger.warning(f"任务 {task.taskId} 分配给AGV {nearest_agv.agvId} 失败")
|
else:
|
self.logger.warning(f"AGV {nearest_agv.agvId} 没有可用的背篓位置")
|
available_agvs.remove(nearest_agv)
|
else:
|
self.logger.warning(f"无法获取AGV {nearest_agv.agvId} 的状态信息")
|
|
return assignments
|
|
|
class LoadBalancedAllocation(TaskAllocation):
|
"""负载均衡分配算法"""
|
|
def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
|
"""
|
使用负载均衡策略分配任务
|
|
Args:
|
tasks: 待分配的任务列表
|
|
Returns:
|
List[TaskAssignment]: 分配结果列表
|
"""
|
if not tasks:
|
return []
|
|
# 获取所有AGV
|
all_agvs = self.agv_manager.get_all_agvs()
|
|
if not all_agvs:
|
self.logger.warning("没有AGV进行任务分配")
|
return []
|
|
# 1. 首先检查任务是否已经分配,避免重复分配
|
already_assigned_tasks = set()
|
for agv in all_agvs:
|
if agv.backpack:
|
for backpack_item in agv.backpack:
|
if backpack_item.taskId:
|
already_assigned_tasks.add(backpack_item.taskId)
|
self.logger.info(f"任务 {backpack_item.taskId} 已分配给 AGV {agv.agvId},跳过重复分配")
|
|
# 2. 过滤掉已分配的任务
|
unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks]
|
|
if not unassigned_tasks:
|
self.logger.info("所有任务都已分配,无需重新分配")
|
return []
|
|
self.logger.info(f"总任务数: {len(tasks)}, 已分配: {len(already_assigned_tasks)}, 待分配: {len(unassigned_tasks)}")
|
|
assignments = []
|
path_mapping = self.agv_manager.path_mapping
|
|
# 按优先级排序任务
|
sorted_tasks = sorted(unassigned_tasks, key=lambda t: t.priority, reverse=True)
|
|
# 对每个任务分配给负载最低的AGV
|
for task in sorted_tasks:
|
# 获取任务起点坐标
|
task_start_coord = get_coordinate_from_path_id(task.start, path_mapping)
|
if not task_start_coord:
|
self.logger.warning(f"无法获取任务 {task.taskId} 起点 {task.start} 的坐标")
|
continue
|
|
# 按负载和距离排序AGV
|
agv_scores = []
|
for agv in all_agvs:
|
if not agv.can_accept_task(task.priority):
|
continue
|
|
# 计算负载得分(负载越低得分越高)
|
load_score = 1.0 - agv.get_workload_ratio()
|
|
# 计算距离得分(距离越近得分越高)
|
distance_score = 0.0
|
if agv.coordinates and task_start_coord:
|
distance = calculate_manhattan_distance(agv.coordinates, task_start_coord)
|
distance_score = 1.0 / (1.0 + distance / 100.0) # 归一化距离得分
|
|
# 计算效率得分
|
efficiency_score = agv.calculate_efficiency_score(path_mapping)
|
|
# 综合得分
|
total_score = 0.4 * load_score + 0.3 * distance_score + 0.3 * efficiency_score
|
agv_scores.append((agv, total_score))
|
|
if not agv_scores:
|
# 详细分析为什么没有AGV可以接受任务
|
total_agvs = len(all_agvs)
|
busy_count = 0
|
low_battery_count = 0
|
overloaded_count = 0
|
status_invalid_count = 0
|
|
for agv in all_agvs:
|
if agv.is_overloaded():
|
overloaded_count += 1
|
elif str(agv.status) not in ["0", "1", "2"]:
|
status_invalid_count += 1
|
elif agv.need_charging():
|
low_battery_count += 1
|
else:
|
busy_count += 1
|
|
self.logger.warning(f"没有AGV可以接受任务 {task.taskId} - 详细分析:")
|
self.logger.warning(f" 总AGV数: {total_agvs}")
|
self.logger.warning(f" 任务满载: {overloaded_count}")
|
self.logger.warning(f" 状态异常: {status_invalid_count}")
|
self.logger.warning(f" 电量过低: {low_battery_count}")
|
self.logger.warning(f" 其他原因: {busy_count}")
|
self.logger.warning(f" 任务优先级: {task.priority}")
|
continue
|
|
# 选择得分最高的AGV
|
agv_scores.sort(key=lambda x: x[1], reverse=True)
|
best_agv = agv_scores[0][0]
|
|
# 获取AGV的原始状态数据来查找可用背篓位置
|
agv_status = self.agv_manager.get_agv_status(best_agv.agvId)
|
|
if agv_status:
|
# 查找可用的背篓位置
|
available_lev_id = self.find_available_backpack_slot(agv_status)
|
|
if available_lev_id is not None:
|
# 分配任务到指定背篓位置
|
success = self.assign_task_with_backpack(best_agv, task, available_lev_id)
|
if success:
|
assignments.append(TaskAssignment(
|
taskId=task.taskId,
|
agvId=best_agv.agvId,
|
lev_id=available_lev_id
|
))
|
|
self.logger.info(f"任务 {task.taskId} 分配给负载均衡的AGV {best_agv.agvId},背篓位置: {available_lev_id},得分: {agv_scores[0][1]:.3f}")
|
else:
|
self.logger.warning(f"任务 {task.taskId} 分配给AGV {best_agv.agvId} 失败")
|
else:
|
self.logger.warning(f"AGV {best_agv.agvId} 没有可用的背篓位置")
|
else:
|
self.logger.warning(f"无法获取AGV {best_agv.agvId} 的状态信息")
|
|
return assignments
|
|
|
class PriorityFirstAllocation(TaskAllocation):
|
"""优先级优先分配算法"""
|
|
def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
|
"""
|
使用优先级优先策略分配任务
|
|
Args:
|
tasks: 待分配的任务列表
|
|
Returns:
|
List[TaskAssignment]: 分配结果列表
|
"""
|
if not tasks:
|
return []
|
|
# 获取可用的AGV
|
available_agvs = self.agv_manager.get_available_agvs()
|
|
if not available_agvs:
|
self.logger.warning("没有可用的AGV进行任务分配")
|
return []
|
|
# 1. 首先检查任务是否已经分配,避免重复分配
|
already_assigned_tasks = set()
|
all_agvs = self.agv_manager.get_all_agvs()
|
for agv in all_agvs:
|
if agv.backpack:
|
for backpack_item in agv.backpack:
|
if backpack_item.taskId:
|
already_assigned_tasks.add(backpack_item.taskId)
|
self.logger.info(f"任务 {backpack_item.taskId} 已分配给 AGV {agv.agvId},跳过重复分配")
|
|
# 2. 过滤掉已分配的任务
|
unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks]
|
|
if not unassigned_tasks:
|
self.logger.info("所有任务都已分配,无需重新分配")
|
return []
|
|
self.logger.info(f"总任务数: {len(tasks)}, 已分配: {len(already_assigned_tasks)}, 待分配: {len(unassigned_tasks)}")
|
|
# 按优先级排序任务(高优先级在前)
|
sorted_tasks = sorted(unassigned_tasks, key=lambda t: t.priority, reverse=True)
|
|
assignments = []
|
path_mapping = self.agv_manager.path_mapping
|
|
# 优先分配高优先级任务
|
for task in sorted_tasks:
|
if not available_agvs:
|
break
|
|
# 获取任务起点坐标
|
task_start_coord = get_coordinate_from_path_id(task.start, path_mapping)
|
if not task_start_coord:
|
self.logger.warning(f"无法获取任务 {task.taskId} 起点 {task.start} 的坐标")
|
continue
|
|
# 为高优先级任务选择最佳AGV
|
best_agv = None
|
best_score = -1
|
|
for agv in available_agvs:
|
if not agv.can_accept_task(task.priority):
|
continue
|
|
# 计算综合得分
|
distance_score = 0.0
|
if agv.coordinates and task_start_coord:
|
distance = calculate_manhattan_distance(agv.coordinates, task_start_coord)
|
distance_score = 1.0 / (1.0 + distance / 50.0)
|
|
efficiency_score = agv.calculate_efficiency_score(path_mapping)
|
capacity_score = agv.get_task_capacity() / agv.max_capacity
|
|
# 高优先级任务更注重效率和距离
|
total_score = 0.5 * distance_score + 0.3 * efficiency_score + 0.2 * capacity_score
|
|
if total_score > best_score:
|
best_score = total_score
|
best_agv = agv
|
|
if best_agv:
|
# 获取AGV的原始状态数据来查找可用背篓位置
|
agv_status = self.agv_manager.get_agv_status(best_agv.agvId)
|
|
if agv_status:
|
# 查找可用的背篓位置
|
available_lev_id = self.find_available_backpack_slot(agv_status)
|
|
if available_lev_id is not None:
|
# 分配任务到指定背篓位置
|
success = self.assign_task_with_backpack(best_agv, task, available_lev_id)
|
if success:
|
assignments.append(TaskAssignment(
|
taskId=task.taskId,
|
agvId=best_agv.agvId,
|
lev_id=available_lev_id
|
))
|
|
self.logger.info(f"高优先级任务 {task.taskId} (优先级: {task.priority}) 分配给AGV {best_agv.agvId},背篓位置: {available_lev_id}")
|
|
# 检查AGV是否还有可用背篓位置
|
remaining_capacity = self.get_agv_available_capacity(agv_status) - 1
|
if remaining_capacity <= 0:
|
available_agvs.remove(best_agv)
|
else:
|
self.logger.warning(f"任务 {task.taskId} 分配给AGV {best_agv.agvId} 失败")
|
else:
|
self.logger.warning(f"AGV {best_agv.agvId} 没有可用的背篓位置")
|
available_agvs.remove(best_agv)
|
else:
|
self.logger.warning(f"无法获取AGV {best_agv.agvId} 的状态信息")
|
|
return assignments
|
|
|
class MultiObjectiveAllocation(TaskAllocation):
|
"""多目标优化分配算法"""
|
|
def __init__(self, agv_manager: AGVModelManager,
|
distance_weight: float = 0.4,
|
load_weight: float = 0.3,
|
efficiency_weight: float = 0.3):
|
"""
|
初始化多目标优化分配算法
|
|
Args:
|
agv_manager: AGV模型管理器
|
distance_weight: 距离权重
|
load_weight: 负载权重
|
efficiency_weight: 效率权重
|
"""
|
super().__init__(agv_manager)
|
self.distance_weight = distance_weight
|
self.load_weight = load_weight
|
self.efficiency_weight = efficiency_weight
|
|
def allocate_tasks(self, tasks: List[TaskData]) -> List[TaskAssignment]:
|
"""
|
使用多目标优化策略分配任务
|
|
Args:
|
tasks: 待分配的任务列表
|
|
Returns:
|
List[TaskAssignment]: 分配结果列表
|
"""
|
if not tasks:
|
return []
|
|
# 获取所有AGV
|
all_agvs = self.agv_manager.get_all_agvs()
|
|
if not all_agvs:
|
self.logger.warning("没有AGV进行任务分配")
|
return []
|
|
# 1. 首先检查任务是否已经分配,避免重复分配
|
already_assigned_tasks = set()
|
for agv in all_agvs:
|
if agv.backpack:
|
for backpack_item in agv.backpack:
|
if backpack_item.taskId:
|
already_assigned_tasks.add(backpack_item.taskId)
|
self.logger.info(f"任务 {backpack_item.taskId} 已分配给 AGV {agv.agvId},跳过重复分配")
|
|
# 2. 过滤掉已分配的任务
|
unassigned_tasks = [task for task in tasks if task.taskId not in already_assigned_tasks]
|
|
if not unassigned_tasks:
|
self.logger.info("所有任务都已分配,无需重新分配")
|
return []
|
|
self.logger.info(f"总任务数: {len(tasks)}, 已分配: {len(already_assigned_tasks)}, 待分配: {len(unassigned_tasks)}")
|
|
assignments = []
|
path_mapping = self.agv_manager.path_mapping
|
|
# 对每个任务-AGV对计算得分
|
task_agv_scores = {}
|
|
for task in unassigned_tasks:
|
task_start_coord = get_coordinate_from_path_id(task.start, path_mapping)
|
if not task_start_coord:
|
continue
|
|
for agv in all_agvs:
|
if not agv.can_accept_task(task.priority):
|
continue
|
|
# 距离得分
|
distance_score = 0.0
|
if agv.coordinates:
|
distance = calculate_manhattan_distance(agv.coordinates, task_start_coord)
|
distance_score = 1.0 / (1.0 + distance / 100.0)
|
|
# 负载得分
|
load_score = 1.0 - agv.get_workload_ratio()
|
|
# 效率得分
|
efficiency_score = agv.calculate_efficiency_score(path_mapping)
|
|
# 计算综合得分
|
total_score = (
|
self.distance_weight * distance_score +
|
self.load_weight * load_score +
|
self.efficiency_weight * efficiency_score
|
)
|
|
task_agv_scores[(task.taskId, agv.agvId)] = total_score
|
|
# 使用贪心算法进行匹配
|
assignments = self._greedy_matching(unassigned_tasks, all_agvs, task_agv_scores)
|
|
return assignments
|
|
def _greedy_matching(self, tasks: List[TaskData], agvs: List[AGVModel],
|
scores: Dict[Tuple[str, str], float]) -> List[TaskAssignment]:
|
"""
|
使用贪心算法进行任务-AGV匹配
|
|
Args:
|
tasks: 任务列表
|
agvs: AGV列表
|
scores: 任务-AGV对的得分
|
|
Returns:
|
List[TaskAssignment]: 分配结果
|
"""
|
assignments = []
|
remaining_tasks = [task.taskId for task in tasks]
|
|
# 重复分配直到没有任务或没有可用AGV
|
while remaining_tasks:
|
# 找到得分最高的任务-AGV对
|
best_score = -1
|
best_task_id = None
|
best_agv = None
|
|
for task_id in remaining_tasks:
|
for agv in agvs:
|
if agv.is_overloaded():
|
continue
|
|
score = scores.get((task_id, agv.agvId), 0.0)
|
if score > best_score:
|
best_score = score
|
best_task_id = task_id
|
best_agv = agv
|
|
if best_task_id and best_agv:
|
# 获取AGV的原始状态数据来查找可用背篓位置
|
agv_status = self.agv_manager.get_agv_status(best_agv.agvId)
|
|
if agv_status:
|
# 查找可用的背篓位置
|
available_lev_id = self.find_available_backpack_slot(agv_status)
|
|
if available_lev_id is not None:
|
# 找到对应的任务对象
|
task = next((t for t in tasks if t.taskId == best_task_id), None)
|
if task:
|
# 分配任务到指定背篓位置
|
success = self.assign_task_with_backpack(best_agv, task, available_lev_id)
|
if success:
|
assignments.append(TaskAssignment(
|
taskId=best_task_id,
|
agvId=best_agv.agvId,
|
lev_id=available_lev_id
|
))
|
|
remaining_tasks.remove(best_task_id)
|
self.logger.info(f"多目标优化:任务 {best_task_id} 分配给AGV {best_agv.agvId},背篓位置: {available_lev_id},得分: {best_score:.3f}")
|
else:
|
self.logger.warning(f"任务 {best_task_id} 分配给AGV {best_agv.agvId} 失败")
|
break
|
else:
|
self.logger.error(f"找不到任务 {best_task_id} 的详细信息")
|
break
|
else:
|
self.logger.debug(f"AGV {best_agv.agvId} 没有可用的背篓位置,跳过")
|
break
|
else:
|
self.logger.warning(f"无法获取AGV {best_agv.agvId} 的状态信息")
|
break
|
else:
|
break
|
|
return assignments
|
|
|
class TaskAllocationFactory:
|
"""任务分配算法工厂类"""
|
|
@staticmethod
|
def create_allocator(algorithm_type: str, agv_manager: AGVModelManager) -> TaskAllocation:
|
"""
|
创建任务分配算法
|
|
Args:
|
algorithm_type: 算法类型
|
agv_manager: AGV模型管理器
|
|
Returns:
|
TaskAllocation: 任务分配算法对象
|
"""
|
if algorithm_type == "NEAREST_FIRST":
|
return NearestFirstAllocation(agv_manager)
|
elif algorithm_type == "LOAD_BALANCED":
|
return LoadBalancedAllocation(agv_manager)
|
elif algorithm_type == "PRIORITY_FIRST":
|
return PriorityFirstAllocation(agv_manager)
|
elif algorithm_type == "MULTI_OBJECTIVE":
|
return MultiObjectiveAllocation(agv_manager)
|
else:
|
# 默认使用负载均衡算法
|
return LoadBalancedAllocation(agv_manager)
|