From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001
From: zhang <zc857179121@qq.com>
Date: Wed, 02 Jul 2025 13:12:26 +0800
Subject: [PATCH] Initial commit

---
 common/utils.py |  286 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 286 insertions(+), 0 deletions(-)

diff --git a/common/utils.py b/common/utils.py
new file mode 100644
index 0000000..6340727
--- /dev/null
+++ b/common/utils.py
@@ -0,0 +1,286 @@
+import json
+import logging
+import os
+import random
+import time
+import uuid
+import string
+from typing import Dict, List, Tuple, Optional, Any
+from datetime import datetime
+
+
+def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None) -> None:
+    """Set up logging configuration."""
+    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+    formatter = logging.Formatter(log_format)
+
+    root_logger = logging.getLogger()
+    root_logger.setLevel(getattr(logging, log_level.upper()))
+
+    for handler in root_logger.handlers[:]:
+        root_logger.removeHandler(handler)
+
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(formatter)
+    root_logger.addHandler(console_handler)
+
+    if log_file:
+        file_handler = logging.FileHandler(log_file, encoding='utf-8')
+        file_handler.setFormatter(formatter)
+        root_logger.addHandler(file_handler)
+
+
+def load_path_mapping(mapping_file: str = "path_mapping.json") -> Dict[str, Dict[str, int]]:
+    """Load the path mapping file."""
+    logger = logging.getLogger(__name__)
+
+    try:
+        with open(mapping_file, 'r', encoding='utf-8') as f:
+            data = json.load(f)
+
+        path_mapping = {}
+        if "path_id_to_coordinates" in data:
+            for path_id, coordinates in data["path_id_to_coordinates"].items():
+                if coordinates and len(coordinates) > 0:
+                    coord = coordinates[0]
+                    path_mapping[path_id] = {
+                        "x": coord["x"],
+                        "y": coord["y"]
+                    }
+
+        logger.info(f"Path mapping loaded successfully, {len(path_mapping)} path points in total")
+        return path_mapping
+
+    except FileNotFoundError:
+        logger.error(f"Path mapping file does not exist: {mapping_file}")
+        return {}
+    except json.JSONDecodeError as e:
+        logger.error(f"Path mapping file has an invalid format: {e}")
+        return {}
+    except Exception as e:
+        logger.error(f"Failed to load path mapping file: {e}")
+        return {}
+
+
+def get_coordinate_from_path_id(path_id: str, path_mapping: Dict[str, Dict[str, int]]) -> Optional[Tuple[int, int]]:
+    """
+    Get the coordinates for a path point ID.
+
+    Args:
+        path_id: path point ID (zero-padded 8-digit format is supported, e.g. '00000206')
+        path_mapping: path mapping dictionary
+
+    Returns:
+        Optional[Tuple[int, int]]: coordinates (x, y), or None if not found
+    """
+    logger = logging.getLogger(__name__)
+
+    if path_id in path_mapping:
+        coord = path_mapping[path_id]
+        logger.debug(f"Path ID {path_id} matched, coordinates: ({coord['x']}, {coord['y']})")
+        return (coord["x"], coord["y"])
+
+    # If the direct lookup fails, retry with leading zeros stripped
+    try:
+        normalized_path_id = str(int(path_id))
+        if normalized_path_id in path_mapping:
+            coord = path_mapping[normalized_path_id]
+            logger.debug(f"Path ID {path_id} matched after normalizing to {normalized_path_id}, coordinates: ({coord['x']}, {coord['y']})")
+            return (coord["x"], coord["y"])
+        else:
+            logger.warning(f"Normalized path ID {normalized_path_id} (original: {path_id}) not found in path mapping")
+    except (ValueError, TypeError):
+        logger.warning(f"Path ID {path_id} is not a valid numeric format")
+
+    logger.warning(f"No coordinates found for path ID {path_id}")
+    return None
+
+
+def get_path_id_from_coordinate(x: int, y: int, path_mapping: Dict[str, Dict[str, int]]) -> Optional[str]:
+    """Get the path point ID for a coordinate."""
+    for path_id, coord in path_mapping.items():
+        if coord["x"] == x and coord["y"] == y:
+            return path_id
+    return None
+
+
+def calculate_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> float:
+    """Compute the Euclidean distance between two points."""
+    return ((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2) ** 0.5
+
+
+def calculate_manhattan_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> int:
+    """Compute the Manhattan distance between two points."""
+    return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
+
+
+def generate_random_agv_id() -> str:
+    """Generate a random AGV ID."""
+    return f"AGV_{random.randint(10001, 99999)}"
+
+
+def generate_random_task_id() -> str:
+    """Generate a random task ID."""
+    timestamp = int(time.time() * 1000)  # millisecond timestamp
+    random_suffix = random.randint(100, 999)
+    return f"TASK_{timestamp}_{random_suffix}"
+
+
+def get_random_path_ids(path_mapping: Dict[str, Dict[str, int]], count: int = 2) -> List[str]:
+    """Randomly pick path point IDs."""
+    if not path_mapping:
+        return []
+
+    path_ids = list(path_mapping.keys())
+    if len(path_ids) < count:
+        return path_ids
+
+    return random.sample(path_ids, count)
+
+
+def format_timestamp(timestamp: Optional[float] = None) -> str:
+    """Format a timestamp."""
+    if timestamp is None:
+        timestamp = time.time()
+
+    return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
+
+
+def normalize_path_id(path_id: str) -> str:
+    """Normalize the path point ID format."""
+    if not path_id:
+        return path_id
+
+    try:
+        normalized_path_id = str(int(path_id))
+        return normalized_path_id
+    except (ValueError, TypeError):
+        return path_id
+
+
+def validate_path_id(path_id: str) -> bool:
+    """Validate the path point ID format."""
+    try:
+        # The path point ID should be a numeric string in the range 1-1696
+        path_num = int(path_id)
+        return 1 <= path_num <= 1696
+    except (ValueError, TypeError):
+        return False
+
+
+def validate_agv_id(agv_id: str) -> bool:
+    """Validate the AGV ID format."""
+    if not agv_id or not isinstance(agv_id, str):
+        return False
+    # The AGV ID must be non-empty and of reasonable length
+    return len(agv_id.strip()) > 0 and len(agv_id) <= 50
+
+
+def save_json_file(data: Any, file_path: str) -> bool:
+    """Save data to a JSON file."""
+    logger = logging.getLogger(__name__)
+
+    try:
+        with open(file_path, 'w', encoding='utf-8') as f:
+            json.dump(data, f, ensure_ascii=False, indent=2)
+
+        logger.info(f"Data saved to file: {file_path}")
+        return True
+
+    except Exception as e:
+        logger.error(f"Failed to save file: {file_path} - {e}")
+        return False
+
+
+def load_json_file(file_path: str) -> Optional[Any]:
+    """Load data from a JSON file."""
+    logger = logging.getLogger(__name__)
+
+    try:
+        with open(file_path, 'r', encoding='utf-8') as f:
+            data = json.load(f)
+
+        logger.info(f"Data loaded from file: {file_path}")
+        return data
+
+    except FileNotFoundError:
+        logger.warning(f"File does not exist: {file_path}")
+        return None
+    except json.JSONDecodeError as e:
+        logger.error(f"Invalid JSON format: {file_path} - {e}")
+        return None
+    except Exception as e:
+        logger.error(f"Failed to load file: {file_path} - {e}")
+        return None
+
+
+def ensure_directory_exists(directory: str) -> bool:
+    """Ensure the directory exists, creating it if necessary."""
+    try:
+        os.makedirs(directory, exist_ok=True)
+        return True
+    except Exception as e:
+        logging.getLogger(__name__).error(f"Failed to create directory: {directory} - {e}")
+        return False
+
+
+def generate_segment_id(agv_id: str, task_id: Optional[str] = None,
+                        target_position: Optional[str] = None,
+                        action_type: str = "2") -> str:
+    """
+    Generate a navigation segment ID used as a de-duplication key.
+    Produces a stable 11-digit ID from the AGV ID, task ID, target position and action type.
+
+    Args:
+        agv_id: AGV ID
+        task_id: task ID
+        target_position: target position
+        action_type: action type
+
+    Returns:
+        str: 11-digit navigation segment ID
+    """
+    import hashlib
+
+    # Build the string to hash
+    if task_id:
+        # Task segments: use agv_id + task_id
+        hash_input = f"{agv_id}_{task_id}_{action_type}"
+    else:
+        # Non-task segments: use agv_id + target_position + action_type
+        target = target_position or "unknown"
+        hash_input = f"{agv_id}_{target}_{action_type}"
+
+    # Use an MD5 hash to produce a stable value
+    hash_object = hashlib.md5(hash_input.encode())
+    hash_hex = hash_object.hexdigest()
+
+    # Convert the leading hex digits to an integer and keep the last 11 digits
+    hash_int = int(hash_hex[:8], 16)
+    seg_id = str(hash_int)[-11:].zfill(11)
+
+    return seg_id
+
+
+def generate_navigation_code(code: str, direction: str, action_type: str = "2",
+                             task_id: Optional[str] = None, pos_type: Optional[str] = None,
+                             backpack_level: Optional[int] = None, is_target_point: bool = False) -> Dict:
+    """Generate a navigation path point code."""
+    path_code = {
+        'code': code,
+        'direction': direction,
+        'type': action_type
+    }
+
+    if task_id and action_type == "2":
+        path_code['taskId'] = task_id
+
+    # Only add posType and lev at the target point, and only when values are present
+    if is_target_point:
+        if pos_type:
+            path_code['posType'] = pos_type
+
+        if backpack_level is not None:
+            path_code['lev'] = backpack_level
+
+    return path_code
\ No newline at end of file
--
Gitblit v1.9.1
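
A minimal usage sketch of the utilities this patch adds (assuming the module is importable as common.utils and a path_mapping.json with a top-level "path_id_to_coordinates" key sits in the working directory; the file names, IDs, and field values below are illustrative only, not taken from the patch):

    # Hypothetical caller-side wiring of common/utils.py
    from common.utils import (
        setup_logging, ensure_directory_exists, load_path_mapping,
        get_coordinate_from_path_id, generate_random_task_id,
        generate_segment_id, generate_navigation_code,
    )

    ensure_directory_exists("logs")                  # hypothetical log directory
    setup_logging("DEBUG", log_file="logs/agv.log")
    mapping = load_path_mapping("path_mapping.json")
    coord = get_coordinate_from_path_id("00000206", mapping)    # -> (x, y) or None

    task_id = generate_random_task_id()
    seg_id = generate_segment_id("AGV_10001", task_id=task_id)  # stable 11-digit key
    nav_code = generate_navigation_code(code="206", direction="90", task_id=task_id,
                                        pos_type="pick", backpack_level=1,
                                        is_target_point=True)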