From 9483baffba9a24a2a36fc8739fc65b59317d9142 Mon Sep 17 00:00:00 2001
From: zhang <zc857179121@qq.com>
Date: 星期四, 03 七月 2025 14:18:46 +0800
Subject: [PATCH] 队列拆分

---
 zy-acs-manager/src/main/resources/agv.py |  280 ++++++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 202 insertions(+), 78 deletions(-)

diff --git a/zy-acs-manager/src/main/resources/agv.py b/zy-acs-manager/src/main/resources/agv.py
index a366634..ad188ed 100644
--- a/zy-acs-manager/src/main/resources/agv.py
+++ b/zy-acs-manager/src/main/resources/agv.py
@@ -1,64 +1,100 @@
-import ast
-import sys
+# -*- coding: utf-8 -*-
 
+import ast
+import multiprocessing
+import sys
 import numpy as np
 import json
 import time
 import redis
+from collections import deque, defaultdict
 
 radiusLen = None
-
 
 # 灏嗗瓧绗︿覆杞崲涓烘诞鐐瑰瀷鏁扮粍
 def convert_to_float_array(str_array):
     if isinstance(str_array, str):
         return np.array(ast.literal_eval(str_array), dtype=float)
-    return str_array
+    elif isinstance(str_array, list) or isinstance(str_array, np.ndarray):
+        return np.array(str_array, dtype=float)
+    return np.array([], dtype=float)
 
-def getWaveScopeByCode(x, y):
-    code = codeMatrix[x, y]
+def getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen):
+    """
+    浣跨敤骞垮害浼樺厛鎼滅储锛圔FS锛夊苟璺熻釜鎵╁睍鏂瑰悜锛屼互閬垮厤閫掑綊娣卞害杩囧ぇ鍜屼笉蹇呰鐨勮祫婧愭氮璐广��
+    褰撻亣鍒� 'NONE' 鑺傜偣鏃讹紝浠呭湪褰撳墠鏂瑰悜涓婄户缁墿灞曘��
+    """
     includeList = []
     existNodes = set()
-    spreadWaveNode({"x": x, "y": y}, {"x": x, "y": y}, existNodes, includeList)
-    return includeList
+    queue = deque()
 
-def spreadWaveNode(originNode, currNode, existNodes, includeList):
-    x, y = currNode['x'], currNode['y']
-    neighbors = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]
-    for neighbor in neighbors:
-        extendNeighborNodes(originNode, {"x": neighbor[0], "y": neighbor[1]}, existNodes, includeList)
-
-def extendNeighborNodes(originNode, nextNode, existNodes, includeList):
-    x, y = nextNode['x'], nextNode['y']
-    if (x < 0 or x >= codeMatrix.shape[0] or y < 0 or y >= codeMatrix.shape[1]):
-        return
-
-    if (x, y) in existNodes:
-        return
-
+    # 鍒濆鑺傜偣锛屾病鏈夋柟鍚�
+    originNode = {"x": x, "y": y, "dir": None}
+    queue.append(originNode)
     existNodes.add((x, y))
 
-    nextNodeCodeData = codeMatrix[x, y]
+    while queue:
+        node = queue.popleft()
+        node_x, node_y, current_dir = node['x'], node['y'], node['dir']
 
-    if nextNodeCodeData == 'NONE':
-        spreadWaveNode(originNode, nextNode, existNodes, includeList)
-    else:
-        o1Cda = convert_to_float_array(cdaMatrix[originNode['x'], originNode['y']])
-        o2Cda = convert_to_float_array(cdaMatrix[x, y])
+        # 鏍规嵁褰撳墠鏂瑰悜鍐冲畾鎵╁睍鐨勬柟鍚�
+        if current_dir is None:
+            # 濡傛灉娌℃湁鏂瑰悜锛屽悜鍥涗釜鏂瑰悜鎵╁睍
+            neighbors = [
+                (node_x + 1, node_y, 'right'),
+                (node_x - 1, node_y, 'left'),
+                (node_x, node_y + 1, 'down'),
+                (node_x, node_y - 1, 'up')
+            ]
+        else:
+            # 濡傛灉鏈夋柟鍚戯紝浠呭湪璇ユ柟鍚戜笂鎵╁睍
+            if current_dir == 'right':
+                neighbors = [(node_x + 1, node_y, 'right')]
+            elif current_dir == 'left':
+                neighbors = [(node_x - 1, node_y, 'left')]
+            elif current_dir == 'down':
+                neighbors = [(node_x, node_y + 1, 'down')]
+            elif current_dir == 'up':
+                neighbors = [(node_x, node_y - 1, 'up')]
+            else:
+                neighbors = []
 
