import json
|
import pandas as pd
|
import os
|
from datetime import datetime
|
|
|
def load_edges_from_excel(excel_file):
    """Load edge records from the first sheet of an Excel workbook.

    The first three columns are read positionally as direction, start node
    and end node. Each value is passed through ``int()``; the start and end
    ids are then converted to strings.

    Args:
        excel_file: Path (or anything ``pd.read_excel`` accepts) of the workbook.

    Returns:
        A list of ``(start, end, direction)`` tuples where ``start`` and
        ``end`` are strings and ``direction`` is an int. Rows whose values
        raise ``ValueError`` or ``TypeError`` during conversion are skipped.

    Raises:
        Exception: Any other failure is reported on stdout as
            ``error: ...`` and then re-raised unchanged.
    """
    try:
        frame = pd.read_excel(excel_file, sheet_name=0)

        # Columns are identified by position, not by header text.
        columns = frame.columns
        direction_col = columns[0]
        start_col = columns[1]
        end_col = columns[2]

        edges = []
        for _, record in frame.iterrows():
            try:
                direction = int(record[direction_col])
                start = str(int(record[start_col]))
                end = str(int(record[end_col]))
            except (ValueError, TypeError):
                # Non-numeric or missing cell: drop the whole row.
                continue
            edges.append((start, end, direction))
        return edges
    except Exception as e:
        print(f"error: {e}")
        raise
|
|
|
def build_adjacency_from_edges(edges, path_mapping_file):
    """Build a node -> neighbours adjacency table from an edge list.

    Args:
        edges: Iterable of ``(start, end, edge_direction)`` tuples.
            ``edge_direction`` selects which links are added:
            ``0`` adds both start -> end and end -> start,
            ``1`` adds only start -> end,
            ``2`` adds only end -> start.
            Any other value adds no link, but both endpoints are still
            registered as nodes.
        path_mapping_file: Path of a UTF-8 JSON file whose top-level
            ``"path_id_to_coordinates"`` entry is passed to
            ``calculate_direction`` to derive each link's heading.

    Returns:
        A dict mapping every node id seen in ``edges`` to a list of
        ``{"code": neighbour_id, "direction": heading}`` dicts, with at most
        one entry per neighbour id.

    Raises:
        KeyError: If the JSON document has no ``"path_id_to_coordinates"``
            entry. Errors from opening or parsing the file propagate as-is.
    """
    with open(path_mapping_file, 'r', encoding='utf-8') as f:
        path_mapping = json.load(f)['path_id_to_coordinates']

    adjacency = {}

    def add_link(source, target):
        # The heading is computed only for links that are actually stored.
        heading = calculate_direction(source, target, path_mapping)
        adjacency[source].append({"code": target, "direction": heading})

    for start, end, edge_direction in edges:
        # Register both endpoints even if no link ends up being added.
        adjacency.setdefault(start, [])
        adjacency.setdefault(end, [])

        if edge_direction in (0, 1):  # bidirectional or forward
            add_link(start, end)
        if edge_direction in (0, 2):  # bidirectional or reverse
            add_link(end, start)

    # De-duplicate neighbours by their code alone: a repeated neighbour keeps
    # its first position in the list and the most recently added entry.
    for node, neighbors in adjacency.items():
        adjacency[node] = list(
            {neighbor['code']: neighbor for neighbor in neighbors}.values()
        )

    print(f"邻接表包含 {len(adjacency)} 个节点")

    return adjacency
