import json
|
import logging
|
import os
|
import random
|
import time
|
import uuid
|
import string
|
from typing import Dict, List, Tuple, Optional, Any
|
from datetime import datetime
|
|
|
def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None) -> None:
|
"""设置日志配置"""
|
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
formatter = logging.Formatter(log_format)
|
|
root_logger = logging.getLogger()
|
root_logger.setLevel(getattr(logging, log_level.upper()))
|
|
for handler in root_logger.handlers[:]:
|
root_logger.removeHandler(handler)
|
|
console_handler = logging.StreamHandler()
|
console_handler.setFormatter(formatter)
|
root_logger.addHandler(console_handler)
|
|
if log_file:
|
file_handler = logging.FileHandler(log_file, encoding='utf-8')
|
file_handler.setFormatter(formatter)
|
root_logger.addHandler(file_handler)
|
|
|
def load_path_mapping(mapping_file: str = "path_mapping.json") -> Dict[str, Dict[str, int]]:
|
"""加载路径映射文件"""
|
logger = logging.getLogger(__name__)
|
|
try:
|
with open(mapping_file, 'r', encoding='utf-8') as f:
|
data = json.load(f)
|
|
path_mapping = {}
|
if "path_id_to_coordinates" in data:
|
for path_id, coordinates in data["path_id_to_coordinates"].items():
|
if coordinates and len(coordinates) > 0:
|
coord = coordinates[0]
|
path_mapping[path_id] = {
|
"x": coord["x"],
|
"y": coord["y"]
|
}
|
|
logger.info(f"成功加载路径映射,共 {len(path_mapping)} 个路径点")
|
return path_mapping
|
|
except FileNotFoundError:
|
logger.error(f"路径映射文件不存在: {mapping_file}")
|
return {}
|
except json.JSONDecodeError as e:
|
logger.error(f"路径映射文件格式错误: {e}")
|
return {}
|
except Exception as e:
|
logger.error(f"加载路径映射文件失败: {e}")
|
return {}
|
|
|
def get_coordinate_from_path_id(path_id: str, path_mapping: Dict[str, Dict[str, int]]) -> Optional[Tuple[int, int]]:
|
"""
|
根据路径点ID获取坐标
|
|
Args:
|
path_id: 路径点ID(支持带8位带零的格式,e.g.'00000206')
|
path_mapping: 路径映射字典
|
|
Returns:
|
Optional[Tuple[int, int]]: 坐标(x, y),如果找不到则返回None
|
"""
|
logger = logging.getLogger(__name__)
|
|
if path_id in path_mapping:
|
coord = path_mapping[path_id]
|
logger.debug(f"路径ID {path_id} 匹配成功,坐标: ({coord['x']}, {coord['y']})")
|
return (coord["x"], coord["y"])
|
|
# 如果匹配失败,尝试去掉零后匹配
|
try:
|
normalized_path_id = str(int(path_id))
|
if normalized_path_id in path_mapping:
|
coord = path_mapping[normalized_path_id]
|
logger.debug(f"路径ID {path_id} 规范化为 {normalized_path_id} 后匹配成功,坐标: ({coord['x']}, {coord['y']})")
|
return (coord["x"], coord["y"])
|
else:
|
logger.warning(f"规范化后的路径ID {normalized_path_id}(原始: {path_id})在路径映射中未找到")
|
except (ValueError, TypeError):
|
logger.warning(f"路径ID {path_id} 不是有效的数字格式")
|
|
logger.warning(f"无法找到路径ID {path_id} 对应的坐标")
|
return None
|
|
|
def get_path_id_from_coordinate(x: int, y: int, path_mapping: Dict[str, Dict[str, int]]) -> Optional[str]:
|
"""根据坐标获取路径点ID"""
|
for path_id, coord in path_mapping.items():
|
if coord["x"] == x and coord["y"] == y:
|
return path_id
|
return None
|
|
|
def calculate_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> float:
|
"""计算两点之间的欧氏距离"""
|
return ((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2) ** 0.5
|
|
|
def calculate_manhattan_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> int:
|
"""计算两点之间的曼哈顿距离"""
|
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
|
|
|
def generate_random_agv_id() -> str:
|
"""生成随机AGV ID"""
|
return f"AGV_{random.randint(10001, 99999)}"
|
|
|
def generate_random_task_id() -> str:
|
"""生成随机任务ID"""
|
timestamp = int(time.time() * 1000) # 毫秒时间戳
|
random_suffix = random.randint(100, 999)
|
return f"TASK_{timestamp}_{random_suffix}"
|
|
|
def get_random_path_ids(path_mapping: Dict[str, Dict[str, int]], count: int = 2) -> List[str]:
|
"""随机获取路径点ID"""
|
if not path_mapping:
|
return []
|
|
path_ids = list(path_mapping.keys())
|
if len(path_ids) < count:
|
return path_ids
|
|
return random.sample(path_ids, count)
|
|
|
def format_timestamp(timestamp: Optional[float] = None) -> str:
|
"""格式化时间戳"""
|
if timestamp is None:
|
timestamp = time.time()
|
|
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
def normalize_path_id(path_id: str) -> str:
|
"""标准化路径点ID格式"""
|
if not path_id:
|
return path_id
|
|
try:
|
normalized_path_id = str(int(path_id))
|
return normalized_path_id
|
except (ValueError, TypeError):
|
return path_id
|
|
|
def validate_path_id(path_id: str) -> bool:
|
"""验证路径点ID格式"""
|
try:
|
# 路径点ID应该是数字字符串,在1-1696范围内
|
path_num = int(path_id)
|
return 1 <= path_num <= 1696
|
except (ValueError, TypeError):
|
return False
|
|
|
def validate_agv_id(agv_id: str) -> bool:
|
"""验证AGV ID格式"""
|
if not agv_id or not isinstance(agv_id, str):
|
return False
|
# AGV ID不能为空且长度合理
|
return len(agv_id.strip()) > 0 and len(agv_id) <= 50
|
|
|
def save_json_file(data: Any, file_path: str) -> bool:
|
"""保存数据到JSON"""
|
logger = logging.getLogger(__name__)
|
|
try:
|
with open(file_path, 'w', encoding='utf-8') as f:
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
logger.info(f"数据已保存到文件: {file_path}")
|
return True
|
|
except Exception as e:
|
logger.error(f"保存文件失败: {file_path} - {e}")
|
return False
|
|
|
def load_json_file(file_path: str) -> Optional[Any]:
|
"""从JSON文件加载数据"""
|
logger = logging.getLogger(__name__)
|
|
try:
|
with open(file_path, 'r', encoding='utf-8') as f:
|
data = json.load(f)
|
|
logger.info(f"数据已从文件加载: {file_path}")
|
return data
|
|
except FileNotFoundError:
|
logger.warning(f"文件不存在: {file_path}")
|
return None
|
except json.JSONDecodeError as e:
|
logger.error(f"JSON格式错误: {file_path} - {e}")
|
return None
|
except Exception as e:
|
logger.error(f"加载文件失败: {file_path} - {e}")
|
return None
|
|
|
def ensure_directory_exists(directory: str) -> bool:
|
"""确保目录存在,如果不存在则创建"""
|
try:
|
os.makedirs(directory, exist_ok=True)
|
return True
|
except Exception as e:
|
logging.getLogger(__name__).error(f"创建目录失败: {directory} - {e}")
|
return False
|
|
|
def generate_segment_id(agv_id: str, task_id: Optional[str] = None,
|
target_position: Optional[str] = None,
|
action_type: str = "2") -> str:
|
"""
|
生成导航段ID,用于去重标识
|
基于AGV ID、任务ID、目标位置和动作类型生成固定的11位ID
|
|
Args:
|
agv_id: AGV ID
|
task_id: 任务ID
|
target_position: 目标位置
|
action_type: 动作类型
|
|
Returns:
|
str: 11位导航段ID
|
"""
|
import hashlib
|
|
# 构建用于哈希的字符串
|
if task_id:
|
# 任务类型:使用agv_id + task_id
|
hash_input = f"{agv_id}_{task_id}_{action_type}"
|
else:
|
# 非任务类型:使用agv_id + target_position + action_type
|
target = target_position or "unknown"
|
hash_input = f"{agv_id}_{target}_{action_type}"
|
|
# 使用MD5哈希生成固定值
|
hash_object = hashlib.md5(hash_input.encode())
|
hash_hex = hash_object.hexdigest()
|
|
# 将16进制转换为数字并截取11位
|
hash_int = int(hash_hex[:8], 16)
|
seg_id = str(hash_int)[-11:].zfill(11)
|
|
return seg_id
|
|
|
def generate_navigation_code(code: str, direction: str, action_type: str = "2",
|
task_id: Optional[str] = None, pos_type: Optional[str] = None,
|
backpack_level: Optional[int] = None, is_target_point: bool = False) -> Dict:
|
"""生成导航路径点代码"""
|
path_code = {
|
'code': code,
|
'direction': direction,
|
'type': action_type
|
}
|
|
if task_id and action_type == "2":
|
path_code['taskId'] = task_id
|
|
# 只有到达目标点时才添加posType和lev,且有值时才包含字段
|
if is_target_point:
|
if pos_type:
|
path_code['posType'] = pos_type
|
|
if backpack_level is not None:
|
path_code['lev'] = backpack_level
|
|
return path_code
|