"""Build a directed adjacency list (JSON) from an Excel edge sheet.

Pipeline: read edges from the first sheet of an Excel workbook, derive a
coarse travel direction for every edge from node coordinates stored in a
path-mapping JSON file, and write the resulting adjacency list to disk.
"""

import json
import os
import shutil
import traceback
from collections import deque
from datetime import datetime

import pandas as pd

# Direction returned when coordinates for either endpoint are unavailable.
_DEFAULT_DIRECTION = "90"


def load_edges_from_excel(excel_file):
    """Load edges from the first sheet of an Excel file.

    The first three columns are read positionally as
    (edge direction, start node, end node). Node ids are normalised to
    integer-valued strings (e.g. ``12.0`` -> ``"12"``).

    Args:
        excel_file: Path of the workbook passed to ``pandas.read_excel``.

    Returns:
        A list of ``(start, end, direction)`` tuples where ``start`` and
        ``end`` are strings and ``direction`` is an int.

    Raises:
        Exception: Any error from reading the workbook or locating the three
            columns is printed and re-raised unchanged.
    """
    try:
        df = pd.read_excel(excel_file, sheet_name=0)

        direction_col = df.columns[0]
        start_col = df.columns[1]
        end_col = df.columns[2]

        edges = []
        skipped = 0
        for _, row in df.iterrows():
            try:
                direction = int(row[direction_col])
                start = str(int(row[start_col]))
                end = str(int(row[end_col]))
            except (ValueError, TypeError):
                # Blank / non-numeric rows are ignored on purpose, but
                # counted so the omission is visible to the operator.
                skipped += 1
                continue
            edges.append((start, end, direction))

        if skipped:
            print(f"跳过 {skipped} 行无法解析的数据")
        return edges
    except Exception as e:
        print(f"error: {e}")
        raise


def build_adjacency_from_edges(edges, path_mapping_file):
    """Build an adjacency list from edges.

    Args:
        edges: Iterable of ``(start, end, edge_direction)`` tuples.
            ``edge_direction`` is ``0`` for bidirectional, ``1`` for
            start -> end, ``2`` for end -> start. Any other value registers
            both nodes but adds no edge.
        path_mapping_file: Path to a JSON file whose
            ``"path_id_to_coordinates"`` entry maps node ids to coordinate
            lists (see ``calculate_direction``).

    Returns:
        A dict mapping each node id to a list of
        ``{"code": neighbour_id, "direction": direction}`` dicts, with at
        most one entry per neighbour.
    """
    with open(path_mapping_file, 'r', encoding='utf-8') as f:
        path_mapping = json.load(f)['path_id_to_coordinates']

    # node -> {neighbour code -> neighbour entry}. Keying by neighbour code
    # de-duplicates on insert: the entry keeps the position of its first
    # occurrence and the value of its last one.
    neighbors_by_node = {}
    for start, end, edge_direction in edges:
        start_neighbors = neighbors_by_node.setdefault(start, {})
        end_neighbors = neighbors_by_node.setdefault(end, {})

        if edge_direction in (0, 1):  # bidirectional or forward
            start_neighbors[end] = {
                "code": end,
                "direction": calculate_direction(start, end, path_mapping),
            }
        if edge_direction in (0, 2):  # bidirectional or reverse
            end_neighbors[start] = {
                "code": start,
                "direction": calculate_direction(end, start, path_mapping),
            }

    adjacency = {
        node: list(neighbors.values())
        for node, neighbors in neighbors_by_node.items()
    }

    print(f"邻接表包含 {len(adjacency)} 个节点")
    return adjacency


