"""
|
环境配置模块 - 负责定义加载仓库环境配置
|
"""
|
import json
|
from enum import Enum, auto
|
import numpy as np
|
from typing import Dict, List, Tuple, Optional, Union
|
|
|
class CellType(Enum):
|
"""单元格类型枚举"""
|
EMPTY = auto() # 空单元格
|
STATION = auto() # 站点
|
PATH = auto() # 路径
|
STORAGE = auto() # 仓储区域
|
OBSTACLE = auto() # 障碍物
|
LOADING = auto() # 装载区
|
UNLOADING = auto() # 卸载区
|
|
|
class EnvironmentConfig:
|
"""环境配置类"""
|
|
def __init__(self, width: int, height: int):
|
"""初始化环境配置"""
|
self.width = width
|
self.height = height
|
# 初始化所有单元格为空
|
self.grid = np.full((height, width), CellType.EMPTY)
|
# 站点位置及其配置
|
self.stations = {} # 格式: {(x, y): {"capacity": 4, "load_position": (x-1, y), "unload_position": (x+1, y)}}
|
# 路径点位置
|
self.paths = set() # 格式: {(x1, y1), (x2, y2), ...}
|
# 仓储区域位置
|
self.storage_areas = set() # 格式: {(x1, y1), (x2, y2), ...}
|
# 装卸区位置
|
self.loading_areas = set() # 格式: {(x1, y1), (x2, y2), ...}
|
self.unloading_areas = set() # 格式: {(x1, y1), (x2, y2), ...}
|
# 障碍物位置
|
self.obstacles = set() # 格式: {(x1, y1), (x2, y2), ...}
|
# 网格点邻接表(用于路径规划)
|
self.adjacency_list = {} # 格式: {(x1, y1): {(x2, y2): distance, ...}, ...}
|
|
def set_cell_type(self, x: int, y: int, cell_type: CellType) -> bool:
|
"""
|
设置单元格类型
|
|
Args:
|
x: x坐标
|
y: y坐标
|
cell_type: 单元格类型
|
|
Returns:
|
bool: 设置是否成功
|
"""
|
if 0 <= x < self.width and 0 <= y < self.height:
|
self.grid[y, x] = cell_type
|
|
# 根据单元格类型更新相应的集合
|
pos = (x, y)
|
if cell_type == CellType.PATH:
|
self.paths.add(pos)
|
elif cell_type == CellType.STATION:
|
if pos not in self.stations:
|
self.stations[pos] = {
|
"capacity": 4, # 默认容量
|
"load_position": (x-1, y), # 默认加载位置
|
"unload_position": (x+1, y) # 默认卸载位置
|
}
|
elif cell_type == CellType.STORAGE:
|
self.storage_areas.add(pos)
|
elif cell_type == CellType.LOADING:
|
self.loading_areas.add(pos)
|
elif cell_type == CellType.UNLOADING:
|
self.unloading_areas.add(pos)
|
elif cell_type == CellType.OBSTACLE:
|
self.obstacles.add(pos)
|
|
return True
|
return False
|
|
def add_station(self, x: int, y: int, capacity: int = 4,
|
load_position: Tuple[int, int] = None,
|
unload_position: Tuple[int, int] = None) -> bool:
|
"""
|
添加站点
|
|
Args:
|
x: 站点x坐标
|
y: 站点y坐标
|
capacity: 站点容量
|
load_position: 加载位置
|
unload_position: 卸载位置
|
|
Returns:
|
bool: 添加是否成功
|
"""
|
if 0 <= x < self.width and 0 <= y < self.height:
|
# 设置默认的装卸位置
|
if load_position is None:
|
load_position = (x-1, y) # 左侧为卸料箱位置
|
if unload_position is None:
|
unload_position = (x+1, y) # 右侧为取料箱位置
|
|
# 更新网格和站点信息
|
self.grid[y, x] = CellType.STATION
|
self.stations[(x, y)] = {
|
"capacity": capacity,
|
"load_position": load_position,
|
"unload_position": unload_position
|
}
|
|
# 确保装卸位置也被标记
|
self.set_cell_type(load_position[0], load_position[1], CellType.LOADING)
|
self.set_cell_type(unload_position[0], unload_position[1], CellType.UNLOADING)
|
|
return True
|
return False
|
|
def add_path(self, x: int, y: int) -> bool:
|
"""
|
添加路径点
|
|
Args:
|
x: x坐标
|
y: y坐标
|
|
Returns:
|
bool: 添加是否成功
|
"""
|
return self.set_cell_type(x, y, CellType.PATH)
|
|
def add_storage_area(self, x: int, y: int) -> bool:
|
"""
|
添加仓储区域
|
|
Args:
|
x: x坐标
|
y: y坐标
|
|
Returns:
|
bool: 添加是否成功
|
"""
|
return self.set_cell_type(x, y, CellType.STORAGE)
|
|
def add_obstacle(self, x: int, y: int) -> bool:
|
"""
|
添加障碍物
|
|
Args:
|
x: x坐标
|
y: y坐标
|
|
Returns:
|
bool: 添加是否成功
|
"""
|
return self.set_cell_type(x, y, CellType.OBSTACLE)
|
|
def is_valid_position(self, x: int, y: int) -> bool:
|
"""
|
检查位置是否合法(在网格内且非障碍物)
|
|
Args:
|
x: x坐标
|
y: y坐标
|
|
Returns:
|
bool: 位置是否合法
|
"""
|
return (0 <= x < self.width and
|
0 <= y < self.height and
|
self.grid[y, x] != CellType.OBSTACLE)
|
|
def build_adjacency_list(self):
|
"""构建网格点邻接表(用于路径规划)"""
|
# 清空现有邻接表
|
self.adjacency_list = {}
|
|
# AGV进行四方向移动
|
directions = [
|
(0, 1), # 上
|
(1, 0), # 右
|
(0, -1), # 下
|
(-1, 0) # 左
|
]
|
|
# 为每个路径点构建邻接表
|
for x, y in self.paths:
|
if (x, y) not in self.adjacency_list:
|
self.adjacency_list[(x, y)] = {}
|
|
# 检查四个方向的邻居
|
for dx, dy in directions:
|
nx, ny = x + dx, y + dy
|
# 检查相邻点是否为有效路径点
|
if (nx, ny) in self.paths:
|
# 移动距离统一为1.0
|
distance = 1.0
|
self.adjacency_list[(x, y)][(nx, ny)] = distance
|
|
# 添加站点的装卸点到邻接表
|
for station_pos, station_info in self.stations.items():
|
load_pos = station_info["load_position"]
|
unload_pos = station_info["unload_position"]
|
|
# 确保装卸点已添加到邻接表
|
if load_pos not in self.adjacency_list:
|
self.adjacency_list[load_pos] = {}
|
if unload_pos not in self.adjacency_list:
|
self.adjacency_list[unload_pos] = {}
|
|
# 连接装卸点到最近的路径点
|
for pos in [load_pos, unload_pos]:
|
# 找到最近的路径点并连接
|
min_dist = float('inf')
|
nearest_path = None
|
|
for path_pos in self.paths:
|
dist = ((pos[0] - path_pos[0]) ** 2 + (pos[1] - path_pos[1]) ** 2) ** 0.5
|
if dist < min_dist:
|
min_dist = dist
|
nearest_path = path_pos
|
|
if nearest_path:
|
self.adjacency_list[pos][nearest_path] = min_dist
|
self.adjacency_list[nearest_path][pos] = min_dist
|
|
def save_to_file(self, filepath: str):
|
"""
|
将环境配置保存到文件
|
|
Args:
|
filepath: 文件路径
|
"""
|
config_data = {
|
"width": self.width,
|
"height": self.height,
|
"stations": {f"{x},{y}": info for (x, y), info in self.stations.items()},
|
"paths": [{"x": x, "y": y} for x, y in self.paths],
|
"storage_areas": [{"x": x, "y": y} for x, y in self.storage_areas],
|
"loading_areas": [{"x": x, "y": y} for x, y in self.loading_areas],
|
"unloading_areas": [{"x": x, "y": y} for x, y in self.unloading_areas],
|
"obstacles": [{"x": x, "y": y} for x, y in self.obstacles]
|
}
|
|
with open(filepath, 'w') as f:
|
json.dump(config_data, f, indent=2)
|
|
@classmethod
|
def load_from_file(cls, filepath: str) -> 'EnvironmentConfig':
|
"""
|
从文件加载环境配置
|
|
Args:
|
filepath: 文件路径
|
|
Returns:
|
EnvironmentConfig: 环境配置对象
|
"""
|
with open(filepath, 'r') as f:
|
config_data = json.load(f)
|
|
env_config = cls(config_data["width"], config_data["height"])
|
|
# 加载站点
|
for pos_str, info in config_data["stations"].items():
|
x, y = map(int, pos_str.split(','))
|
load_x, load_y = info["load_position"]
|
unload_x, unload_y = info["unload_position"]
|
env_config.add_station(x, y, info["capacity"],
|
(load_x, load_y), (unload_x, unload_y))
|
|
# 加载路径
|
for path in config_data["paths"]:
|
env_config.add_path(path["x"], path["y"])
|
|
# 加载仓储区域
|
for area in config_data["storage_areas"]:
|
env_config.add_storage_area(area["x"], area["y"])
|
|
# 加载障碍物
|
for obstacle in config_data["obstacles"]:
|
env_config.add_obstacle(obstacle["x"], obstacle["y"])
|
|
# 构建邻接表
|
env_config.build_adjacency_list()
|
|
return env_config
|
|
def create_default_environment(self) -> 'EnvironmentConfig':
|
"""
|
创建默认环境配置(按照需求描述配置站点和路径)
|
|
Returns:
|
EnvironmentConfig: 环境配置对象
|
"""
|
# 清空当前配置
|
self.grid = np.full((self.height, self.width), CellType.EMPTY)
|
self.stations = {}
|
self.paths = set()
|
self.storage_areas = set()
|
self.loading_areas = set()
|
self.unloading_areas = set()
|
self.obstacles = set()
|
|
# 配置20个站点
|
station_y = self.height - 5
|
station_spacing = self.width // 25 # 站点间距
|
|
for i in range(20):
|
station_x = (i + 1) * station_spacing
|
self.add_station(station_x, station_y)
|
|
# 仓储区域
|
storage_start_y = 5
|
storage_end_y = station_y - 10
|
|
for col in range(5):
|
col_x = (col + 1) * (self.width // 6)
|
|
for row_y in range(storage_start_y, storage_end_y, 5):
|
for dx in range(-1, 2):
|
for dy in range(-1, 2):
|
self.add_storage_area(col_x + dx, row_y + dy)
|
|
# 配置路径
|
# 水平主路径
|
for x in range(5, self.width - 5):
|
# 站点前水平路径
|
self.add_path(x, station_y - 5)
|
# 货架区域水平路径
|
for path_y in range(10, storage_end_y, 10):
|
self.add_path(x, path_y)
|
|
# 垂直连接路径
|
for col in range(7):
|
path_x = col * (self.width // 7)
|
for y in range(5, station_y):
|
self.add_path(path_x, y)
|
|
# 构建邻接表
|
self.build_adjacency_list()
|
|
return self
|
|
|
# 创建和初始化默认环境的辅助函数
|
def create_default_environment(width=100, height=100) -> EnvironmentConfig:
|
"""
|
创建默认环境配置
|
|
Args:
|
width: 环境宽度
|
height: 环境高度
|
|
Returns:
|
EnvironmentConfig: 默认环境配置
|
"""
|
env_config = EnvironmentConfig(width, height)
|
return env_config.create_default_environment()
|