zhang
2025-11-03 b479429080acc0a69933a906b09bca77599f3c4a
json路径文件,从excel转json的python脚本
1个文件已修改
4个文件已添加
235 ■■■■■ 已修改文件
.idea/algo-python.iml 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/misc.xml 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/modules.xml 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/vcs.xml 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
algorithm_system/adjacency_from_excel.py 207 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/algo-python.iml
New file
@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
  <component name="NewModuleRootManager" inherit-compiler-output="true">
    <exclude-output />
    <content url="file://$MODULE_DIR$" />
    <orderEntry type="inheritedJdk" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
</module>
.idea/misc.xml
@@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="KubernetesApiProvider">{}</component>
  <component name="KubernetesApiProvider"><![CDATA[{}]]></component>
  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
    <output url="file://$PROJECT_DIR$/out" />
  </component>
</project>
.idea/modules.xml
New file
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectModuleManager">
    <modules>
      <module fileurl="file://$PROJECT_DIR$/.idea/algo-python.iml" filepath="$PROJECT_DIR$/.idea/algo-python.iml" />
    </modules>
  </component>
</project>
.idea/vcs.xml
New file
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="VcsDirectoryMappings">
    <mapping directory="" vcs="Git" />
  </component>
</project>
algorithm_system/adjacency_from_excel.py
New file
@@ -0,0 +1,207 @@
import json
import pandas as pd
import os
from datetime import datetime
def load_edges_from_excel(excel_file):
    """
    Read edge records from the first sheet of an Excel workbook.

    The first three columns are taken by position as: direction flag,
    start node, end node. Each usable row becomes a tuple
    ``(start, end, direction)`` where ``start`` and ``end`` are the integer
    node ids rendered as strings and ``direction`` is an int.

    Rows whose cells raise ``ValueError`` or ``TypeError`` during integer
    conversion (for example blank cells) are skipped without any message.

    Any other failure is printed as ``error: ...`` and re-raised.
    """
    try:
        sheet = pd.read_excel(excel_file, sheet_name=0)
        columns = sheet.columns
        direction_label, start_label, end_label = columns[0], columns[1], columns[2]

        collected = []
        for _, record in sheet.iterrows():
            try:
                # Keep this conversion order: the first failing cell decides
                # whether the row is skipped or the error propagates.
                flag = int(record[direction_label])
                source = str(int(record[start_label]))
                target = str(int(record[end_label]))
            except (ValueError, TypeError):
                continue
            else:
                collected.append((source, target, flag))
        return collected
    except Exception as e:
        print(f"error: {e}")
        raise
def build_adjacency_from_edges(edges, path_mapping_file):
    """
    Build a de-duplicated adjacency table from edge records.

    Args:
        edges: iterable of ``(start, end, edge_direction)`` tuples.
            ``edge_direction`` 0 links both ways, 1 links start -> end only,
            2 links end -> start only. Any other value adds no link, but
            both endpoints are still registered as nodes.
        path_mapping_file: path of a UTF-8 JSON file; its
            ``path_id_to_coordinates`` entry is handed to
            ``calculate_direction`` to derive each link's direction.

    Returns:
        dict mapping every node id to a list of
        ``{"code": neighbour_id, "direction": direction}`` entries, with at
        most one entry per neighbour, in first-seen order.

    Raises:
        KeyError: if the JSON has no ``path_id_to_coordinates`` entry.
        Errors from opening or parsing the file propagate unchanged.
    """
    with open(path_mapping_file, 'r', encoding='utf-8') as f:
        pm = json.load(f)['path_id_to_coordinates']

    # node -> {neighbour code -> entry}. Keying on the neighbour code removes
    # duplicates as they arrive. The direction depends only on the
    # (node, neighbour) pair, so repeated edges yield identical entries.
    neighbours_by_node = {}
    for start, end, edge_direction in edges:
        from_start = neighbours_by_node.setdefault(start, {})
        from_end = neighbours_by_node.setdefault(end, {})

        # Compute a direction only for the links that are actually added.
        if edge_direction in (0, 1):  # bidirectional or forward (start -> end)
            from_start[end] = {
                "code": end,
                "direction": calculate_direction(start, end, pm),
            }
        if edge_direction in (0, 2):  # bidirectional or reverse (end -> start)
            from_end[start] = {
                "code": start,
                "direction": calculate_direction(end, start, pm),
            }

    adjacency = {
        node: list(neighbours.values())
        for node, neighbours in neighbours_by_node.items()
    }
    print(f"邻接表包含 {len(adjacency)} 个节点")
    return adjacency
