From 9483baffba9a24a2a36fc8739fc65b59317d9142 Mon Sep 17 00:00:00 2001 From: zhang <zc857179121@qq.com> Date: 星期四, 03 七月 2025 14:18:46 +0800 Subject: [PATCH] 队列拆分 --- zy-acs-manager/src/main/resources/agv.py | 280 ++++++++++++++++++++++++++++++++++++++++--------------- 1 files changed, 202 insertions(+), 78 deletions(-) diff --git a/zy-acs-manager/src/main/resources/agv.py b/zy-acs-manager/src/main/resources/agv.py index a366634..ad188ed 100644 --- a/zy-acs-manager/src/main/resources/agv.py +++ b/zy-acs-manager/src/main/resources/agv.py @@ -1,64 +1,100 @@ -import ast -import sys +# -*- coding: utf-8 -*- +import ast +import multiprocessing +import sys import numpy as np import json import time import redis +from collections import deque, defaultdict radiusLen = None - # 灏嗗瓧绗︿覆杞崲涓烘诞鐐瑰瀷鏁扮粍 def convert_to_float_array(str_array): if isinstance(str_array, str): return np.array(ast.literal_eval(str_array), dtype=float) - return str_array + elif isinstance(str_array, list) or isinstance(str_array, np.ndarray): + return np.array(str_array, dtype=float) + return np.array([], dtype=float) -def getWaveScopeByCode(x, y): - code = codeMatrix[x, y] +def getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen): + """ + 浣跨敤骞垮害浼樺厛鎼滅储锛圔FS锛夊苟璺熻釜鎵╁睍鏂瑰悜锛屼互閬垮厤閫掑綊娣卞害杩囧ぇ鍜屼笉蹇呰鐨勮祫婧愭氮璐广�� + 褰撻亣鍒� 'NONE' 鑺傜偣鏃讹紝浠呭湪褰撳墠鏂瑰悜涓婄户缁墿灞曘�� + """ includeList = [] existNodes = set() - spreadWaveNode({"x": x, "y": y}, {"x": x, "y": y}, existNodes, includeList) - return includeList + queue = deque() -def spreadWaveNode(originNode, currNode, existNodes, includeList): - x, y = currNode['x'], currNode['y'] - neighbors = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)] - for neighbor in neighbors: - extendNeighborNodes(originNode, {"x": neighbor[0], "y": neighbor[1]}, existNodes, includeList) - -def extendNeighborNodes(originNode, nextNode, existNodes, includeList): - x, y = nextNode['x'], nextNode['y'] - if (x < 0 or x >= codeMatrix.shape[0] or y < 0 or y >= codeMatrix.shape[1]): - return - - if (x, y) in existNodes: - return - + # 鍒濆鑺傜偣锛屾病鏈夋柟鍚� + originNode = {"x": x, "y": y, "dir": None} + queue.append(originNode) existNodes.add((x, y)) - nextNodeCodeData = codeMatrix[x, y] + while queue: + node = queue.popleft() + node_x, node_y, current_dir = node['x'], node['y'], node['dir'] - if nextNodeCodeData == 'NONE': - spreadWaveNode(originNode, nextNode, existNodes, includeList) - else: - o1Cda = convert_to_float_array(cdaMatrix[originNode['x'], originNode['y']]) - o2Cda = convert_to_float_array(cdaMatrix[x, y]) + # 鏍规嵁褰撳墠鏂瑰悜鍐冲畾鎵╁睍鐨勬柟鍚� + if current_dir is None: + # 濡傛灉娌℃湁鏂瑰悜锛屽悜鍥涗釜鏂瑰悜鎵╁睍 + neighbors = [ + (node_x + 1, node_y, 'right'), + (node_x - 1, node_y, 'left'), + (node_x, node_y + 1, 'down'), + (node_x, node_y - 1, 'up') + ] + else: + # 濡傛灉鏈夋柟鍚戯紝浠呭湪璇ユ柟鍚戜笂鎵╁睍 + if current_dir == 'right': + neighbors = [(node_x + 1, node_y, 'right')] + elif current_dir == 'left': + neighbors = [(node_x - 1, node_y, 'left')] + elif current_dir == 'down': + neighbors = [(node_x, node_y + 1, 'down')] + elif current_dir == 'up': + neighbors = [(node_x, node_y - 1, 'up')] + else: + neighbors = [] - num1 = (o1Cda[0] - o2Cda[0]) ** 2 - num2 = (o1Cda[1] - o2Cda[1]) ** 2 - if num1 + num2 <= radiusLen ** 2: - includeList.append({"x": int(x), "y": int(y), "code": str(codeMatrix[x, y])}) - spreadWaveNode(originNode, nextNode, existNodes, includeList) + for nx, ny, direction in neighbors: + # 妫�鏌ヨ竟鐣屾潯浠� + if (nx < 0 or nx >= codeMatrix.shape[0] or ny < 0 or ny >= codeMatrix.shape[1]): + continue + if (nx, ny) in existNodes: + continue -# 鎵惧埌鏌愪釜鍊煎搴旂殑 x, y 涓嬫爣 -def find_value_in_matrix(value): + existNodes.add((nx, ny)) + neighbor_code = codeMatrix[nx, ny] + + if neighbor_code == 'NONE': + # 閬囧埌 'NONE' 鑺傜偣锛岀户缁湪褰撳墠鏂瑰悜涓婃墿灞� + queue.append({"x": nx, "y": ny, "dir": direction}) + else: + # 妫�鏌ヨ窛绂绘潯浠� + o1Cda = convert_to_float_array(cdaMatrix[x, y]) + o2Cda = convert_to_float_array(cdaMatrix[nx, ny]) + + num1 = (o1Cda[0] - o2Cda[0]) ** 2 + num2 = (o1Cda[1] - o2Cda[1]) ** 2 + if num1 + num2 <= radiusLen ** 2: + includeList.append({ + "x": int(nx), + "y": int(ny), + "code": str(codeMatrix[nx, ny]) + }) + # 闈� 'NONE' 鑺傜偣锛岄噸缃柟鍚� + queue.append({"x": nx, "y": ny, "dir": None}) + + return includeList + +def find_value_in_matrix(value, codeMatrix): indices = np.where(codeMatrix == value) return list(zip(indices[0], indices[1])) -def initWaveMatrix(): - lev = 1 +def initWaveMatrix(codeMatrix): waveMatrix = np.empty_like(codeMatrix, dtype=object) for x in range(codeMatrix.shape[0]): @@ -73,7 +109,10 @@ # 浼樺寲鐗堟湰锛氫娇鐢ㄩ泦鍚堟潵鎻愰珮鎬ц兘 def mergeWave(originWave, vehicle): # 灏嗗瓧绗︿覆瑙f瀽涓洪泦鍚� - set_data = set(ast.literal_eval(originWave)) + try: + set_data = set(ast.literal_eval(originWave)) + except (ValueError, SyntaxError): + set_data = set() # 濡傛灉 vehicle 涓嶅湪闆嗗悎涓紝鍒欐坊鍔� set_data.add(vehicle) # 杩斿洖搴忓垪鍖栧悗鐨勫瓧绗︿覆 @@ -82,14 +121,25 @@ # 灏� dynamicMatrix 杞崲涓� numpy 缁撴瀯鍖栨暟缁� def convert_to_structured_array(dynamicMatrix): # 瀹氫箟缁撴瀯鍖栨暟缁勭殑 dtype - dtype = [('serial', int), ('vehicle', 'U2')] + dtype = [('serial', int), ('vehicle', 'U2'), ('time', int)] + + # 纭繚姣忎釜瀛楀吀鍖呭惈鎵�鏈夊瓧娈� + structured_list = [] + for row in dynamicMatrix: + for d in row: + # 鎻愬彇瀛楁锛岀‘淇� 'time' 瀛樺湪锛屽惁鍒欒缃负榛樿鍊硷紙渚嬪 0锛� + serial = d.get('serial', 0) + vehicle = d.get('vehicle', '0') + time_val = d.get('time', 0) + structured_list.append((serial, vehicle, time_val)) + # 灏嗗祵濂楃殑鍒楄〃杞崲涓虹粨鏋勫寲鏁扮粍 - structured_array = np.array([tuple(d.values()) for row in dynamicMatrix for d in row], dtype=dtype) + structured_array = np.array(structured_list, dtype=dtype) # 閲嶅涓哄師濮嬬殑浜岀淮褰㈢姸 return structured_array.reshape(len(dynamicMatrix), -1) # 浣跨敤 numpy 鍔犻�熺殑浠g爜 -def process_dynamic_matrix(dynamicMatrix, codeMatrix): +def process_dynamic_matrix(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen, waveMatrix): # 灏� dynamicMatrix 杞崲涓虹粨鏋勫寲鏁扮粍 dynamicMatrix = convert_to_structured_array(dynamicMatrix) @@ -104,57 +154,131 @@ # 閬嶅巻婊¤冻鏉′欢鐨勫潗鏍� for x, y in zip(x_indices, y_indices): - # print(code) - data = dynamicMatrix[x][y] - vehicle = data['vehicle'] - includeList = getWaveScopeByCode(x,y) + vehicle = dynamicMatrix[x][y]['vehicle'] + includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen) for include in includeList: originWave = waveMatrix[include['x']][include['y']] waveMatrix[include['x']][include['y']] = mergeWave(originWave, vehicle) -radiusLenStr = sys.argv[1] -radiusLen = float(radiusLenStr) + return waveMatrix -redisHost = sys.argv[2] -redisPwd = sys.argv[3] -redisPort = sys.argv[4] -redisIdx = sys.argv[5] +def process_chunk(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen): + """澶勭悊鏁版嵁鍧楃殑鍑芥暟锛岃繑鍥為渶瑕佸悎骞剁殑 (x, y, vehicle) 鍒楄〃""" + local_wave_updates = [] + for data in chunk: + x, y = data # 鍋囪姣忎釜鏁版嵁椤瑰寘鍚玿鍜寉鍧愭爣 + vehicle = dynamicMatrix[x][y]['vehicle'] + includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen) + for include in includeList: + local_wave_updates.append((include['x'], include['y'], vehicle)) + return local_wave_updates -startTime = time.perf_counter() +def process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen): + # 灏� dynamicMatrix 杞崲涓虹粨鏋勫寲鏁扮粍 + dynamicMatrix = convert_to_structured_array(dynamicMatrix) -# 鍒涘缓涓�涓繛鎺ユ睜 -pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx)) -r = redis.Redis(connection_pool=pool) + # 鍒涘缓甯冨皵鎺╃爜 + mask = (dynamicMatrix['vehicle'] != '0') & (dynamicMatrix['vehicle'] != '-1') + x_indices, y_indices = np.where(mask) -codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1') -codeMatrix = np.array(json.loads(codeMatrixStr)) + # 灏嗘弧瓒虫潯浠剁殑鍧愭爣缁勫悎鎴愪换鍔★紝姣忎釜浠诲姟鍖呭惈涓�涓暟鎹潡 + tasks = list(zip(x_indices, y_indices)) +# num_processes = multiprocessing.cpu_count() + num_processes = 5 + chunk_size = max(1, len(tasks) // num_processes) + chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)] -cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1') -cdaMatrix = np.array(json.loads(cdaMatrixStr)) + all_results = [] # 瀛樺偍鎵�鏈夎繘绋嬬殑缁撴灉 -dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1') -dynamicMatrix = np.array(json.loads(dynamicMatrixStr)) + # 璁剧疆杩涚▼姹� + with multiprocessing.Pool(processes=num_processes) as pool: + # 浣跨敤map鏂规硶骞惰澶勭悊浠诲姟 + results = pool.starmap(process_chunk, [(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen) for chunk in chunks]) + for result in results: + all_results.extend(result) -waveMatrix = initWaveMatrix() + # 浣跨敤 defaultdict 鏉ユ敹闆嗘瘡涓� (x, y) 瀵瑰簲鐨勬墍鏈� vehicle + wave_updates = defaultdict(set) + for x, y, vehicle in all_results: + wave_updates[(x, y)].add(vehicle) -# # 浣跨敤 numpy 鍔犻�熺殑浠g爜 -process_dynamic_matrix(dynamicMatrix, codeMatrix) + return wave_updates -# for x in range(dynamicMatrix.shape[0]): -# for y in range(dynamicMatrix.shape[1]): -# data = dynamicMatrix[x, y] -# vehicle = data['vehicle'] -# if vehicle != '0' and vehicle != '-1': -# getWaveScopeByCode(x, y) +def main(): + global radiusLen, codeMatrix, cdaMatrix, waveMatrix # 澹版槑涓哄叏灞�鍙橀噺 -# 灏� numpy.ndarray 杞崲涓哄祵濂楀垪琛� -waveMatrixList = waveMatrix.tolist() -# 灏嗗祵濂楀垪琛ㄨ浆鎹负 JSON 瀛楃涓� -waveMatrixJsonStr = json.dumps(waveMatrixList) + if len(sys.argv) != 6: + print("Usage: python script.py <radiusLen> <redisHost> <redisPwd> <redisPort> <redisIdx>") + sys.exit(1) -r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1",waveMatrixJsonStr) + radiusLenStr = sys.argv[1] + try: + radiusLen = float(radiusLenStr) + except ValueError: + print("Error: radiusLen must be a float.") + sys.exit(1) -end = time.perf_counter() -# print('绋嬪簭杩愯鏃堕棿涓�: %s Seconds' % (end - startTime)) -print("1") + redisHost = sys.argv[2] + redisPwd = sys.argv[3] + redisPort = sys.argv[4] + redisIdx = sys.argv[5] + startTime = time.perf_counter() + + try: + # 鍒涘缓涓�涓繛鎺ユ睜 + pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx)) + r = redis.Redis(connection_pool=pool) + + # 鑾峰彇骞跺姞杞� codeMatrix + codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1') + if codeMatrixStr is None: + print("Error: 'KV.AGV_MAP_ASTAR_CODE_FLAG.1' not found in Redis.") + sys.exit(1) + codeMatrix = np.array(json.loads(codeMatrixStr.decode('utf-8')), dtype=str) + + # 鑾峰彇骞跺姞杞� cdaMatrix + cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1') + if cdaMatrixStr is None: + print("Error: 'KV.AGV_MAP_ASTAR_CDA_FLAG.1' not found in Redis.") + sys.exit(1) + cdaMatrix = np.array(json.loads(cdaMatrixStr.decode('utf-8')), dtype=object) + + # 鑾峰彇骞跺姞杞� dynamicMatrix + dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1') + if dynamicMatrixStr is None: + print("Error: 'KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1' not found in Redis.") + sys.exit(1) + dynamicMatrix = np.array(json.loads(dynamicMatrixStr.decode('utf-8')), dtype=object) + + # 鍒濆鍖� waveMatrix + waveMatrix = initWaveMatrix(codeMatrix) + + # 璋冪敤骞惰澶勭悊鐨勫嚱鏁� + wave_updates = process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen) + + # 搴旂敤鎵�鏈夋洿鏂板埌 waveMatrix + for (x, y), vehicles in wave_updates.items(): + originWave = waveMatrix[x][y] + for vehicle in vehicles: + originWave = mergeWave(originWave, vehicle) + waveMatrix[x][y] = originWave + + # 灏� numpy.ndarray 杞崲涓哄祵濂楀垪琛� + waveMatrixList = waveMatrix.tolist() + # 灏嗗祵濂楀垪琛ㄨ浆鎹负 JSON 瀛楃涓� + waveMatrixJsonStr = json.dumps(waveMatrixList) + + # 灏嗙粨鏋滀繚瀛樺洖 Redis + r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1", waveMatrixJsonStr) + + end = time.perf_counter() + # 鎵撳嵃绋嬪簭杩愯鏃堕棿 +# print(f"绋嬪簭杩愯鏃堕棿涓�: {end - startTime} Seconds") + print("1") + except Exception as e: + print(f"An error occurred: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file -- Gitblit v1.9.1