"""Utility helpers: logging setup, path-ID/coordinate lookup, ID generation and JSON file I/O."""

import hashlib
import json
import logging
import math
import os
import random
import string
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

# Default inclusive bounds used by validate_path_id().
MIN_PATH_ID = 1
MAX_PATH_ID = 1696


def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None) -> None:
    """Configure the root logger with a console handler and an optional file handler.

    Every handler currently attached to the root logger is removed (and
    closed) before the new handlers are installed.

    Args:
        log_level: Name of a ``logging`` level (case-insensitive), e.g. ``"INFO"``.
        log_file: If given, log records are also written to this file (UTF-8).

    Raises:
        AttributeError: If ``log_level`` does not name an attribute of ``logging``.
    """
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(log_format)

    root_logger = logging.getLogger()
    # Resolve the level first so an invalid name fails before any handler is touched.
    root_logger.setLevel(getattr(logging, log_level.upper()))

    # Iterate over a copy: removeHandler() mutates the underlying list.
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
        # Close the discarded handler so file handlers release their file
        # descriptor; otherwise repeated calls leak open files.
        handler.close()

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    if log_file:
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)


def load_path_mapping(mapping_file: str = "path_mapping.json") -> Dict[str, Dict[str, int]]:
    """Load the path-ID -> coordinate mapping from a JSON file.

    The file is read for a top-level ``"path_id_to_coordinates"`` object whose
    values are sequences of ``{"x": ..., "y": ...}`` entries; only the first
    entry of each non-empty sequence is kept.

    Args:
        mapping_file: Path of the JSON mapping file.

    Returns:
        ``{path_id: {"x": x, "y": y}}``. An empty dict is returned (and an
        error is logged) if the file is missing, is not valid JSON, or any
        entry cannot be processed.
    """
    logger = logging.getLogger(__name__)

    try:
        with open(mapping_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        path_mapping = {}
        if "path_id_to_coordinates" in data:
            for path_id, coordinates in data["path_id_to_coordinates"].items():
                # Empty / null coordinate lists are skipped.
                if coordinates:
                    coord = coordinates[0]
                    path_mapping[path_id] = {
                        "x": coord["x"],
                        "y": coord["y"]
                    }

        logger.info(f"成功加载路径映射,共 {len(path_mapping)} 个路径点")
        return path_mapping

    except FileNotFoundError:
        logger.error(f"路径映射文件不存在: {mapping_file}")
        return {}
    except json.JSONDecodeError as e:
        logger.error(f"路径映射文件格式错误: {e}")
        return {}
    except Exception as e:
        # Boundary catch-all: a malformed entry discards the whole mapping.
        logger.error(f"加载路径映射文件失败: {e}")
        return {}


def get_coordinate_from_path_id(path_id: str, path_mapping: Dict[str, Dict[str, int]]) -> Optional[Tuple[int, int]]:
    """Look up the coordinate of a path point by its ID.

    The ID is first looked up verbatim. If that fails, it is converted with
    ``int()`` and back to ``str`` (which drops leading zeros, e.g.
    ``'00000206'`` -> ``'206'``) and looked up again.

    Args:
        path_id: Path point ID, optionally zero-padded.
        path_mapping: Mapping of path ID to ``{"x": ..., "y": ...}``.

    Returns:
        ``(x, y)`` if found, otherwise ``None`` (a warning is logged).
    """
    logger = logging.getLogger(__name__)

    if path_id in path_mapping:
        coord = path_mapping[path_id]
        logger.debug(f"路径ID {path_id} 匹配成功,坐标: ({coord['x']}, {coord['y']})")
        return (coord["x"], coord["y"])

    # Exact match failed: retry with leading zeros stripped.
    try:
        normalized_path_id = str(int(path_id))
        if normalized_path_id in path_mapping:
            coord = path_mapping[normalized_path_id]
            logger.debug(f"路径ID {path_id} 规范化为 {normalized_path_id} 后匹配成功,坐标: ({coord['x']}, {coord['y']})")
            return (coord["x"], coord["y"])
        else:
            logger.warning(f"规范化后的路径ID {normalized_path_id}(原始: {path_id})在路径映射中未找到")
    except (ValueError, TypeError):
        logger.warning(f"路径ID {path_id} 不是有效的数字格式")

    logger.warning(f"无法找到路径ID {path_id} 对应的坐标")
    return None


def get_path_id_from_coordinate(x: int, y: int, path_mapping: Dict[str, Dict[str, int]]) -> Optional[str]:
    """Return the first path ID whose coordinate equals ``(x, y)``, or ``None``.

    This is a linear scan over ``path_mapping`` in its iteration order.
    """
    return next(
        (
            path_id
            for path_id, coord in path_mapping.items()
            if coord["x"] == x and coord["y"] == y
        ),
        None,
    )


def calculate_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> float:
    """Return the Euclidean distance between two 2-D points."""
    return math.hypot(pos1[0] - pos2[0], pos1[1] - pos2[1])


def calculate_manhattan_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> int:
    """Return the Manhattan (L1) distance between two 2-D points."""
    return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])


