From 9483baffba9a24a2a36fc8739fc65b59317d9142 Mon Sep 17 00:00:00 2001
From: zhang <zc857179121@qq.com>
Date: 星期四, 03 七月 2025 14:18:46 +0800
Subject: [PATCH] 队列拆分

---
 zy-acs-manager/src/main/resources/agv.py |  268 ++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 172 insertions(+), 96 deletions(-)

diff --git a/zy-acs-manager/src/main/resources/agv.py b/zy-acs-manager/src/main/resources/agv.py
index 2425f67..ad188ed 100644
--- a/zy-acs-manager/src/main/resources/agv.py
+++ b/zy-acs-manager/src/main/resources/agv.py
@@ -1,66 +1,100 @@
+# -*- coding: utf-8 -*-
+
 import ast
+import multiprocessing
 import sys
 import numpy as np
 import json
 import time
 import redis
+from collections import deque, defaultdict
 
 radiusLen = None
-codeMatrix = None  # 鍏ㄥ眬鍙橀噺
-cdaMatrix = None   # 鍏ㄥ眬鍙橀噺
-waveMatrix = None  # 鍏ㄥ眬鍙橀噺
 
 # 灏嗗瓧绗︿覆杞崲涓烘诞鐐瑰瀷鏁扮粍
 def convert_to_float_array(str_array):
     if isinstance(str_array, str):
         return np.array(ast.literal_eval(str_array), dtype=float)
-    return str_array
+    elif isinstance(str_array, list) or isinstance(str_array, np.ndarray):
+        return np.array(str_array, dtype=float)
+    return np.array([], dtype=float)
 
-def getWaveScopeByCode(x, y):
-    code = codeMatrix[x, y]
+def getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen):
+    """
+    浣跨敤骞垮害浼樺厛鎼滅储锛圔FS锛夊苟璺熻釜鎵╁睍鏂瑰悜锛屼互閬垮厤閫掑綊娣卞害杩囧ぇ鍜屼笉蹇呰鐨勮祫婧愭氮璐广��
+    褰撻亣鍒� 'NONE' 鑺傜偣鏃讹紝浠呭湪褰撳墠鏂瑰悜涓婄户缁墿灞曘��
+    """
     includeList = []
     existNodes = set()
-    spreadWaveNode({"x": x, "y": y}, {"x": x, "y": y}, existNodes, includeList)
-    return includeList
+    queue = deque()
 
-def spreadWaveNode(originNode, currNode, existNodes, includeList):
-    x, y = currNode['x'], currNode['y']
-    neighbors = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]
-    for neighbor in neighbors:
-        extendNeighborNodes(originNode, {"x": neighbor[0], "y": neighbor[1]}, existNodes, includeList)
-
-def extendNeighborNodes(originNode, nextNode, existNodes, includeList):
-    x, y = nextNode['x'], nextNode['y']
-    if (x < 0 or x >= codeMatrix.shape[0] or y < 0 or y >= codeMatrix.shape[1]):
-        return
-
-    if (x, y) in existNodes:
-        return
-
+    # 鍒濆鑺傜偣锛屾病鏈夋柟鍚�
+    originNode = {"x": x, "y": y, "dir": None}
+    queue.append(originNode)
     existNodes.add((x, y))
 
-    nextNodeCodeData = codeMatrix[x, y]
+    while queue:
+        node = queue.popleft()
+        node_x, node_y, current_dir = node['x'], node['y'], node['dir']
 
-    if nextNodeCodeData == 'NONE':
-        spreadWaveNode(originNode, nextNode, existNodes, includeList)
-    else:
-        o1Cda = convert_to_float_array(cdaMatrix[originNode['x'], originNode['y']])
-        o2Cda = convert_to_float_array(cdaMatrix[x, y])
+        # 鏍规嵁褰撳墠鏂瑰悜鍐冲畾鎵╁睍鐨勬柟鍚�
+        if current_dir is None:
+            # 濡傛灉娌℃湁鏂瑰悜锛屽悜鍥涗釜鏂瑰悜鎵╁睍
+            neighbors = [
+                (node_x + 1, node_y, 'right'),
+                (node_x - 1, node_y, 'left'),
+                (node_x, node_y + 1, 'down'),
+                (node_x, node_y - 1, 'up')
+            ]
+        else:
+            # 濡傛灉鏈夋柟鍚戯紝浠呭湪璇ユ柟鍚戜笂鎵╁睍
+            if current_dir == 'right':
+                neighbors = [(node_x + 1, node_y, 'right')]
+            elif current_dir == 'left':
+                neighbors = [(node_x - 1, node_y, 'left')]
+            elif current_dir == 'down':
+                neighbors = [(node_x, node_y + 1, 'down')]
+            elif current_dir == 'up':
+                neighbors = [(node_x, node_y - 1, 'up')]
+            else:
+                neighbors = []
 
