| | |
| | | # -*- coding: utf-8 -*- |
| | | |
| | | import ast |
| | | import multiprocessing |
| | | import sys |
| | | import numpy as np |
| | | import json |
| | | import time |
| | | import redis |
| | | from collections import deque, defaultdict |
| | | |
| | | radiusLen = None |
| | | codeMatrix = None # 全局变量 |
| | | cdaMatrix = None # 全局变量 |
| | | waveMatrix = None # 全局变量 |
| | | |
| | | # 将字符串转换为浮点型数组 |
| | | def convert_to_float_array(str_array): |
| | | if isinstance(str_array, str): |
| | | return np.array(ast.literal_eval(str_array), dtype=float) |
| | | return str_array |
| | | elif isinstance(str_array, list) or isinstance(str_array, np.ndarray): |
| | | return np.array(str_array, dtype=float) |
| | | return np.array([], dtype=float) |
| | | |
| | | def getWaveScopeByCode(x, y): |
| | | code = codeMatrix[x, y] |
| | | def getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen): |
| | | """ |
| | | 使用广度优先搜索(BFS)并跟踪扩展方向,以避免递归深度过大和不必要的资源浪费。 |
| | | 当遇到 'NONE' 节点时,仅在当前方向上继续扩展。 |
| | | """ |
| | | includeList = [] |
| | | existNodes = set() |
| | | spreadWaveNode({"x": x, "y": y}, {"x": x, "y": y}, existNodes, includeList) |
| | | return includeList |
| | | queue = deque() |
| | | |
| | | def spreadWaveNode(originNode, currNode, existNodes, includeList): |
| | | x, y = currNode['x'], currNode['y'] |
| | | neighbors = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)] |
| | | for neighbor in neighbors: |
| | | extendNeighborNodes(originNode, {"x": neighbor[0], "y": neighbor[1]}, existNodes, includeList) |
| | | |
| | | def extendNeighborNodes(originNode, nextNode, existNodes, includeList): |
| | | x, y = nextNode['x'], nextNode['y'] |
| | | if (x < 0 or x >= codeMatrix.shape[0] or y < 0 or y >= codeMatrix.shape[1]): |
| | | return |
| | | |
| | | if (x, y) in existNodes: |
| | | return |
| | | |
| | | # 初始节点,没有方向 |
| | | originNode = {"x": x, "y": y, "dir": None} |
| | | queue.append(originNode) |
| | | existNodes.add((x, y)) |
| | | |
| | | nextNodeCodeData = codeMatrix[x, y] |
| | | while queue: |
| | | node = queue.popleft() |
| | | node_x, node_y, current_dir = node['x'], node['y'], node['dir'] |
| | | |
| | | if nextNodeCodeData == 'NONE': |
| | | spreadWaveNode(originNode, nextNode, existNodes, includeList) |
| | | else: |
| | | o1Cda = convert_to_float_array(cdaMatrix[originNode['x'], originNode['y']]) |
| | | o2Cda = convert_to_float_array(cdaMatrix[x, y]) |
| | | # 根据当前方向决定扩展的方向 |
| | | if current_dir is None: |
| | | # 如果没有方向,向四个方向扩展 |
| | | neighbors = [ |
| | | (node_x + 1, node_y, 'right'), |
| | | (node_x - 1, node_y, 'left'), |
| | | (node_x, node_y + 1, 'down'), |
| | | (node_x, node_y - 1, 'up') |
| | | ] |
| | | else: |
| | | # 如果有方向,仅在该方向上扩展 |
| | | if current_dir == 'right': |
| | | neighbors = [(node_x + 1, node_y, 'right')] |
| | | elif current_dir == 'left': |
| | | neighbors = [(node_x - 1, node_y, 'left')] |
| | | elif current_dir == 'down': |
| | | neighbors = [(node_x, node_y + 1, 'down')] |
| | | elif current_dir == 'up': |
| | | neighbors = [(node_x, node_y - 1, 'up')] |
| | | else: |
| | | neighbors = [] |
| | | |
| | | num1 = (o1Cda[0] - o2Cda[0]) ** 2 |
| | | num2 = (o1Cda[1] - o2Cda[1]) ** 2 |
| | | if num1 + num2 <= radiusLen ** 2: |
| | | includeList.append({"x": int(x), "y": int(y), "code": str(codeMatrix[x, y])}) |
| | | spreadWaveNode(originNode, nextNode, existNodes, includeList) |
| | | for nx, ny, direction in neighbors: |
| | | # 检查边界条件 |
| | | if (nx < 0 or nx >= codeMatrix.shape[0] or ny < 0 or ny >= codeMatrix.shape[1]): |
| | | continue |
| | | if (nx, ny) in existNodes: |
| | | continue |
| | | |
| | | # 找到某个值对应的 x, y 下标 |
| | | def find_value_in_matrix(value): |
| | | existNodes.add((nx, ny)) |
| | | neighbor_code = codeMatrix[nx, ny] |
| | | |
| | | if neighbor_code == 'NONE': |
| | | # 遇到 'NONE' 节点,继续在当前方向上扩展 |
| | | queue.append({"x": nx, "y": ny, "dir": direction}) |
| | | else: |
| | | # 检查距离条件 |
| | | o1Cda = convert_to_float_array(cdaMatrix[x, y]) |
| | | o2Cda = convert_to_float_array(cdaMatrix[nx, ny]) |
| | | |
| | | num1 = (o1Cda[0] - o2Cda[0]) ** 2 |
| | | num2 = (o1Cda[1] - o2Cda[1]) ** 2 |
| | | if num1 + num2 <= radiusLen ** 2: |
| | | includeList.append({ |
| | | "x": int(nx), |
| | | "y": int(ny), |
| | | "code": str(codeMatrix[nx, ny]) |
| | | }) |
| | | # 非 'NONE' 节点,重置方向 |
| | | queue.append({"x": nx, "y": ny, "dir": None}) |
| | | |
| | | return includeList |
| | | |
| | | def find_value_in_matrix(value, codeMatrix): |
| | | indices = np.where(codeMatrix == value) |
| | | return list(zip(indices[0], indices[1])) |
| | | |
| | | def initWaveMatrix(): |
| | | global codeMatrix, waveMatrix # 声明使用全局变量 |
| | | lev = 1 |
| | | def initWaveMatrix(codeMatrix): |
| | | waveMatrix = np.empty_like(codeMatrix, dtype=object) |
| | | |
| | | for x in range(codeMatrix.shape[0]): |
| | |
| | | # 将 dynamicMatrix 转换为 numpy 结构化数组 |
| | | def convert_to_structured_array(dynamicMatrix): |
| | | # 定义结构化数组的 dtype |
| | | dtype = [('serial', int), ('vehicle', 'U2')] |
| | | dtype = [('serial', int), ('vehicle', 'U2'), ('time', int)] |
| | | |
| | | # 确保每个字典包含所有字段 |
| | | structured_list = [] |
| | | for row in dynamicMatrix: |
| | | for d in row: |
| | | # 提取字段,确保 'time' 存在,否则设置为默认值(例如 0) |
| | | serial = d.get('serial', 0) |
| | | vehicle = d.get('vehicle', '0') |
| | | time_val = d.get('time', 0) |
| | | structured_list.append((serial, vehicle, time_val)) |
| | | |
| | | # 将嵌套的列表转换为结构化数组 |
| | | structured_array = np.array([tuple(d.values()) for row in dynamicMatrix for d in row], dtype=dtype) |
| | | structured_array = np.array(structured_list, dtype=dtype) |
| | | # 重塑为原始的二维形状 |
| | | return structured_array.reshape(len(dynamicMatrix), -1) |
| | | |
| | | # 使用 numpy 加速的代码 |
| | | def process_dynamic_matrix(dynamicMatrix, codeMatrix): |
| | | global waveMatrix # 声明使用全局变量 |
| | | def process_dynamic_matrix(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen, waveMatrix): |
| | | # 将 dynamicMatrix 转换为结构化数组 |
| | | dynamicMatrix = convert_to_structured_array(dynamicMatrix) |
| | | |
| | |
| | | |
| | | # 遍历满足条件的坐标 |
| | | for x, y in zip(x_indices, y_indices): |
| | | # print(code) |
| | | vehicle = dynamicMatrix[x][y]['vehicle'] |
| | | includeList = getWaveScopeByCode(x, y) |
| | | includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen) |
| | | for include in includeList: |
| | | originWave = waveMatrix[include['x']][include['y']] |
| | | waveMatrix[include['x']][include['y']] = mergeWave(originWave, vehicle) |
| | | |
| | | return waveMatrix |
| | | |
| | | def process_chunk(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen): |
| | | """处理数据块的函数,返回需要合并的 (x, y, vehicle) 列表""" |
| | | local_wave_updates = [] |
| | | for data in chunk: |
| | | x, y = data # 假设每个数据项包含x和y坐标 |
| | | vehicle = dynamicMatrix[x][y]['vehicle'] |
| | | includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen) |
| | | for include in includeList: |
| | | local_wave_updates.append((include['x'], include['y'], vehicle)) |
| | | return local_wave_updates |
| | | |
| | | def process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen): |
| | | # 将 dynamicMatrix 转换为结构化数组 |
| | | dynamicMatrix = convert_to_structured_array(dynamicMatrix) |
| | | |
| | | # 创建布尔掩码 |
| | | mask = (dynamicMatrix['vehicle'] != '0') & (dynamicMatrix['vehicle'] != '-1') |
| | | x_indices, y_indices = np.where(mask) |
| | | |
| | | # 将满足条件的坐标组合成任务,每个任务包含一个数据块 |
| | | tasks = list(zip(x_indices, y_indices)) |
| | | # num_processes = multiprocessing.cpu_count() |
| | | num_processes = 5 |
| | | chunk_size = max(1, len(tasks) // num_processes) |
| | | chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)] |
| | | |
| | | all_results = [] # 存储所有进程的结果 |
| | | |
| | | # 设置进程池 |
| | | with multiprocessing.Pool(processes=num_processes) as pool: |
| | | # 使用map方法并行处理任务 |
| | | results = pool.starmap(process_chunk, [(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen) for chunk in chunks]) |
| | | for result in results: |
| | | all_results.extend(result) |
| | | |
| | | # 使用 defaultdict 来收集每个 (x, y) 对应的所有 vehicle |
| | | wave_updates = defaultdict(set) |
| | | for x, y, vehicle in all_results: |
| | | wave_updates[(x, y)].add(vehicle) |
| | | |
| | | return wave_updates |
| | | |
| | | def main(): |
| | | global radiusLen, codeMatrix, cdaMatrix, waveMatrix # 声明使用全局变量 |
| | | global radiusLen, codeMatrix, cdaMatrix, waveMatrix # 声明为全局变量 |
| | | |
| | | if len(sys.argv) != 6: |
| | | print("用法: python agv.py <radiusLen> <redisHost> <redisPwd> <redisPort> <redisIdx>") |
| | | print("Usage: python script.py <radiusLen> <redisHost> <redisPwd> <redisPort> <redisIdx>") |
| | | sys.exit(1) |
| | | |
| | | radiusLenStr = sys.argv[1] |
| | | try: |
| | | radiusLen = float(radiusLenStr) |
| | | except ValueError: |
| | | print("radiusLen 必须是一个浮点数") |
| | | print("Error: radiusLen must be a float.") |
| | | sys.exit(1) |
| | | |
| | | redisHost = sys.argv[2] |
| | |
| | | |
| | | startTime = time.perf_counter() |
| | | |
| | | # 创建一个连接池 |
| | | try: |
| | | # 创建一个连接池 |
| | | pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx)) |
| | | r = redis.Redis(connection_pool=pool, decode_responses=True) |
| | | except Exception as e: |
| | | print(f"无法连接到 Redis: {e}") |
| | | sys.exit(1) |
| | | r = redis.Redis(connection_pool=pool) |
| | | |
| | | try: |
| | | # 获取并加载 codeMatrix |
| | | codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1') |
| | | if codeMatrixStr is None: |
| | | raise ValueError("Redis 中未找到键: KV.AGV_MAP_ASTAR_CODE_FLAG.1") |
| | | codeMatrix = np.array(json.loads(codeMatrixStr)) |
| | | except Exception as e: |
| | | print(f"获取 codeMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | print("Error: 'KV.AGV_MAP_ASTAR_CODE_FLAG.1' not found in Redis.") |
| | | sys.exit(1) |
| | | codeMatrix = np.array(json.loads(codeMatrixStr.decode('utf-8')), dtype=str) |
| | | |
| | | try: |
| | | # 获取并加载 cdaMatrix |
| | | cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1') |
| | | if cdaMatrixStr is None: |
| | | raise ValueError("Redis 中未找到键: KV.AGV_MAP_ASTAR_CDA_FLAG.1") |
| | | cdaMatrix = np.array(json.loads(cdaMatrixStr)) |
| | | except Exception as e: |
| | | print(f"获取 cdaMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | print("Error: 'KV.AGV_MAP_ASTAR_CDA_FLAG.1' not found in Redis.") |
| | | sys.exit(1) |
| | | cdaMatrix = np.array(json.loads(cdaMatrixStr.decode('utf-8')), dtype=object) |
| | | |
| | | try: |
| | | # 获取并加载 dynamicMatrix |
| | | dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1') |
| | | if dynamicMatrixStr is None: |
| | | raise ValueError("Redis 中未找到键: KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1") |
| | | dynamicMatrix = np.array(json.loads(dynamicMatrixStr)) |
| | | except Exception as e: |
| | | print(f"获取 dynamicMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | print("Error: 'KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1' not found in Redis.") |
| | | sys.exit(1) |
| | | dynamicMatrix = np.array(json.loads(dynamicMatrixStr.decode('utf-8')), dtype=object) |
| | | |
| | | # 初始化 waveMatrix |
| | | try: |
| | | waveMatrix = initWaveMatrix() |
| | | except Exception as e: |
| | | print(f"初始化 waveMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | # 初始化 waveMatrix |
| | | waveMatrix = initWaveMatrix(codeMatrix) |
| | | |
| | | # 处理 dynamicMatrix |
| | | try: |
| | | process_dynamic_matrix(dynamicMatrix, codeMatrix) |
| | | except Exception as e: |
| | | print(f"处理 dynamicMatrix 失败: {e}") |
| | | sys.exit(1) |
| | | # 调用并行处理的函数 |
| | | wave_updates = process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen) |
| | | |
| | | # 将 numpy.ndarray 转换为嵌套列表 |
| | | waveMatrixList = waveMatrix.tolist() |
| | | # 将嵌套列表转换为 JSON 字符串 |
| | | waveMatrixJsonStr = json.dumps(waveMatrixList) |
| | | # 应用所有更新到 waveMatrix |
| | | for (x, y), vehicles in wave_updates.items(): |
| | | originWave = waveMatrix[x][y] |
| | | for vehicle in vehicles: |
| | | originWave = mergeWave(originWave, vehicle) |
| | | waveMatrix[x][y] = originWave |
| | | |
| | | try: |
| | | # 将 numpy.ndarray 转换为嵌套列表 |
| | | waveMatrixList = waveMatrix.tolist() |
| | | # 将嵌套列表转换为 JSON 字符串 |
| | | waveMatrixJsonStr = json.dumps(waveMatrixList) |
| | | |
| | | # 将结果保存回 Redis |
| | | r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1", waveMatrixJsonStr) |
| | | except Exception as e: |
| | | print(f"将 waveMatrix 写入 Redis 失败: {e}") |
| | | sys.exit(1) |
| | | |
| | | end = time.perf_counter() |
| | | # print('程序运行时间为: %s Seconds' % (end - startTime)) |
| | | print("1") |
| | | # 如果需要,可以打印运行时间 |
| | | # print(f"程序运行时间为: {end - startTime:.2f} 秒") |
| | | end = time.perf_counter() |
| | | # 打印程序运行时间 |
| | | # print(f"程序运行时间为: {end - startTime} Seconds") |
| | | print("1") |
| | | except Exception as e: |
| | | print(f"An error occurred: {e}") |
| | | sys.exit(1) |
| | | |
| | | if __name__ == "__main__": |
| | | main() |
| | | main() |