|
|
|
def calculate_direction(from_id, to_id, path_mapping):
    """Return the heading from one node to another as a degree string.

    Only the first coordinate entry of each node is used. The dominant axis
    of the displacement decides the result; when ``|dx|`` and ``|dy|`` are
    equal the vertical branch is taken.

    Args:
        from_id: Key of the origin node in ``path_mapping``.
        to_id: Key of the destination node in ``path_mapping``.
        path_mapping: Mapping from node id to a sequence of dicts that each
            carry ``'x'`` and ``'y'`` values.

    Returns:
        ``"0"`` (right), ``"180"`` (left), ``"90"`` (down) or ``"270"`` (up).
        The right/left/down/up labels come from the original comments, which
        suggests y grows downward -- confirm against the coordinate source.
        ``"90"`` is also the fallback when either id is missing from
        ``path_mapping`` or has an empty coordinate list. Two nodes with
        identical first coordinates yield ``"270"``.
    """
    fallback = "90"

    if from_id not in path_mapping or to_id not in path_mapping:
        return fallback

    from_coords = path_mapping[from_id]
    to_coords = path_mapping[to_id]
    if not from_coords or not to_coords:
        return fallback

    origin, target = from_coords[0], to_coords[0]
    dx = target['x'] - origin['x']
    dy = target['y'] - origin['y']

    if abs(dx) > abs(dy):
        # Mostly horizontal movement.
        return "0" if dx > 0 else "180"
    # Mostly vertical movement (ties and zero displacement included).
    return "90" if dy > 0 else "270"
|
|
|
def save_adjacency_json(adjacency, output_file, create_backup=True):
    """Write the adjacency table to ``output_file`` as indented UTF-8 JSON.

    Args:
        adjacency: JSON-serialisable object to store.
        output_file: Destination path; it is overwritten.
        create_backup: When true and ``output_file`` already exists, the
            existing file is first copied to
            ``<output_file>.backup_<YYYYmmdd_HHMMSS>``.

    Raises:
        Exception: Any failure is reported on stdout as ``error: ...`` and
            then re-raised unchanged.
    """
    try:
        # Preserve the previous output before it gets overwritten.
        if create_backup and os.path.exists(output_file):
            import shutil

            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            shutil.copy2(output_file, f"{output_file}.backup_{stamp}")

        # ensure_ascii=False keeps non-ASCII text readable in the file.
        with open(output_file, 'w', encoding='utf-8') as sink:
            json.dump(adjacency, sink, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"error: {e}")
        raise
|
|
|
def verify_connectivity(adjacency, test_pairs):
    """Print, for each node pair, whether the end is reachable from the start.

    Reachability follows the stored links in their stored direction using a
    breadth-first search, so the reported distance is the minimum hop count.

    Args:
        adjacency: Mapping from node id to a list of neighbour dicts, each
            carrying the neighbour id under ``'code'``.
        test_pairs: Iterable of ``(start, end)`` node-id pairs to check.

    Returns:
        None. One line per pair is written to stdout.
    """
    from collections import deque

    def hop_count(source, target):
        """Return the fewest hops from source to target, or None if none."""
        # A pair that mentions an unknown node is reported as disconnected.
        if source not in adjacency or target not in adjacency:
            return None

        # Doubles as the visited set: node -> hops from source.
        hops = {source: 0}
        pending = deque([source])
        while pending:
            node = pending.popleft()
            if node == target:
                return hops[node]
            for entry in adjacency.get(node, []):
                following = entry['code']
                if following in hops:
                    continue
                hops[following] = hops[node] + 1
                pending.append(following)
        return None

    for start, end in test_pairs:
        distance = hop_count(start, end)
        connected = distance is not None
        status = "Connected" if connected else "Disconnected"
        if not connected:
            print(f" {start} → {end}: {status} (不连通)")
        else:
            print(f" {start} → {end}: {status} (距离={distance})")
|
|
|
def main():
    """Run the pipeline: Excel edges -> adjacency table -> JSON file.

    Reads ``route.xlsx`` and ``path_mapping.json`` from the current working
    directory and writes ``adjacency.json`` (backing up any existing copy).
    If either input file is missing, or no edges could be loaded, a message
    is printed and the function returns without writing anything. Any
    exception raised by the pipeline is printed with its traceback and not
    propagated.
    """
    import traceback

    excel_file = "route.xlsx"
    path_mapping_file = "path_mapping.json"
    output_file = "adjacency.json"

    # Both inputs must exist; report the first one that does not.
    for required in (excel_file, path_mapping_file):
        if not os.path.exists(required):
            print(f"找不到 {required}")
            return

    try:
        edges = load_edges_from_excel(excel_file)
        if not edges:
            print("未能提取到边")
            return

        save_adjacency_json(
            build_adjacency_from_edges(edges, path_mapping_file),
            output_file,
            create_backup=True,
        )
    except Exception as e:
        print(f"error: {e}")
        traceback.print_exc()
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|