| New file |
| | |
| | | import json |
| | | import pandas as pd |
| | | import os |
| | | from datetime import datetime |
| | | |
| | | |
| | | def load_edges_from_excel(excel_file): |
| | | """ |
| | | 从 Excel 文件加载边 |
| | | """ |
| | | |
| | | try: |
| | | df = pd.read_excel(excel_file, sheet_name=0) |
| | | direction_col = df.columns[0] |
| | | start_col = df.columns[1] |
| | | end_col = df.columns[2] |
| | | |
| | | edges = [] |
| | | for idx, row in df.iterrows(): |
| | | try: |
| | | direction = int(row[direction_col]) |
| | | start = str(int(row[start_col])) |
| | | end = str(int(row[end_col])) |
| | | edges.append((start, end, direction)) |
| | | except (ValueError, TypeError): |
| | | continue |
| | | return edges |
| | | |
| | | except Exception as e: |
| | | print(f"error: {e}") |
| | | raise |
| | | |
| | | |
| | | def build_adjacency_from_edges(edges, path_mapping_file): |
| | | """ |
| | | 根据边构建邻接表 |
| | | """ |
| | | |
| | | with open(path_mapping_file, 'r', encoding='utf-8') as f: |
| | | pm = json.load(f)['path_id_to_coordinates'] |
| | | |
| | | adjacency = {} |
| | | |
| | | for start, end, edge_direction in edges: |
| | | if start not in adjacency: |
| | | adjacency[start] = [] |
| | | if end not in adjacency: |
| | | adjacency[end] = [] |
| | | |
| | | # 计算方向(基于坐标) |
| | | direction = calculate_direction(start, end, pm) |
| | | reverse_direction = calculate_direction(end, start, pm) |
| | | |
| | | if edge_direction == 0: # 双向 |
| | | adjacency[start].append({"code": end, "direction": direction}) |
| | | adjacency[end].append({"code": start, "direction": reverse_direction}) |
| | | elif edge_direction == 1: # 正向(start → end) |
| | | adjacency[start].append({"code": end, "direction": direction}) |
| | | elif edge_direction == 2: # 反向(end → start) |
| | | adjacency[end].append({"code": start, "direction": reverse_direction}) |
| | | |
| | | for node in adjacency: |
| | | # 使用 (code, direction) 作为唯一键去重 |
| | | unique = {} |
| | | for neighbor in adjacency[node]: |
| | | key = neighbor['code'] |
| | | unique[key] = neighbor |
| | | adjacency[node] = list(unique.values()) |
| | | |
| | | print(f"邻接表包含 {len(adjacency)} 个节点") |
| | | |
| | | total_edges = sum(len(neighbors) for neighbors in adjacency.values()) |
| | | avg_degree = total_edges / len(adjacency) if adjacency else 0 |
| | | |
| | | return adjacency |
| | | |
| | | |
| | | def calculate_direction(from_id, to_id, path_mapping): |
| | | """ |
| | | 根据坐标计算方向 |
| | | """ |
| | | if from_id not in path_mapping or to_id not in path_mapping: |
| | | return "90" |
| | | |
| | | from_coords = path_mapping[from_id] |
| | | to_coords = path_mapping[to_id] |
| | | |
| | | if not from_coords or not to_coords: |
| | | return "90" |
| | | |
| | | fx, fy = from_coords[0]['x'], from_coords[0]['y'] |
| | | tx, ty = to_coords[0]['x'], to_coords[0]['y'] |
| | | |
| | | dx = tx - fx |
| | | dy = ty - fy |
| | | |
| | | if abs(dx) > abs(dy): |
| | | if dx > 0: |
| | | return "0" # 右 |
| | | else: |
| | | return "180" # 左 |
| | | else: |
| | | # 主要是垂直移动 |
| | | if dy > 0: |
| | | return "90" # 下 |
| | | else: |
| | | return "270" # 上 |
| | | |
| | | return "90" # 默认 |
| | | |
| | | |
| | | def save_adjacency_json(adjacency, output_file, create_backup=True): |
| | | """ |
| | | 保存邻接表到 JSON 文件 |
| | | """ |
| | | |
| | | try: |
| | | # 备份 |
| | | if create_backup and os.path.exists(output_file): |
| | | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | | backup_path = f"{output_file}.backup_{timestamp}" |
| | | import shutil |
| | | shutil.copy2(output_file, backup_path) |
| | | |
| | | # 保存 |
| | | with open(output_file, 'w', encoding='utf-8') as f: |
| | | json.dump(adjacency, f, indent=2, ensure_ascii=False) |
| | | |
| | | |
| | | except Exception as e: |
| | | print(f"error: {e}") |
| | | raise |
| | | |
| | | |
| | | def verify_connectivity(adjacency, test_pairs): |
| | | """ |
| | | 验证连通性 |
| | | """ |
| | | from collections import deque |
| | | |
| | | def bfs(start, end): |
| | | if start not in adjacency or end not in adjacency: |
| | | return False, 0 |
| | | |
| | | visited = set() |
| | | queue = deque([(start, 0)]) |
| | | visited.add(start) |
| | | |
| | | while queue: |
| | | current, dist = queue.popleft() |
| | | |
| | | if current == end: |
| | | return True, dist |
| | | |
| | | for neighbor in adjacency.get(current, []): |
| | | neighbor_id = neighbor['code'] |
| | | if neighbor_id not in visited: |
| | | visited.add(neighbor_id) |
| | | queue.append((neighbor_id, dist + 1)) |
| | | |
| | | return False, -1 |
| | | |
| | | for start, end in test_pairs: |
| | | connected, distance = bfs(start, end) |
| | | status = "Connected" if connected else "Disconnected" |
| | | if connected: |
| | | print(f" {start} → {end}: {status} (距离={distance})") |
| | | else: |
| | | print(f" {start} → {end}: {status} (不连通)") |
| | | |
| | | |
| | | def main(): |
| | | |
| | | excel_file = "route.xlsx" |
| | | path_mapping_file = "path_mapping.json" |
| | | output_file = "adjacency.json" |
| | | |
| | | # 检查文件 |
| | | if not os.path.exists(excel_file): |
| | | print(f"找不到 {excel_file}") |
| | | return |
| | | |
| | | if not os.path.exists(path_mapping_file): |
| | | print(f"找不到 {path_mapping_file}") |
| | | return |
| | | |
| | | try: |
| | | edges = load_edges_from_excel(excel_file) |
| | | |
| | | if not edges: |
| | | print("未能提取到边") |
| | | return |
| | | |
| | | adjacency = build_adjacency_from_edges(edges, path_mapping_file) |
| | | |
| | | save_adjacency_json(adjacency, output_file, create_backup=True) |
| | | |
| | | except Exception as e: |
| | | print(f"error: {e}") |
| | | import traceback |
| | | traceback.print_exc() |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | main() |
| | | |
| | | |