| | |
| | | import ast |
| | | import sys |
| | | |
| | | import numpy as np |
| | | import json |
| | | import time |
| | | import redis |
| | | |
| | | radiusLen = None |
| | | |
| | | #with open("./codeMatrix.txt", "r") as file: |
| | | # codeMatrix = np.array(json.loads(file.read())) |
| | | |
| | | #with open("./cdaMatrix.txt", "r") as file: |
| | | # data = json.loads(file.read()) |
| | | # cdaMatrix = np.array(data) |
| | | codeMatrix = None # 全局变量 |
| | | cdaMatrix = None # 全局变量 |
| | | waveMatrix = None # 全局变量 |
| | | |
| | | # 将字符串转换为浮点型数组 |
| | | def convert_to_float_array(str_array): |
| | |
| | | return list(zip(indices[0], indices[1])) |
| | | |
| | | def initWaveMatrix(): |
| | | global codeMatrix, waveMatrix # 声明使用全局变量 |
| | | lev = 1 |
| | | waveMatrix = np.empty_like(codeMatrix, dtype=object) |
| | | |
| | |
| | | # 优化版本:使用集合来提高性能 |
| | | def mergeWave(originWave, vehicle): |
| | | # 将字符串解析为集合 |
| | | try: |
| | | set_data = set(ast.literal_eval(originWave)) |
| | | except (ValueError, SyntaxError): |
| | | set_data = set() |
| | | # 如果 vehicle 不在集合中,则添加 |
| | | set_data.add(vehicle) |
| | | # 返回序列化后的字符串 |
| | |
| | | |
| | | # 使用 numpy 加速的代码 |
| | | def process_dynamic_matrix(dynamicMatrix, codeMatrix): |
| | | global waveMatrix # 声明使用全局变量 |
| | | # 将 dynamicMatrix 转换为结构化数组 |
| | | dynamicMatrix = convert_to_structured_array(dynamicMatrix) |
| | | |
| | |
| | | # 遍历满足条件的坐标 |
| | | for x, y in zip(x_indices, y_indices): |
| | | # print(code) |
| | | data = dynamicMatrix[x][y] |
| | | vehicle = data['vehicle'] |
| | | vehicle = dynamicMatrix[x][y]['vehicle'] |
| | | includeList = getWaveScopeByCode(x,y) |
| | | for include in includeList: |
| | | originWave = waveMatrix[include['x']][include['y']] |
| | | waveMatrix[include['x']][include['y']] = mergeWave(originWave, vehicle) |
| | | |
| | | def main(): |
| | | global radiusLen, codeMatrix, cdaMatrix, waveMatrix # 声明使用全局变量 |
| | | |
| | | if len(sys.argv) != 6: |
| | | print("用法: python agv.py <radiusLen> <redisHost> <redisPwd> <redisPort> <redisIdx>") |
| | | sys.exit(1) |
| | | |
| | | radiusLenStr = sys.argv[1] |
| | | try: |
| | | radiusLen = float(radiusLenStr) |
| | | except ValueError: |
| | | print("radiusLen 必须是一个浮点数") |
| | | sys.exit(1) |
| | | |
| | | codeMatrixPath = sys.argv[2] |
| | | cdaMatrixPath = sys.argv[3] |
| | | |
| | | redisHost = sys.argv[4] |
| | | redisPwd = sys.argv[5] |
| | | redisPort = sys.argv[6] |
| | | redisIdx = sys.argv[7] |
| | | |
| | | with open(codeMatrixPath, "r") as file: |
| | | codeMatrix = np.array(json.loads(file.read())) |
| | | |
| | | with open(cdaMatrixPath, "r") as file: |
| | | data = json.loads(file.read()) |
| | | cdaMatrix = np.array(data) |
| | | redisHost = sys.argv[2] |
| | | redisPwd = sys.argv[3] |
| | | redisPort = sys.argv[4] |
| | | redisIdx = sys.argv[5] |
| | | |
| | | startTime = time.perf_counter() |
| | | |
| | | waveMatrix = initWaveMatrix() |
| | | |
| | | # 创建一个连接池 |
| | | try: |
| | | pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx)) |
| | | r = redis.Redis(connection_pool=pool) |
| | | r = redis.Redis(connection_pool=pool, decode_responses=True) |
| | | except Exception as e: |
| | | print(f"无法连接到 Redis: {e}") |
| | | sys.exit(1) |
| | | |
| | | try: |
| | | codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1') |
| | | if codeMatrixStr is None: |
| | | raise ValueError("Redis 中未找到键: KV.AGV_MAP_ASTAR_CODE_FLAG.1") |
| | | codeMatrix = np.array(json.loads(codeMatrixStr)) |
| | | except Exception as e: |
| | | print(f"获取 codeMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | |
| | | try: |
| | | cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1') |
| | | if cdaMatrixStr is None: |
| | | raise ValueError("Redis 中未找到键: KV.AGV_MAP_ASTAR_CDA_FLAG.1") |
| | | cdaMatrix = np.array(json.loads(cdaMatrixStr)) |
| | | except Exception as e: |
| | | print(f"获取 cdaMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | |
| | | try: |
| | | dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1') |
| | | |
| | | if dynamicMatrixStr is None: |
| | | raise ValueError("Redis 中未找到键: KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1") |
| | | dynamicMatrix = np.array(json.loads(dynamicMatrixStr)) |
| | | # # 使用 numpy 加速的代码 |
| | | process_dynamic_matrix(dynamicMatrix, codeMatrix) |
| | | except Exception as e: |
| | | print(f"获取 dynamicMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | |
| | | # for x in range(dynamicMatrix.shape[0]): |
| | | # for y in range(dynamicMatrix.shape[1]): |
| | | # data = dynamicMatrix[x, y] |
| | | # vehicle = data['vehicle'] |
| | | # if vehicle != '0' and vehicle != '-1': |
| | | # getWaveScopeByCode(x, y) |
| | | # 初始化 waveMatrix |
| | | try: |
| | | waveMatrix = initWaveMatrix() |
| | | except Exception as e: |
| | | print(f"初始化 waveMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | |
| | | # 处理 dynamicMatrix |
| | | try: |
| | | process_dynamic_matrix(dynamicMatrix, codeMatrix) |
| | | except Exception as e: |
| | | print(f"处理 dynamicMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | |
| | | # 将 numpy.ndarray 转换为嵌套列表 |
| | | waveMatrixList = waveMatrix.tolist() |
| | | # 将嵌套列表转换为 JSON 字符串 |
| | | waveMatrixJsonStr = json.dumps(waveMatrixList) |
| | | |
| | | try: |
| | | r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1",waveMatrixJsonStr) |
| | | except Exception as e: |
| | | print(f"将 waveMatrix 写入 Redis 失败: {e}") |
| | | sys.exit(1) |
| | | |
| | | end = time.perf_counter() |
| | | # print('程序运行时间为: %s Seconds' % (end - startTime)) |
| | | print("1") |
| | | # 如果需要,可以打印运行时间 |
| | | # print(f"程序运行时间为: {end - startTime:.2f} 秒") |
| | | |
| | | if __name__ == "__main__": |
| | | main() |