-        num1 = (o1Cda[0] - o2Cda[0]) ** 2
-        num2 = (o1Cda[1] - o2Cda[1]) ** 2
-        if num1 + num2 <= radiusLen ** 2:
-            includeList.append({"x": int(x), "y": int(y), "code": str(codeMatrix[x, y])})
-            spreadWaveNode(originNode, nextNode, existNodes, includeList)
+        for nx, ny, direction in neighbors:
+            # 妫�鏌ヨ竟鐣屾潯浠�
+            if (nx < 0 or nx >= codeMatrix.shape[0] or ny < 0 or ny >= codeMatrix.shape[1]):
+                continue
+            if (nx, ny) in existNodes:
+                continue
 
-# 鎵惧埌鏌愪釜鍊煎搴旂殑 x, y 涓嬫爣
-def find_value_in_matrix(value):
+            existNodes.add((nx, ny))
+            neighbor_code = codeMatrix[nx, ny]
+
+            if neighbor_code == 'NONE':
+                # 閬囧埌 'NONE' 鑺傜偣锛岀户缁湪褰撳墠鏂瑰悜涓婃墿灞�
+                queue.append({"x": nx, "y": ny, "dir": direction})
+            else:
+                # 妫�鏌ヨ窛绂绘潯浠�
+                o1Cda = convert_to_float_array(cdaMatrix[x, y])
+                o2Cda = convert_to_float_array(cdaMatrix[nx, ny])
+
+                num1 = (o1Cda[0] - o2Cda[0]) ** 2
+                num2 = (o1Cda[1] - o2Cda[1]) ** 2
+                if num1 + num2 <= radiusLen ** 2:
+                    includeList.append({
+                        "x": int(nx),
+                        "y": int(ny),
+                        "code": str(codeMatrix[nx, ny])
+                    })
+                    # 闈� 'NONE' 鑺傜偣锛岄噸缃柟鍚�
+                    queue.append({"x": nx, "y": ny, "dir": None})
+
+    return includeList
+
+def find_value_in_matrix(value, codeMatrix):
     indices = np.where(codeMatrix == value)
     return list(zip(indices[0], indices[1]))
 
-def initWaveMatrix():
-    lev = 1
+def initWaveMatrix(codeMatrix):
     waveMatrix = np.empty_like(codeMatrix, dtype=object)
 
     for x in range(codeMatrix.shape[0]):
@@ -73,7 +109,10 @@
 # 浼樺寲鐗堟湰锛氫娇鐢ㄩ泦鍚堟潵鎻愰珮鎬ц兘
 def mergeWave(originWave, vehicle):
     # 灏嗗瓧绗︿覆瑙f瀽涓洪泦鍚�
-    set_data = set(ast.literal_eval(originWave))
+    try:
+        set_data = set(ast.literal_eval(originWave))
+    except (ValueError, SyntaxError):
+        set_data = set()
     # 濡傛灉 vehicle 涓嶅湪闆嗗悎涓紝鍒欐坊鍔�
     set_data.add(vehicle)
     # 杩斿洖搴忓垪鍖栧悗鐨勫瓧绗︿覆
@@ -82,14 +121,25 @@
 # 灏� dynamicMatrix 杞崲涓� numpy 缁撴瀯鍖栨暟缁�
 def convert_to_structured_array(dynamicMatrix):
     # 瀹氫箟缁撴瀯鍖栨暟缁勭殑 dtype
-    dtype = [('serial', int), ('vehicle', 'U2')]
+    dtype = [('serial', int), ('vehicle', 'U2'), ('time', int)]
+
+    # 纭繚姣忎釜瀛楀吀鍖呭惈鎵�鏈夊瓧娈�
+    structured_list = []
+    for row in dynamicMatrix:
+        for d in row:
+            # 鎻愬彇瀛楁锛岀‘淇� 'time' 瀛樺湪锛屽惁鍒欒缃负榛樿鍊硷紙渚嬪 0锛�
+            serial = d.get('serial', 0)
+            vehicle = d.get('vehicle', '0')
+            time_val = d.get('time', 0)
+            structured_list.append((serial, vehicle, time_val))
+
     # 灏嗗祵濂楃殑鍒楄〃杞崲涓虹粨鏋勫寲鏁扮粍