-        num1 = (o1Cda[0] - o2Cda[0]) ** 2
-        num2 = (o1Cda[1] - o2Cda[1]) ** 2
-        if num1 + num2 <= radiusLen ** 2:
-            includeList.append({"x": int(x), "y": int(y), "code": str(codeMatrix[x, y])})
-            spreadWaveNode(originNode, nextNode, existNodes, includeList)
+        for nx, ny, direction in neighbors:
+            # 妫�鏌ヨ竟鐣屾潯浠�
+            if (nx < 0 or nx >= codeMatrix.shape[0] or ny < 0 or ny >= codeMatrix.shape[1]):
+                continue
+            if (nx, ny) in existNodes:
+                continue
 
-# 鎵惧埌鏌愪釜鍊煎搴旂殑 x, y 涓嬫爣
-def find_value_in_matrix(value):
+            existNodes.add((nx, ny))
+            neighbor_code = codeMatrix[nx, ny]
+
+            if neighbor_code == 'NONE':
+                # 閬囧埌 'NONE' 鑺傜偣锛岀户缁湪褰撳墠鏂瑰悜涓婃墿灞�
+                queue.append({"x": nx, "y": ny, "dir": direction})
+            else:
+                # 妫�鏌ヨ窛绂绘潯浠�
+                o1Cda = convert_to_float_array(cdaMatrix[x, y])
+                o2Cda = convert_to_float_array(cdaMatrix[nx, ny])
+
+                num1 = (o1Cda[0] - o2Cda[0]) ** 2
+                num2 = (o1Cda[1] - o2Cda[1]) ** 2
+                if num1 + num2 <= radiusLen ** 2:
+                    includeList.append({
+                        "x": int(nx),
+                        "y": int(ny),
+                        "code": str(codeMatrix[nx, ny])
+                    })
+                    # 闈� 'NONE' 鑺傜偣锛岄噸缃柟鍚�
+                    queue.append({"x": nx, "y": ny, "dir": None})
+
+    return includeList
+
+def find_value_in_matrix(value, codeMatrix):
     indices = np.where(codeMatrix == value)
     return list(zip(indices[0], indices[1]))
 
-def initWaveMatrix():
-    global codeMatrix, waveMatrix  # 澹版槑浣跨敤鍏ㄥ眬鍙橀噺
-    lev = 1
+def initWaveMatrix(codeMatrix):
     waveMatrix = np.empty_like(codeMatrix, dtype=object)
 
     for x in range(codeMatrix.shape[0]):
@@ -87,15 +121,25 @@
 # 灏� dynamicMatrix 杞崲涓� numpy 缁撴瀯鍖栨暟缁�
 def convert_to_structured_array(dynamicMatrix):
     # 瀹氫箟缁撴瀯鍖栨暟缁勭殑 dtype
-    dtype = [('serial', int), ('vehicle', 'U2')]
+    dtype = [('serial', int), ('vehicle', 'U2'), ('time', int)]
+
+    # 纭繚姣忎釜瀛楀吀鍖呭惈鎵�鏈夊瓧娈�
+    structured_list = []
+    for row in dynamicMatrix:
+        for d in row:
+            # 鎻愬彇瀛楁锛岀‘淇� 'time' 瀛樺湪锛屽惁鍒欒缃负榛樿鍊硷紙渚嬪 0锛�
+            serial = d.get('serial', 0)
+            vehicle = d.get('vehicle', '0')
+            time_val = d.get('time', 0)
+            structured_list.append((serial, vehicle, time_val))
+
     # 灏嗗祵濂楃殑鍒楄〃杞崲涓虹粨鏋勫寲鏁扮粍
-    structured_array = np.array([tuple(d.values()) for row in dynamicMatrix for d in row], dtype=dtype)
+    structured_array = np.array(structured_list, dtype=dtype)
     # 閲嶅涓哄師濮嬬殑浜岀淮褰㈢姸
     return structured_array.reshape(len(dynamicMatrix), -1)
 
 # 浣跨敤 numpy 鍔犻�熺殑浠g爜
-def process_dynamic_matrix(dynamicMatrix, codeMatrix):
-    global waveMatrix  # 澹版槑浣跨敤鍏ㄥ眬鍙橀噺
+def process_dynamic_matrix(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen, waveMatrix):
     # 灏� dynamicMatrix 杞崲涓虹粨鏋勫寲鏁扮粍
     dynamicMatrix = convert_to_structured_array(dynamicMatrix)
 