def generate_random_agv_id() -> str:
    """Return a random AGV ID of the form ``AGV_<n>`` with ``10001 <= n <= 99999``."""
    return f"AGV_{random.randint(10001, 99999)}"


def generate_random_task_id() -> str:
    """Return a task ID of the form ``TASK_<millisecond timestamp>_<3-digit random>``."""
    timestamp = int(time.time() * 1000)  # milliseconds since the epoch
    random_suffix = random.randint(100, 999)
    return f"TASK_{timestamp}_{random_suffix}"


def get_random_path_ids(path_mapping: Dict[str, Dict[str, int]], count: int = 2) -> List[str]:
    """Pick ``count`` distinct path IDs at random.

    Args:
        path_mapping: Mapping whose keys are the candidate path IDs.
        count: Number of IDs to pick.

    Returns:
        A list of ``count`` randomly sampled IDs. If the mapping is empty an
        empty list is returned; if it holds fewer than ``count`` IDs, all of
        them are returned in the mapping's iteration order (not shuffled).
    """
    if not path_mapping:
        return []

    path_ids = list(path_mapping.keys())
    if len(path_ids) < count:
        return path_ids

    return random.sample(path_ids, count)


def format_timestamp(timestamp: Optional[float] = None) -> str:
    """Format a POSIX timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time.

    Args:
        timestamp: Seconds since the epoch; defaults to the current time.
    """
    if timestamp is None:
        timestamp = time.time()
    return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")


def normalize_path_id(path_id: str) -> str:
    """Normalize a path ID by round-tripping it through ``int()``.

    Leading zeros are dropped (``'00000206'`` -> ``'206'``). Falsy values and
    values that ``int()`` rejects are returned unchanged.
    """
    if not path_id:
        return path_id

    try:
        normalized_path_id = str(int(path_id))
        return normalized_path_id
    except (ValueError, TypeError):
        return path_id


def validate_path_id(path_id: str, min_id: int = MIN_PATH_ID, max_id: int = MAX_PATH_ID) -> bool:
    """Check that a path ID is numeric and lies within an inclusive range.

    The ID is converted with ``int()``, so zero-padded IDs such as
    ``'00000206'`` are accepted.

    Args:
        path_id: Path point ID to validate.
        min_id: Smallest valid ID (inclusive); defaults to ``MIN_PATH_ID``.
        max_id: Largest valid ID (inclusive); defaults to ``MAX_PATH_ID``.

    Returns:
        ``True`` if the ID converts to an integer in ``[min_id, max_id]``,
        otherwise ``False``.
    """
    try:
        path_num = int(path_id)
        return min_id <= path_num <= max_id
    except (ValueError, TypeError):
        return False


def validate_agv_id(agv_id: str) -> bool:
    """Check that an AGV ID is a non-blank string of at most 50 characters."""
    if not agv_id or not isinstance(agv_id, str):
        return False

    # Must contain something other than whitespace and stay within the length cap.
    return len(agv_id.strip()) > 0 and len(agv_id) <= 50