-    structured_array = np.array([tuple(d.values()) for row in dynamicMatrix for d in row], dtype=dtype)
+    structured_array = np.array(structured_list, dtype=dtype)
     # 閲嶅涓哄師濮嬬殑浜岀淮褰㈢姸
     return structured_array.reshape(len(dynamicMatrix), -1)
 
 # 浣跨敤 numpy 鍔犻�熺殑浠g爜
-def process_dynamic_matrix(dynamicMatrix, codeMatrix):
+def process_dynamic_matrix(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen, waveMatrix):
     # 灏� dynamicMatrix 杞崲涓虹粨鏋勫寲鏁扮粍
     dynamicMatrix = convert_to_structured_array(dynamicMatrix)
 
@@ -104,57 +154,131 @@
 
     # 閬嶅巻婊¤冻鏉′欢鐨勫潗鏍�
     for x, y in zip(x_indices, y_indices):
-        # print(code)
-        data = dynamicMatrix[x][y]
-        vehicle = data['vehicle']
-        includeList = getWaveScopeByCode(x,y)
+        vehicle = dynamicMatrix[x][y]['vehicle']
+        includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen)
         for include in includeList:
             originWave = waveMatrix[include['x']][include['y']]
             waveMatrix[include['x']][include['y']] = mergeWave(originWave, vehicle)
 
-radiusLenStr = sys.argv[1]
-radiusLen = float(radiusLenStr)
+    return waveMatrix
 
-redisHost = sys.argv[2]
-redisPwd = sys.argv[3]
-redisPort = sys.argv[4]
-redisIdx = sys.argv[5]
+def process_chunk(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen):
+    """澶勭悊鏁版嵁鍧楃殑鍑芥暟锛岃繑鍥為渶瑕佸悎骞剁殑 (x, y, vehicle) 鍒楄〃"""
+    local_wave_updates = []
+    for data in chunk:
+        x, y = data  # 鍋囪姣忎釜鏁版嵁椤瑰寘鍚玿鍜寉鍧愭爣
+        vehicle = dynamicMatrix[x][y]['vehicle']
+        includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen)
+        for include in includeList:
+            local_wave_updates.append((include['x'], include['y'], vehicle))
+    return local_wave_updates
 
-startTime = time.perf_counter()
+def process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen):
+    # 灏� dynamicMatrix 杞崲涓虹粨鏋勫寲鏁扮粍
+    dynamicMatrix = convert_to_structured_array(dynamicMatrix)
 
-# 鍒涘缓涓�涓繛鎺ユ睜
-pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx))
-r = redis.Redis(connection_pool=pool)
+    # 鍒涘缓甯冨皵鎺╃爜
+    mask = (dynamicMatrix['vehicle'] != '0') & (dynamicMatrix['vehicle'] != '-1')
+    x_indices, y_indices = np.where(mask)
 
-codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1')
-codeMatrix = np.array(json.loads(codeMatrixStr))
+    # 灏嗘弧瓒虫潯浠剁殑鍧愭爣缁勫悎鎴愪换鍔★紝姣忎釜浠诲姟鍖呭惈涓�涓暟鎹潡
+    tasks = list(zip(x_indices, y_indices))
+#     num_processes = multiprocessing.cpu_count()
+    num_processes = 5
+    chunk_size = max(1, len(tasks) // num_processes)
+    chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)]
 
-cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1')
-cdaMatrix = np.array(json.loads(cdaMatrixStr))
+    all_results = []  # 瀛樺偍鎵�鏈夎繘绋嬬殑缁撴灉
 
-dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1')
-dynamicMatrix = np.array(json.loads(dynamicMatrixStr))
+    # 璁剧疆杩涚▼姹�
+    with multiprocessing.Pool(processes=num_processes) as pool:
+        # 浣跨敤map鏂规硶骞惰澶勭悊浠诲姟
+        results = pool.starmap(process_chunk, [(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen) for chunk in chunks])
+        for result in results:
+            all_results.extend(result)
 
-waveMatrix = initWaveMatrix()
+    # 浣跨敤 defaultdict 鏉ユ敹闆嗘瘡涓� (x, y) 瀵瑰簲鐨勬墍鏈� vehicle
+    wave_updates = defaultdict(set)
+    for x, y, vehicle in all_results:
+        wave_updates[(x, y)].add(vehicle)
 