def calculate_direction(from_id, to_id, path_mapping):
    """
    Derive a heading string for the move ``from_id`` -> ``to_id``.

    Only the first coordinate entry of each id is used; it must provide
    ``'x'`` and ``'y'``.

    Args:
        from_id: id of the starting path point.
        to_id: id of the destination path point.
        path_mapping: dict mapping an id to a list of coordinate dicts.

    Returns:
        ``"0"`` (right) or ``"180"`` (left) when the x offset is strictly
        larger in magnitude than the y offset; otherwise ``"90"`` (down)
        when the y offset is positive and ``"270"`` (up) when it is not.
        A tie, including identical coordinates, therefore takes the
        vertical branch. ``"90"`` is also returned when either id is
        missing from ``path_mapping`` or has no coordinates.
    """
    default_direction = "90"

    from_coords = path_mapping.get(from_id)
    to_coords = path_mapping.get(to_id)
    # A missing id and an empty coordinate list both fall back to the default.
    if not from_coords or not to_coords:
        return default_direction

    dx = to_coords[0]['x'] - from_coords[0]['x']
    dy = to_coords[0]['y'] - from_coords[0]['y']

    if abs(dx) > abs(dy):
        # Mostly horizontal movement.
        return "0" if dx > 0 else "180"
    # Mostly vertical movement (also taken when |dx| == |dy|).
    return "90" if dy > 0 else "270"
def save_adjacency_json(adjacency, output_file, create_backup=True):
    """
    Write the adjacency table to ``output_file`` as indented UTF-8 JSON.

    When ``create_backup`` is true and ``output_file`` already exists, the
    existing file is first copied to
    ``<output_file>.backup_<YYYYmmdd_HHMMSS>``. Non-ASCII characters are
    written as-is rather than escaped.

    Any failure is printed as ``error: ...`` and re-raised.
    """
    try:
        backup_needed = create_backup and os.path.exists(output_file)
        if backup_needed:
            import shutil

            # Keep the previous version next to the new one, time-stamped.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            shutil.copy2(output_file, f"{output_file}.backup_{timestamp}")

        with open(output_file, 'w', encoding='utf-8') as sink:
            json.dump(adjacency, sink, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"error: {e}")
        raise
def verify_connectivity(adjacency, test_pairs):
    """
    Print, for each ``(start, end)`` pair, whether ``end`` can be reached.

    Reachability is decided by a breadth-first walk over ``adjacency``,
    following the ``'code'`` of every neighbour entry. For a reachable pair
    the number of hops is printed as well. A pair is reported as
    disconnected when either node is absent from ``adjacency`` or no route
    exists. Nothing is returned.
    """
    from collections import deque

    def search(source, target):
        # Both endpoints must be known nodes before searching.
        if not (source in adjacency and target in adjacency):
            return False, 0

        seen = {source}
        frontier = deque([(source, 0)])
        while frontier:
            node, hops = frontier.popleft()
            if node == target:
                return True, hops
            for entry in adjacency.get(node, []):
                code = entry['code']
                if code in seen:
                    continue
                seen.add(code)
                frontier.append((code, hops + 1))
        return False, -1

    for start, end in test_pairs:
        connected, distance = search(start, end)
        status = "Connected" if connected else "Disconnected"
        if not connected:
            print(f"  {start} → {end}: {status} (不连通)")
        else:
            print(f"  {start} → {end}: {status} (距离={distance})")
def main(excel_file="route.xlsx", path_mapping_file="path_mapping.json",
         output_file="adjacency.json"):
    """
    Convert an Excel edge list into an adjacency JSON file.

    Args:
        excel_file: workbook holding the edge list.
        path_mapping_file: JSON file with the path-id coordinates.
        output_file: destination of the adjacency JSON; an existing file is
            backed up before being overwritten.

    The defaults are the file names the script has always used, so calling
    ``main()`` with no arguments behaves as before.

    If an input file is missing, or no edge could be read, a message is
    printed and the function returns without writing anything. Any
    exception raised while converting is printed with its traceback and
    not re-raised.
    """
    # Check the inputs in the same order as before: workbook, then mapping.
    for required in (excel_file, path_mapping_file):
        if not os.path.exists(required):
            print(f"找不到 {required}")
            return

    try:
        edges = load_edges_from_excel(excel_file)
        if not edges:
            print("未能提取到边")
            return
        adjacency = build_adjacency_from_edges(edges, path_mapping_file)
        save_adjacency_json(adjacency, output_file, create_backup=True)
    except Exception as e:
        print(f"error: {e}")
        import traceback
        traceback.print_exc()
# Run the conversion only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()