def calculate_direction(from_id, to_id, path_mapping):
    """Compute a coarse direction between two nodes from their coordinates.

    Only the first coordinate entry of each node is used. The dominant axis
    of the displacement decides the result; ties (including identical
    points) are treated as vertical.

    Args:
        from_id: Id of the origin node.
        to_id: Id of the destination node.
        path_mapping: Dict mapping node ids to lists of ``{"x": ..., "y": ...}``
            dicts.

    Returns:
        ``"0"`` (right, dx > 0), ``"180"`` (left), ``"90"`` (down, dy > 0) or
        ``"270"`` (up). ``"90"`` is also returned when either node is missing
        from the mapping or has no coordinates.
    """
    from_coords = path_mapping.get(from_id)
    to_coords = path_mapping.get(to_id)
    if not from_coords or not to_coords:
        return _DEFAULT_DIRECTION

    dx = to_coords[0]['x'] - from_coords[0]['x']
    dy = to_coords[0]['y'] - from_coords[0]['y']

    if abs(dx) > abs(dy):
        return "0" if dx > 0 else "180"
    # Mostly vertical movement.
    return "90" if dy > 0 else "270"


def save_adjacency_json(adjacency, output_file, create_backup=True):
    """Save the adjacency list to a JSON file.

    Args:
        adjacency: JSON-serialisable adjacency mapping.
        output_file: Destination path.
        create_backup: When true and ``output_file`` already exists, copy it
            to ``<output_file>.backup_<YYYYmmdd_HHMMSS>`` before overwriting.

    Raises:
        Exception: Any error from the backup copy or the write is printed
            and re-raised unchanged.
    """
    try:
        if create_backup and os.path.exists(output_file):
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = f"{output_file}.backup_{timestamp}"
            shutil.copy2(output_file, backup_path)

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(adjacency, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"error: {e}")
        raise


def _shortest_hop_count(adjacency, start, end):
    """Return the BFS hop count from start to end, or None if unreachable."""
    if start not in adjacency or end not in adjacency:
        return None

    visited = {start}
    queue = deque([(start, 0)])
    while queue:
        current, dist = queue.popleft()
        if current == end:
            return dist
        for neighbor in adjacency.get(current, []):
            neighbor_id = neighbor['code']
            if neighbor_id not in visited:
                visited.add(neighbor_id)
                queue.append((neighbor_id, dist + 1))
    return None


def verify_connectivity(adjacency, test_pairs):
    """Check and print whether each (start, end) pair is connected.

    Reachability follows the directed edges in ``adjacency``; distance is the
    number of hops on a shortest path.

    Args:
        adjacency: Mapping of node id to a list of ``{"code": ...}`` dicts.
        test_pairs: Iterable of ``(start, end)`` node-id pairs.

    Returns:
        A list of ``(start, end, connected, distance)`` tuples in input
        order; ``distance`` is ``-1`` when the pair is not connected.
    """
    results = []
    for start, end in test_pairs:
        hops = _shortest_hop_count(adjacency, start, end)
        connected = hops is not None
        status = "Connected" if connected else "Disconnected"
        if connected:
            print(f" {start} → {end}: {status} (距离={hops})")
        else:
            print(f" {start} → {end}: {status} (不连通)")
        results.append((start, end, connected, hops if connected else -1))
    return results


def main():
    """Run the Excel -> adjacency JSON conversion with fixed file names.

    Reads ``route.xlsx`` and ``path_mapping.json`` from the working directory
    and writes ``adjacency.json``. Missing inputs and an empty edge list are
    reported and end the run early; any other error is printed with a
    traceback instead of propagating.
    """
    excel_file = "route.xlsx"
    path_mapping_file = "path_mapping.json"
    output_file = "adjacency.json"

    # Report missing inputs up front instead of failing mid-pipeline.
    if not os.path.exists(excel_file):
        print(f"找不到 {excel_file}")
        return

    if not os.path.exists(path_mapping_file):
        print(f"找不到 {path_mapping_file}")
        return

    try:
        edges = load_edges_from_excel(excel_file)
        if not edges:
            print("未能提取到边")
            return

        adjacency = build_adjacency_from_edges(edges, path_mapping_file)
        save_adjacency_json(adjacency, output_file, create_backup=True)
    except Exception as e:
        print(f"error: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    main()