From 2fa19599467263dcf582bb12906e03328e03b4a4 Mon Sep 17 00:00:00 2001
From: zhang <zc857179121@qq.com>
Date: 星期三, 02 七月 2025 13:12:26 +0800
Subject: [PATCH] 初版提交

---
 common/utils.py |  286 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 286 insertions(+), 0 deletions(-)

diff --git a/common/utils.py b/common/utils.py
new file mode 100644
index 0000000..6340727
--- /dev/null
+++ b/common/utils.py
@@ -0,0 +1,286 @@
+import json
+import logging
+import os
+import random
+import time
+import uuid
+import string
+from typing import Dict, List, Tuple, Optional, Any
+from datetime import datetime
+
+
+def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None) -> None:
+    """璁剧疆鏃ュ織閰嶇疆"""
+    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+    formatter = logging.Formatter(log_format)
+    
+    root_logger = logging.getLogger()
+    root_logger.setLevel(getattr(logging, log_level.upper()))
+    
+    for handler in root_logger.handlers[:]:
+        root_logger.removeHandler(handler)
+
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(formatter)
+    root_logger.addHandler(console_handler)
+
+    if log_file:
+        file_handler = logging.FileHandler(log_file, encoding='utf-8')
+        file_handler.setFormatter(formatter)
+        root_logger.addHandler(file_handler)
+
+
+def load_path_mapping(mapping_file: str = "path_mapping.json") -> Dict[str, Dict[str, int]]:
+    """鍔犺浇璺緞鏄犲皠鏂囦欢"""
+    logger = logging.getLogger(__name__)
+    
+    try:
+        with open(mapping_file, 'r', encoding='utf-8') as f:
+            data = json.load(f)
+        
+        path_mapping = {}
+        if "path_id_to_coordinates" in data:
+            for path_id, coordinates in data["path_id_to_coordinates"].items():
+                if coordinates and len(coordinates) > 0:
+                    coord = coordinates[0]
+                    path_mapping[path_id] = {
+                        "x": coord["x"],
+                        "y": coord["y"]
+                    }
+        
+        logger.info(f"鎴愬姛鍔犺浇璺緞鏄犲皠锛屽叡 {len(path_mapping)} 涓矾寰勭偣")
+        return path_mapping
+        
+    except FileNotFoundError:
+        logger.error(f"璺緞鏄犲皠鏂囦欢涓嶅瓨鍦�: {mapping_file}")
+        return {}
+    except json.JSONDecodeError as e:
+        logger.error(f"璺緞鏄犲皠鏂囦欢鏍煎紡閿欒: {e}")
+        return {}
+    except Exception as e:
+        logger.error(f"鍔犺浇璺緞鏄犲皠鏂囦欢澶辫触: {e}")
+        return {}
+
+
+def get_coordinate_from_path_id(path_id: str, path_mapping: Dict[str, Dict[str, int]]) -> Optional[Tuple[int, int]]:
+    """
+    鏍规嵁璺緞鐐笽D鑾峰彇鍧愭爣
+    
+    Args:
+        path_id: 璺緞鐐笽D锛堟敮鎸佸甫8浣嶅甫闆剁殑鏍煎紡锛宔.g.'00000206'锛�
+        path_mapping: 璺緞鏄犲皠瀛楀吀
+        
+    Returns:
+        Optional[Tuple[int, int]]: 鍧愭爣(x, y)锛屽鏋滄壘涓嶅埌鍒欒繑鍥濶one
+    """
+    logger = logging.getLogger(__name__)
+    
+    if path_id in path_mapping:
+        coord = path_mapping[path_id]
+        logger.debug(f"璺緞ID {path_id} 鍖归厤鎴愬姛锛屽潗鏍�: ({coord['x']}, {coord['y']})")
+        return (coord["x"], coord["y"])
+    
+    # 濡傛灉鍖归厤澶辫触锛屽皾璇曞幓鎺夐浂鍚庡尮閰�
+    try:
+        normalized_path_id = str(int(path_id))
+        if normalized_path_id in path_mapping:
+            coord = path_mapping[normalized_path_id]
+            logger.debug(f"璺緞ID {path_id} 瑙勮寖鍖栦负 {normalized_path_id} 鍚庡尮閰嶆垚鍔燂紝鍧愭爣: ({coord['x']}, {coord['y']})")
+            return (coord["x"], coord["y"])
+        else:
+            logger.warning(f"瑙勮寖鍖栧悗鐨勮矾寰処D {normalized_path_id}锛堝師濮�: {path_id}锛夊湪璺緞鏄犲皠涓湭鎵惧埌")
+    except (ValueError, TypeError):
+        logger.warning(f"璺緞ID {path_id} 涓嶆槸鏈夋晥鐨勬暟瀛楁牸寮�")
+    
+    logger.warning(f"鏃犳硶鎵惧埌璺緞ID {path_id} 瀵瑰簲鐨勫潗鏍�")
+    return None
+
+
+def get_path_id_from_coordinate(x: int, y: int, path_mapping: Dict[str, Dict[str, int]]) -> Optional[str]:
+    """鏍规嵁鍧愭爣鑾峰彇璺緞鐐笽D"""
+    for path_id, coord in path_mapping.items():
+        if coord["x"] == x and coord["y"] == y:
+            return path_id
+    return None
+
+
+def calculate_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> float:
+    """璁$畻涓ょ偣涔嬮棿鐨勬姘忚窛绂�"""
+    return ((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2) ** 0.5
+
+
+def calculate_manhattan_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> int:
+    """璁$畻涓ょ偣涔嬮棿鐨勬浖鍝堥】璺濈"""
+    return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
+
+
+def generate_random_agv_id() -> str:
+    """鐢熸垚闅忔満AGV ID"""
+    return f"AGV_{random.randint(10001, 99999)}"
+
+
+def generate_random_task_id() -> str:
+    """鐢熸垚闅忔満浠诲姟ID"""
+    timestamp = int(time.time() * 1000)  # 姣鏃堕棿鎴�
+    random_suffix = random.randint(100, 999)
+    return f"TASK_{timestamp}_{random_suffix}"
+
+
+def get_random_path_ids(path_mapping: Dict[str, Dict[str, int]], count: int = 2) -> List[str]:
+    """闅忔満鑾峰彇璺緞鐐笽D"""
+    if not path_mapping:
+        return []
+    
+    path_ids = list(path_mapping.keys())
+    if len(path_ids) < count:
+        return path_ids
+    
+    return random.sample(path_ids, count)
+
+
+def format_timestamp(timestamp: Optional[float] = None) -> str:
+    """鏍煎紡鍖栨椂闂存埑"""
+    if timestamp is None:
+        timestamp = time.time()
+    
+    return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
+
+
+def normalize_path_id(path_id: str) -> str:
+    """鏍囧噯鍖栬矾寰勭偣ID鏍煎紡"""
+    if not path_id:
+        return path_id
+    
+    try:
+        normalized_path_id = str(int(path_id))
+        return normalized_path_id
+    except (ValueError, TypeError):
+        return path_id
+
+
+def validate_path_id(path_id: str) -> bool:
+    """楠岃瘉璺緞鐐笽D鏍煎紡"""
+    try:
+        # 璺緞鐐笽D搴旇鏄暟瀛楀瓧绗︿覆锛屽湪1-1696鑼冨洿鍐�
+        path_num = int(path_id)
+        return 1 <= path_num <= 1696
+    except (ValueError, TypeError):
+        return False
+
+
+def validate_agv_id(agv_id: str) -> bool:
+    """楠岃瘉AGV ID鏍煎紡"""
+    if not agv_id or not isinstance(agv_id, str):
+        return False
+    # AGV ID涓嶈兘涓虹┖涓旈暱搴﹀悎鐞�
+    return len(agv_id.strip()) > 0 and len(agv_id) <= 50
+
+
+def save_json_file(data: Any, file_path: str) -> bool:
+    """淇濆瓨鏁版嵁鍒癑SON"""
+    logger = logging.getLogger(__name__)
+    
+    try:
+        with open(file_path, 'w', encoding='utf-8') as f:
+            json.dump(data, f, ensure_ascii=False, indent=2)
+        
+        logger.info(f"鏁版嵁宸蹭繚瀛樺埌鏂囦欢: {file_path}")
+        return True
+        
+    except Exception as e:
+        logger.error(f"淇濆瓨鏂囦欢澶辫触: {file_path} - {e}")
+        return False
+
+
+def load_json_file(file_path: str) -> Optional[Any]:
+    """浠嶫SON鏂囦欢鍔犺浇鏁版嵁"""
+    logger = logging.getLogger(__name__)
+    
+    try:
+        with open(file_path, 'r', encoding='utf-8') as f:
+            data = json.load(f)
+        
+        logger.info(f"鏁版嵁宸蹭粠鏂囦欢鍔犺浇: {file_path}")
+        return data
+        
+    except FileNotFoundError:
+        logger.warning(f"鏂囦欢涓嶅瓨鍦�: {file_path}")
+        return None
+    except json.JSONDecodeError as e:
+        logger.error(f"JSON鏍煎紡閿欒: {file_path} - {e}")
+        return None
+    except Exception as e:
+        logger.error(f"鍔犺浇鏂囦欢澶辫触: {file_path} - {e}")
+        return None
+
+
+def ensure_directory_exists(directory: str) -> bool:
+    """纭繚鐩綍瀛樺湪锛屽鏋滀笉瀛樺湪鍒欏垱寤�"""
+    try:
+        os.makedirs(directory, exist_ok=True)
+        return True
+    except Exception as e:
+        logging.getLogger(__name__).error(f"鍒涘缓鐩綍澶辫触: {directory} - {e}")
+        return False
+
+
+def generate_segment_id(agv_id: str, task_id: Optional[str] = None, 
+                       target_position: Optional[str] = None, 
+                       action_type: str = "2") -> str:
+    """
+    鐢熸垚瀵艰埅娈礗D锛岀敤浜庡幓閲嶆爣璇�
+    鍩轰簬AGV ID銆佷换鍔D銆佺洰鏍囦綅缃拰鍔ㄤ綔绫诲瀷鐢熸垚鍥哄畾鐨�11浣岻D
+    
+    Args:
+        agv_id: AGV ID
+        task_id: 浠诲姟ID
+        target_position: 鐩爣浣嶇疆
+        action_type: 鍔ㄤ綔绫诲瀷
+        
+    Returns:
+        str: 11浣嶅鑸ID
+    """
+    import hashlib
+    
+    # 鏋勫缓鐢ㄤ簬鍝堝笇鐨勫瓧绗︿覆
+    if task_id:
+        # 浠诲姟绫诲瀷锛氫娇鐢╝gv_id + task_id
+        hash_input = f"{agv_id}_{task_id}_{action_type}"
+    else:
+        # 闈炰换鍔$被鍨嬶細浣跨敤agv_id + target_position + action_type
+        target = target_position or "unknown"
+        hash_input = f"{agv_id}_{target}_{action_type}"
+    
+    # 浣跨敤MD5鍝堝笇鐢熸垚鍥哄畾鍊�
+    hash_object = hashlib.md5(hash_input.encode())
+    hash_hex = hash_object.hexdigest()
+    
+    # 灏�16杩涘埗杞崲涓烘暟瀛楀苟鎴彇11浣�
+    hash_int = int(hash_hex[:8], 16) 
+    seg_id = str(hash_int)[-11:].zfill(11) 
+    
+    return seg_id
+
+
+def generate_navigation_code(code: str, direction: str, action_type: str = "2", 
+                           task_id: Optional[str] = None, pos_type: Optional[str] = None,
+                           backpack_level: Optional[int] = None, is_target_point: bool = False) -> Dict:
+    """鐢熸垚瀵艰埅璺緞鐐逛唬鐮�"""
+    path_code = {
+        'code': code,
+        'direction': direction,
+        'type': action_type
+    }
+    
+    if task_id and action_type == "2":
+        path_code['taskId'] = task_id
+    
+    # 鍙湁鍒拌揪鐩爣鐐规椂鎵嶆坊鍔爌osType鍜宭ev锛屼笖鏈夊�兼椂鎵嶅寘鍚瓧娈�
+    if is_target_point:
+        if pos_type:
+            path_code['posType'] = pos_type
+
+        if backpack_level is not None:
+            path_code['lev'] = backpack_level
+    
+    return path_code 
\ No newline at end of file

--
Gitblit v1.9.1