"""Compute the per-cell "wave" matrix for an AGV map and store it in Redis.

The script reads three JSON matrices from Redis (cell codes, cell "cda"
values and the dynamic per-cell vehicle state), and for every cell occupied
by a vehicle flood-fills outwards to collect the coded cells that lie within
``radiusLen`` of that cell.  Each collected cell gets the vehicle id merged
into its JSON-encoded list, and the resulting matrix is written back to
Redis.

Usage: ``python <script> RADIUS REDIS_HOST REDIS_PASSWORD REDIS_PORT REDIS_DB``
"""
import ast
import json
import sys
import time
from collections import deque

import numpy as np

# Module-level state shared by the helpers below; populated by ``main()``.
radiusLen = None
codeMatrix = None
cdaMatrix = None
waveMatrix = None


def convert_to_float_array(str_array):
    """Return ``str_array`` parsed into a float ndarray when it is a string.

    Non-string inputs are returned unchanged.
    """
    if isinstance(str_array, str):
        return np.array(ast.literal_eval(str_array), dtype=float)
    return str_array


def getWaveScopeByCode(x, y):
    """Return the coded cells reachable from ``(x, y)`` within ``radiusLen``.

    Each entry is a dict ``{"x": int, "y": int, "code": str}``.  The order of
    the entries follows the breadth-first traversal and carries no meaning.

    NOTE(review): the origin cell is never marked as visited up front, so it
    only appears in the result when one of its neighbours is expandable and
    re-visits it.  This mirrors the original behaviour.
    """
    includeList = []
    existNodes = set()
    origin = {"x": x, "y": y}
    spreadWaveNode(origin, origin, existNodes, includeList)
    return includeList


def _visit_node(originNode, x, y, existNodes, includeList):
    """Visit cell ``(x, y)`` once and report whether the wave spreads past it.

    Returns ``True`` when the cell is a ``'NONE'`` cell or a coded cell whose
    cda value lies within ``radiusLen`` of the origin's (in which case it is
    also appended to ``includeList``).  Returns ``False`` for out-of-bounds,
    already visited, or out-of-radius cells.
    """
    if x < 0 or x >= codeMatrix.shape[0] or y < 0 or y >= codeMatrix.shape[1]:
        return False
    if (x, y) in existNodes:
        return False
    existNodes.add((x, y))

    # 'NONE' cells are passed through without any distance check.
    if codeMatrix[x, y] == 'NONE':
        return True

    # Squared distance between the first two cda components of the origin
    # and of this cell, compared against the squared radius.
    o1Cda = convert_to_float_array(cdaMatrix[originNode['x'], originNode['y']])
    o2Cda = convert_to_float_array(cdaMatrix[x, y])
    squared_distance = (o1Cda[0] - o2Cda[0]) ** 2 + (o1Cda[1] - o2Cda[1]) ** 2
    if squared_distance <= radiusLen ** 2:
        includeList.append({"x": int(x), "y": int(y), "code": str(codeMatrix[x, y])})
        return True
    return False


def spreadWaveNode(originNode, currNode, existNodes, includeList):
    """Flood-fill from ``currNode`` over its 4-connected neighbours.

    Implemented iteratively with a queue so that large maps cannot exhaust
    Python's recursion limit.  ``existNodes`` and ``includeList`` are updated
    in place.
    """
    pending = deque([(currNode['x'], currNode['y'])])
    while pending:
        cx, cy = pending.popleft()
        for nx, ny in ((cx + 1, cy), (cx - 1, cy), (cx, cy + 1), (cx, cy - 1)):
            if _visit_node(originNode, nx, ny, existNodes, includeList):
                pending.append((nx, ny))


def extendNeighborNodes(originNode, nextNode, existNodes, includeList):
    """Visit ``nextNode`` and, if the wave passes it, keep spreading from it."""
    if _visit_node(originNode, nextNode['x'], nextNode['y'], existNodes, includeList):
        spreadWaveNode(originNode, nextNode, existNodes, includeList)


def find_value_in_matrix(value):
    """Return the ``(x, y)`` index pairs of ``codeMatrix`` equal to ``value``."""
    indices = np.where(codeMatrix == value)
    return list(zip(indices[0], indices[1]))