@@ -110,25 +154,68 @@
 
     # 閬嶅巻婊¤冻鏉′欢鐨勫潗鏍�
     for x, y in zip(x_indices, y_indices):
-        # print(code)
         vehicle = dynamicMatrix[x][y]['vehicle']
-        includeList = getWaveScopeByCode(x, y)
+        includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen)
         for include in includeList:
             originWave = waveMatrix[include['x']][include['y']]
             waveMatrix[include['x']][include['y']] = mergeWave(originWave, vehicle)
 
+    return waveMatrix
+
+def process_chunk(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen):
+    """澶勭悊鏁版嵁鍧楃殑鍑芥暟锛岃繑鍥為渶瑕佸悎骞剁殑 (x, y, vehicle) 鍒楄〃"""
+    local_wave_updates = []
+    for data in chunk:
+        x, y = data  # 鍋囪姣忎釜鏁版嵁椤瑰寘鍚玿鍜寉鍧愭爣
+        vehicle = dynamicMatrix[x][y]['vehicle']
+        includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen)
+        for include in includeList:
+            local_wave_updates.append((include['x'], include['y'], vehicle))
+    return local_wave_updates
+
+def process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen):
+    # 灏� dynamicMatrix 杞崲涓虹粨鏋勫寲鏁扮粍
+    dynamicMatrix = convert_to_structured_array(dynamicMatrix)
+
+    # 鍒涘缓甯冨皵鎺╃爜
+    mask = (dynamicMatrix['vehicle'] != '0') & (dynamicMatrix['vehicle'] != '-1')
+    x_indices, y_indices = np.where(mask)
+
+    # 灏嗘弧瓒虫潯浠剁殑鍧愭爣缁勫悎鎴愪换鍔★紝姣忎釜浠诲姟鍖呭惈涓�涓暟鎹潡
+    tasks = list(zip(x_indices, y_indices))
+#     num_processes = multiprocessing.cpu_count()
+    num_processes = 5
+    chunk_size = max(1, len(tasks) // num_processes)
+    chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)]
+
+    all_results = []  # 瀛樺偍鎵�鏈夎繘绋嬬殑缁撴灉
+
+    # 璁剧疆杩涚▼姹�
+    with multiprocessing.Pool(processes=num_processes) as pool:
+        # 浣跨敤map鏂规硶骞惰澶勭悊浠诲姟
+        results = pool.starmap(process_chunk, [(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen) for chunk in chunks])
+        for result in results:
+            all_results.extend(result)
+
+    # 浣跨敤 defaultdict 鏉ユ敹闆嗘瘡涓� (x, y) 瀵瑰簲鐨勬墍鏈� vehicle
+    wave_updates = defaultdict(set)
+    for x, y, vehicle in all_results:
+        wave_updates[(x, y)].add(vehicle)
+
+    return wave_updates
+
 def main():
-    global radiusLen, codeMatrix, cdaMatrix, waveMatrix  # 澹版槑浣跨敤鍏ㄥ眬鍙橀噺
+    global radiusLen, codeMatrix, cdaMatrix, waveMatrix  # 澹版槑涓哄叏灞�鍙橀噺
 
     if len(sys.argv) != 6:
-        print("鐢ㄦ硶: python agv.py <radiusLen> <redisHost> <redisPwd> <redisPort> <redisIdx>")
+        print("Usage: python script.py <radiusLen> <redisHost> <redisPwd> <redisPort> <redisIdx>")
         sys.exit(1)
 
     radiusLenStr = sys.argv[1]
     try:
         radiusLen = float(radiusLenStr)
     except ValueError:
-        print("radiusLen 蹇呴』鏄竴涓诞鐐规暟")
+        print("Error: radiusLen must be a float.")
         sys.exit(1)
 
     redisHost = sys.argv[2]
@@ -138,71 +225,60 @@
 
     startTime = time.perf_counter()
 
-    # 鍒涘缓涓�涓繛鎺ユ睜
     try:
+        # 鍒涘缓涓�涓繛鎺ユ睜
         pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx))
-        r = redis.Redis(connection_pool=pool, decode_responses=True)
-    except Exception as e:
-        print(f"鏃犳硶杩炴帴鍒� Redis: {e}")
-        sys.exit(1)
+        r = redis.Redis(connection_pool=pool)
 
-    try:
+        # 鑾峰彇骞跺姞杞� codeMatrix
         codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1')
         if codeMatrixStr is None:
-            raise ValueError("Redis 涓湭鎵惧埌閿�: KV.AGV_MAP_ASTAR_CODE_FLAG.1")
-        codeMatrix = np.array(json.loads(codeMatrixStr))
-    except Exception as e:
-        print(f"鑾峰彇 codeMatrix 澶辫触: {e}")
-        sys.exit(1)
+            print("Error: 'KV.AGV_MAP_ASTAR_CODE_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        codeMatrix = np.array(json.loads(codeMatrixStr.decode('utf-8')), dtype=str)
 
-    try:
+        # 鑾峰彇骞跺姞杞� cdaMatrix
         cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1')
         if cdaMatrixStr is None:
-            raise ValueError("Redis 涓湭鎵惧埌閿�: KV.AGV_MAP_ASTAR_CDA_FLAG.1")
-        cdaMatrix = np.array(json.loads(cdaMatrixStr))
-    except Exception as e:
-        print(f"鑾峰彇 cdaMatrix 澶辫触: {e}")
-        sys.exit(1)
+            print("Error: 'KV.AGV_MAP_ASTAR_CDA_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        cdaMatrix = np.array(json.loads(cdaMatrixStr.decode('utf-8')), dtype=object)
 
-    try:
+        # 鑾峰彇骞跺姞杞� dynamicMatrix
         dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1')
         if dynamicMatrixStr is None:
-            raise ValueError("Redis 涓湭鎵惧埌閿�: KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1")
-        dynamicMatrix = np.array(json.loads(dynamicMatrixStr))
-    except Exception as e:
-        print(f"鑾峰彇 dynamicMatrix 澶辫触: {e}")
-        sys.exit(1)
+            print("Error: 'KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        dynamicMatrix = np.array(json.loads(dynamicMatrixStr.decode('utf-8')), dtype=object)
 
-    # 鍒濆鍖� waveMatrix
-    try:
-        waveMatrix = initWaveMatrix()
-    except Exception as e:
-        print(f"鍒濆鍖� waveMatrix 澶辫触: {e}")
-        sys.exit(1)
+        # 鍒濆鍖� waveMatrix
+        waveMatrix = initWaveMatrix(codeMatrix)
 
-    # 澶勭悊 dynamicMatrix
-    try:
-        process_dynamic_matrix(dynamicMatrix, codeMatrix)
-    except Exception as e:
-        print(f"澶勭悊 dynamicMatrix 澶辫触: {e}")
-        sys.exit(1)
+        # 璋冪敤骞惰澶勭悊鐨勫嚱鏁�
+        wave_updates = process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen)
 
-    # 灏� numpy.ndarray 杞崲涓哄祵濂楀垪琛�
-    waveMatrixList = waveMatrix.tolist()
-    # 灏嗗祵濂楀垪琛ㄨ浆鎹负 JSON 瀛楃涓�
-    waveMatrixJsonStr = json.dumps(waveMatrixList)
+        # 搴旂敤鎵�鏈夋洿鏂板埌 waveMatrix
+        for (x, y), vehicles in wave_updates.items():
+            originWave = waveMatrix[x][y]
+            for vehicle in vehicles:
+                originWave = mergeWave(originWave, vehicle)
+            waveMatrix[x][y] = originWave
 
-    try:
+        # 灏� numpy.ndarray 杞崲涓哄祵濂楀垪琛�
+        waveMatrixList = waveMatrix.tolist()
+        # 灏嗗祵濂楀垪琛ㄨ浆鎹负 JSON 瀛楃涓�
+        waveMatrixJsonStr = json.dumps(waveMatrixList)
+
+        # 灏嗙粨鏋滀繚瀛樺洖 Redis
         r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1", waveMatrixJsonStr)
-    except Exception as e:
-        print(f"灏� waveMatrix 鍐欏叆 Redis 澶辫触: {e}")
-        sys.exit(1)
 
-    end = time.perf_counter()
-    # print('绋嬪簭杩愯鏃堕棿涓�: %s Seconds' % (end - startTime))
-    print("1")
-    # 濡傛灉闇�瑕侊紝鍙互鎵撳嵃杩愯鏃堕棿
-    # print(f"绋嬪簭杩愯鏃堕棿涓�: {end - startTime:.2f} 绉�")
+        end = time.perf_counter()
+        # 鎵撳嵃绋嬪簭杩愯鏃堕棿
+#         print(f"绋嬪簭杩愯鏃堕棿涓�: {end - startTime} Seconds")
+        print("1")
+    except Exception as e:
+        print(f"An error occurred: {e}")
+        sys.exit(1)
 
 if __name__ == "__main__":
-    main()
+    main()
\ No newline at end of file

--
Gitblit v1.9.1