def save_json_file(data: Any, file_path: str) -> bool:
    """Write ``data`` to ``file_path`` as indented UTF-8 JSON.

    Returns:
        ``True`` on success; ``False`` (after logging the error) on any failure.
    """
    logger = logging.getLogger(__name__)
    try:
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        logger.info(f"数据已保存到文件: {file_path}")
        return True
    except Exception as e:
        logger.error(f"保存文件失败: {file_path} - {e}")
        return False


def load_json_file(file_path: str) -> Optional[Any]:
    """Load and return the JSON content of ``file_path``.

    Returns:
        The decoded data, or ``None`` (after logging) if the file is missing,
        is not valid JSON, or cannot be read.
    """
    logger = logging.getLogger(__name__)
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        logger.info(f"数据已从文件加载: {file_path}")
        return data
    except FileNotFoundError:
        logger.warning(f"文件不存在: {file_path}")
        return None
    except json.JSONDecodeError as e:
        logger.error(f"JSON格式错误: {file_path} - {e}")
        return None
    except Exception as e:
        logger.error(f"加载文件失败: {file_path} - {e}")
        return None


def ensure_directory_exists(directory: str) -> bool:
    """Create ``directory`` (including parents) if it does not already exist.

    Returns:
        ``True`` if the directory exists or was created; ``False`` (after
        logging the error) if creation failed.
    """
    try:
        os.makedirs(directory, exist_ok=True)
        return True
    except Exception as e:
        logging.getLogger(__name__).error(f"创建目录失败: {directory} - {e}")
        return False


def generate_segment_id(agv_id: str, task_id: Optional[str] = None,
                        target_position: Optional[str] = None,
                        action_type: str = "2") -> str:
    """Generate a deterministic 11-digit navigation segment ID (used for de-duplication).

    The ID is derived from an MD5 hash of ``agv_id``, ``action_type`` and
    either ``task_id`` (when given) or ``target_position``.

    Args:
        agv_id: AGV ID.
        task_id: Task ID; when truthy, ``target_position`` is ignored.
        target_position: Target position; ``"unknown"`` is used when falsy.
        action_type: Action type.

    Returns:
        An 11-character string of decimal digits.
    """
    if task_id:
        # Task segment: keyed by agv_id + task_id + action_type.
        hash_input = f"{agv_id}_{task_id}_{action_type}"
    else:
        # Non-task segment: keyed by agv_id + target_position + action_type.
        target = target_position or "unknown"
        hash_input = f"{agv_id}_{target}_{action_type}"

    hash_hex = hashlib.md5(hash_input.encode()).hexdigest()

    # Only the first 8 hex digits (32 bits) are used, so the decimal value has
    # at most 10 digits and is left-padded with zeros to a fixed width of 11.
    hash_int = int(hash_hex[:8], 16)
    return str(hash_int).zfill(11)


def generate_navigation_code(code: str, direction: str, action_type: str = "2",
                             task_id: Optional[str] = None, pos_type: Optional[str] = None,
                             backpack_level: Optional[int] = None, is_target_point: bool = False) -> Dict:
    """Build the dict describing one navigation path point.

    Args:
        code: Stored under ``'code'``.
        direction: Stored under ``'direction'``.
        action_type: Stored under ``'type'``.
        task_id: Stored under ``'taskId'`` only when truthy and ``action_type == "2"``.
        pos_type: Stored under ``'posType'`` only for a target point and when truthy.
        backpack_level: Stored under ``'lev'`` only for a target point and when not ``None``.
        is_target_point: Whether this point is the target point.

    Returns:
        The path point dict.
    """
    path_code = {
        'code': code,
        'direction': direction,
        'type': action_type
    }

    if task_id and action_type == "2":
        path_code['taskId'] = task_id

    # posType / lev are only attached at the target point, and only when set.
    if is_target_point:
        if pos_type:
            path_code['posType'] = pos_type
        if backpack_level is not None:
            path_code['lev'] = backpack_level

    return path_code