def initWaveMatrix():
    """Build the initial wave matrix with the same shape as ``codeMatrix``.

    ``'NONE'`` cells are set to ``"-"``; every other cell starts as the
    string ``'[]'`` (an empty JSON list).
    """
    initial = np.full(codeMatrix.shape, '[]', dtype=object)
    initial[codeMatrix == 'NONE'] = "-"
    return initial


def mergeWave(originWave, vehicle):
    """Return ``originWave`` (a serialized list) with ``vehicle`` added.

    Duplicates are removed and the entries are sorted so that the serialized
    result is deterministic across runs.
    """
    set_data = set(ast.literal_eval(originWave))
    set_data.add(vehicle)
    return json.dumps(sorted(set_data, key=str))


def convert_to_structured_array(dynamicMatrix):
    """Convert a 2-D grid of two-value dicts into a numpy structured array.

    The dict values are taken in insertion order and mapped to the fields
    ``serial`` (int) and ``vehicle`` (unicode).  The width of the ``vehicle``
    field is derived from the data so that long ids are not truncated.
    """
    cells = [tuple(d.values()) for row in dynamicMatrix for d in row]
    # A fixed 'U2' would silently cut ids longer than two characters.
    width = max((len(str(cell[1])) for cell in cells), default=0)
    dtype = [('serial', int), ('vehicle', f'U{max(width, 2)}')]
    if not cells:
        # reshape(..., -1) cannot infer a dimension for an empty array.
        return np.empty((len(dynamicMatrix), 0), dtype=dtype)
    structured_array = np.array(cells, dtype=dtype)
    return structured_array.reshape(len(dynamicMatrix), -1)


def process_dynamic_matrix(dynamicMatrix, codeMatrix):
    """Merge every vehicle into the wave lists of the cells in its scope.

    Cells whose ``vehicle`` field is ``'0'`` or ``'-1'`` are skipped.  The
    module-level ``waveMatrix`` is updated in place.  The ``codeMatrix``
    parameter is kept for interface compatibility; the helpers read the
    module-level ``codeMatrix``.
    """
    structured = convert_to_structured_array(dynamicMatrix)
    vehicles = structured['vehicle']
    mask = (vehicles != '0') & (vehicles != '-1')
    x_indices, y_indices = np.where(mask)
    for x, y in zip(x_indices, y_indices):
        vehicle = str(vehicles[x, y])
        for include in getWaveScopeByCode(x, y):
            ix, iy = include['x'], include['y']
            waveMatrix[ix][iy] = mergeWave(waveMatrix[ix][iy], vehicle)


def _load_matrix(client, key):
    """Fetch ``key`` from Redis and decode its JSON payload into an ndarray."""
    raw = client.get(key)
    if raw is None:
        raise RuntimeError(f"Redis key {key!r} is missing")
    return np.array(json.loads(raw))


def main(argv=None):
    """Run the wave computation using command-line style arguments.

    ``argv`` defaults to ``sys.argv[1:]`` and must contain, in order: the
    radius, the Redis host, password, port and database index.  Prints
    ``1`` on success.
    """
    global radiusLen, codeMatrix, cdaMatrix, waveMatrix
    # Imported here so the helpers above can be imported (e.g. for testing)
    # on machines without a Redis client installed.
    import redis

    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) < 5:
        raise SystemExit(
            f"usage: {sys.argv[0]} RADIUS REDIS_HOST REDIS_PASSWORD REDIS_PORT REDIS_DB"
        )
    radiusLenStr, redisHost, redisPwd, redisPort, redisIdx = args[:5]
    radiusLen = float(radiusLenStr)

    pool = redis.ConnectionPool(host=redisHost, port=int(redisPort),
                                password=redisPwd, db=int(redisIdx))
    try:
        r = redis.Redis(connection_pool=pool)
        codeMatrix = _load_matrix(r, 'KV.AGV_MAP_ASTAR_CODE_FLAG.1')
        cdaMatrix = _load_matrix(r, 'KV.AGV_MAP_ASTAR_CDA_FLAG.1')
        dynamicMatrix = _load_matrix(r, 'KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1')

        waveMatrix = initWaveMatrix()
        process_dynamic_matrix(dynamicMatrix, codeMatrix)

        # Convert the ndarray to nested lists before JSON serialization.
        waveMatrixJsonStr = json.dumps(waveMatrix.tolist())
        r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1", waveMatrixJsonStr)
    finally:
        pool.disconnect()

    print("1")


if __name__ == "__main__":
    main()