-# # 浣跨敤 numpy 鍔犻�熺殑浠g爜
-process_dynamic_matrix(dynamicMatrix, codeMatrix)
+    return wave_updates
 
-# for x in range(dynamicMatrix.shape[0]):
-#     for y in range(dynamicMatrix.shape[1]):
-#         data = dynamicMatrix[x, y]
-#         vehicle = data['vehicle']
-#         if vehicle != '0' and vehicle != '-1':
-#             getWaveScopeByCode(x, y)
+def main():
+    global radiusLen, codeMatrix, cdaMatrix, waveMatrix  # 澹版槑涓哄叏灞�鍙橀噺
 
-# 灏� numpy.ndarray 杞崲涓哄祵濂楀垪琛�
-waveMatrixList = waveMatrix.tolist()
-# 灏嗗祵濂楀垪琛ㄨ浆鎹负 JSON 瀛楃涓�
-waveMatrixJsonStr = json.dumps(waveMatrixList)
+    if len(sys.argv) != 6:
+        print("Usage: python script.py <radiusLen> <redisHost> <redisPwd> <redisPort> <redisIdx>")
+        sys.exit(1)
 
-r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1",waveMatrixJsonStr)
+    radiusLenStr = sys.argv[1]
+    try:
+        radiusLen = float(radiusLenStr)
+    except ValueError:
+        print("Error: radiusLen must be a float.")
+        sys.exit(1)
 
-end = time.perf_counter()
-# print('绋嬪簭杩愯鏃堕棿涓�: %s Seconds' % (end - startTime))
-print("1")
+    redisHost = sys.argv[2]
+    redisPwd = sys.argv[3]
+    redisPort = sys.argv[4]
+    redisIdx = sys.argv[5]
 
+    startTime = time.perf_counter()
+
+    try:
+        # 鍒涘缓涓�涓繛鎺ユ睜
+        pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx))
+        r = redis.Redis(connection_pool=pool)
+
+        # 鑾峰彇骞跺姞杞� codeMatrix
+        codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1')
+        if codeMatrixStr is None:
+            print("Error: 'KV.AGV_MAP_ASTAR_CODE_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        codeMatrix = np.array(json.loads(codeMatrixStr.decode('utf-8')), dtype=str)
+
+        # 鑾峰彇骞跺姞杞� cdaMatrix
+        cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1')
+        if cdaMatrixStr is None:
+            print("Error: 'KV.AGV_MAP_ASTAR_CDA_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        cdaMatrix = np.array(json.loads(cdaMatrixStr.decode('utf-8')), dtype=object)
+
+        # 鑾峰彇骞跺姞杞� dynamicMatrix
+        dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1')
+        if dynamicMatrixStr is None:
+            print("Error: 'KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        dynamicMatrix = np.array(json.loads(dynamicMatrixStr.decode('utf-8')), dtype=object)
+
+        # 鍒濆鍖� waveMatrix
+        waveMatrix = initWaveMatrix(codeMatrix)
+
+        # 璋冪敤骞惰澶勭悊鐨勫嚱鏁�
+        wave_updates = process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen)
+
+        # 搴旂敤鎵�鏈夋洿鏂板埌 waveMatrix
+        for (x, y), vehicles in wave_updates.items():
+            originWave = waveMatrix[x][y]
+            for vehicle in vehicles:
+                originWave = mergeWave(originWave, vehicle)
+            waveMatrix[x][y] = originWave
+
+        # 灏� numpy.ndarray 杞崲涓哄祵濂楀垪琛�
+        waveMatrixList = waveMatrix.tolist()
+        # 灏嗗祵濂楀垪琛ㄨ浆鎹负 JSON 瀛楃涓�
+        waveMatrixJsonStr = json.dumps(waveMatrixList)
+
+        # 灏嗙粨鏋滀繚瀛樺洖 Redis
+        r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1", waveMatrixJsonStr)
+
+        end = time.perf_counter()
+        # 鎵撳嵃绋嬪簭杩愯鏃堕棿
+#         print(f"绋嬪簭杩愯鏃堕棿涓�: {end - startTime} Seconds")
+        print("1")
+    except Exception as e:
+        print(f"An error occurred: {e}")
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file

--
Gitblit v1.9.1