zhang
2025-11-03 b479429080acc0a69933a906b09bca77599f3c4a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import json
import pandas as pd
import os
from datetime import datetime
 
 
def load_edges_from_excel(excel_file):
    """
    从 Excel 文件加载边
    """
    
    try:
        df = pd.read_excel(excel_file, sheet_name=0)
        direction_col = df.columns[0]
        start_col = df.columns[1]
        end_col = df.columns[2]
        
        edges = []
        for idx, row in df.iterrows():
            try:
                direction = int(row[direction_col])
                start = str(int(row[start_col]))
                end = str(int(row[end_col]))
                edges.append((start, end, direction))
            except (ValueError, TypeError):
                continue
        return edges
        
    except Exception as e:
        print(f"error: {e}")
        raise
 
 
def build_adjacency_from_edges(edges, path_mapping_file):
    """
    根据边构建邻接表
    """
    
    with open(path_mapping_file, 'r', encoding='utf-8') as f:
        pm = json.load(f)['path_id_to_coordinates']
    
    adjacency = {}
    
    for start, end, edge_direction in edges:
        if start not in adjacency:
            adjacency[start] = []
        if end not in adjacency:
            adjacency[end] = []
        
        # 计算方向(基于坐标)
        direction = calculate_direction(start, end, pm)
        reverse_direction = calculate_direction(end, start, pm)
        
        if edge_direction == 0:  # 双向
            adjacency[start].append({"code": end, "direction": direction})
            adjacency[end].append({"code": start, "direction": reverse_direction})
        elif edge_direction == 1:  # 正向(start → end)
            adjacency[start].append({"code": end, "direction": direction})
        elif edge_direction == 2:  # 反向(end → start)
            adjacency[end].append({"code": start, "direction": reverse_direction})
    
    for node in adjacency:
        # 使用 (code, direction) 作为唯一键去重
        unique = {}
        for neighbor in adjacency[node]:
            key = neighbor['code']
            unique[key] = neighbor
        adjacency[node] = list(unique.values())
    
    print(f"邻接表包含 {len(adjacency)} 个节点")
    
    total_edges = sum(len(neighbors) for neighbors in adjacency.values())
    avg_degree = total_edges / len(adjacency) if adjacency else 0
    
    return adjacency
 
 
def calculate_direction(from_id, to_id, path_mapping):
    """
    根据坐标计算方向
    """
    if from_id not in path_mapping or to_id not in path_mapping:
        return "90" 
    
    from_coords = path_mapping[from_id]
    to_coords = path_mapping[to_id]
    
    if not from_coords or not to_coords:
        return "90"
    
    fx, fy = from_coords[0]['x'], from_coords[0]['y']
    tx, ty = to_coords[0]['x'], to_coords[0]['y']
    
    dx = tx - fx
    dy = ty - fy
    
    if abs(dx) > abs(dy):
        if dx > 0:
            return "0"    # 右
        else:
            return "180"  # 左
    else:
        # 主要是垂直移动
        if dy > 0:
            return "90"   # 下
        else:
            return "270"  # 上
    
    return "90"  # 默认
 
 
def save_adjacency_json(adjacency, output_file, create_backup=True):
    """
    保存邻接表到 JSON 文件
    """
    
    try:
        # 备份
        if create_backup and os.path.exists(output_file):
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = f"{output_file}.backup_{timestamp}"
            import shutil
            shutil.copy2(output_file, backup_path)
        
        # 保存
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(adjacency, f, indent=2, ensure_ascii=False)
 
        
    except Exception as e:
        print(f"error: {e}")
        raise
 
 
def verify_connectivity(adjacency, test_pairs):
    """
    验证连通性
    """
    from collections import deque
    
    def bfs(start, end):
        if start not in adjacency or end not in adjacency:
            return False, 0
        
        visited = set()
        queue = deque([(start, 0)])
        visited.add(start)
        
        while queue:
            current, dist = queue.popleft()
            
            if current == end:
                return True, dist
            
            for neighbor in adjacency.get(current, []):
                neighbor_id = neighbor['code']
                if neighbor_id not in visited:
                    visited.add(neighbor_id)
                    queue.append((neighbor_id, dist + 1))
        
        return False, -1
    
    for start, end in test_pairs:
        connected, distance = bfs(start, end)
        status = "Connected" if connected else "Disconnected"
        if connected:
            print(f"  {start} → {end}: {status} (距离={distance})")
        else:
            print(f"  {start} → {end}: {status} (不连通)")
 
 
def main():
    
    excel_file = "route.xlsx"
    path_mapping_file = "path_mapping.json"
    output_file = "adjacency.json"
    
    # 检查文件
    if not os.path.exists(excel_file):
        print(f"找不到 {excel_file}")
        return
    
    if not os.path.exists(path_mapping_file):
        print(f"找不到 {path_mapping_file}")
        return
    
    try:
        edges = load_edges_from_excel(excel_file)
        
        if not edges:
            print("未能提取到边")
            return
 
        adjacency = build_adjacency_from_edges(edges, path_mapping_file)
 
        save_adjacency_json(adjacency, output_file, create_backup=True)
        
    except Exception as e:
        print(f"error: {e}")
        import traceback
        traceback.print_exc()
 
 
if __name__ == "__main__":
    main()