From 418c07bf5bb7e27d124cac00cf867039fad9060e Mon Sep 17 00:00:00 2001
From: luxiaotao1123 <t1341870251@163.com>
Date: 星期六, 21 十二月 2024 11:25:24 +0800
Subject: [PATCH] #

---
 zy-acs-manager/src/main/resources/agv.py |  174 +++++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 117 insertions(+), 57 deletions(-)

diff --git a/zy-acs-manager/src/main/resources/agv.py b/zy-acs-manager/src/main/resources/agv.py
index 245bb6e..23d4dd6 100644
--- a/zy-acs-manager/src/main/resources/agv.py
+++ b/zy-acs-manager/src/main/resources/agv.py
@@ -1,10 +1,11 @@
 import ast
+import multiprocessing
 import sys
 import numpy as np
 import json
 import time
 import redis
-from collections import deque
+from collections import deque, defaultdict
 
 radiusLen = None
 
@@ -12,9 +13,11 @@
 def convert_to_float_array(str_array):
     if isinstance(str_array, str):
         return np.array(ast.literal_eval(str_array), dtype=float)
-    return str_array
+    elif isinstance(str_array, list) or isinstance(str_array, np.ndarray):
+        return np.array(str_array, dtype=float)
+    return np.array([], dtype=float)
 
-def getWaveScopeByCode_iterative(x, y):
+def getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen):
     """
     浣跨敤骞垮害浼樺厛鎼滅储锛圔FS锛夋潵浠f浛閫掑綊锛屼互閬垮厤閫掑綊娣卞害杩囧ぇ鐨勯棶棰樸��
     """
@@ -66,11 +69,11 @@
 
     return includeList
 
-def find_value_in_matrix(value):
+def find_value_in_matrix(value, codeMatrix):
     indices = np.where(codeMatrix == value)
     return list(zip(indices[0], indices[1]))
 
-def initWaveMatrix():
+def initWaveMatrix(codeMatrix):
     waveMatrix = np.empty_like(codeMatrix, dtype=object)
 
     for x in range(codeMatrix.shape[0]):
@@ -102,12 +105,12 @@
     # 纭繚姣忎釜瀛楀吀鍖呭惈鎵�鏈夊瓧娈�
     structured_list = []
     for row in dynamicMatrix:
-       for d in row:
-           # 鎻愬彇瀛楁锛岀‘淇� 'time' 瀛樺湪锛屽惁鍒欒缃负榛樿鍊硷紙渚嬪 0.0锛�
-           serial = d.get('serial', 0)
-           vehicle = d.get('vehicle', '0')
-           time_val = d.get('time', 0)
-           structured_list.append((serial, vehicle, time_val))
+        for d in row:
+            # 鎻愬彇瀛楁锛岀‘淇� 'time' 瀛樺湪锛屽惁鍒欒缃负榛樿鍊硷紙渚嬪 0锛�
+            serial = d.get('serial', 0)
+            vehicle = d.get('vehicle', '0')
+            time_val = d.get('time', 0)
+            structured_list.append((serial, vehicle, time_val))
 
     # 灏嗗祵濂楃殑鍒楄〃杞崲涓虹粨鏋勫寲鏁扮粍
     structured_array = np.array(structured_list, dtype=dtype)
@@ -115,9 +118,7 @@
     return structured_array.reshape(len(dynamicMatrix), -1)
 
 # 浣跨敤 numpy 鍔犻�熺殑浠g爜
-def process_dynamic_matrix(dynamicMatrix, codeMatrix):
-    global waveMatrix  # 纭繚 waveMatrix 鏄叏灞�鍙橀噺
-
+def process_dynamic_matrix(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen, waveMatrix):
     # 灏� dynamicMatrix 杞崲涓虹粨鏋勫寲鏁扮粍
     dynamicMatrix = convert_to_structured_array(dynamicMatrix)
 
@@ -133,10 +134,54 @@
     # 閬嶅巻婊¤冻鏉′欢鐨勫潗鏍�
     for x, y in zip(x_indices, y_indices):
         vehicle = dynamicMatrix[x][y]['vehicle']
-        includeList = getWaveScopeByCode_iterative(x, y)
+        includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen)
         for include in includeList:
             originWave = waveMatrix[include['x']][include['y']]
             waveMatrix[include['x']][include['y']] = mergeWave(originWave, vehicle)
+
+    return waveMatrix
+
+def process_chunk(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen):
+    """澶勭悊鏁版嵁鍧楃殑鍑芥暟锛岃繑鍥為渶瑕佸悎骞剁殑 (x, y, vehicle) 鍒楄〃"""
+    local_wave_updates = []
+    for data in chunk:
+        x, y = data  # 鍋囪姣忎釜鏁版嵁椤瑰寘鍚玿鍜寉鍧愭爣
+        vehicle = dynamicMatrix[x][y]['vehicle']
+        includeList = getWaveScopeByCode_iterative(x, y, codeMatrix, cdaMatrix, radiusLen)
+        for include in includeList:
+            local_wave_updates.append((include['x'], include['y'], vehicle))
+    return local_wave_updates
+
+def process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen):
+    # 灏� dynamicMatrix 杞崲涓虹粨鏋勫寲鏁扮粍
+    dynamicMatrix = convert_to_structured_array(dynamicMatrix)
+
+    # 鍒涘缓甯冨皵鎺╃爜
+    mask = (dynamicMatrix['vehicle'] != '0') & (dynamicMatrix['vehicle'] != '-1')
+    x_indices, y_indices = np.where(mask)
+
+    # 灏嗘弧瓒虫潯浠剁殑鍧愭爣缁勫悎鎴愪换鍔★紝姣忎釜浠诲姟鍖呭惈涓�涓暟鎹潡
+    tasks = list(zip(x_indices, y_indices))
+#     num_processes = multiprocessing.cpu_count()
+    num_processes = 5
+    chunk_size = max(1, len(tasks) // num_processes)
+    chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)]
+
+    all_results = []  # 瀛樺偍鎵�鏈夎繘绋嬬殑缁撴灉
+
+    # 璁剧疆杩涚▼姹�
+    with multiprocessing.Pool(processes=num_processes) as pool:
+        # 浣跨敤map鏂规硶骞惰澶勭悊浠诲姟
+        results = pool.starmap(process_chunk, [(chunk, dynamicMatrix, codeMatrix, cdaMatrix, radiusLen) for chunk in chunks])
+        for result in results:
+            all_results.extend(result)
+
+    # 浣跨敤 defaultdict 鏉ユ敹闆嗘瘡涓� (x, y) 瀵瑰簲鐨勬墍鏈� vehicle
+    wave_updates = defaultdict(set)
+    for x, y, vehicle in all_results:
+        wave_updates[(x, y)].add(vehicle)
+
+    return wave_updates
 
 def main():
     global radiusLen, codeMatrix, cdaMatrix, waveMatrix  # 澹版槑涓哄叏灞�鍙橀噺
@@ -146,7 +191,11 @@
         sys.exit(1)
 
     radiusLenStr = sys.argv[1]
-    radiusLen = float(radiusLenStr)
+    try:
+        radiusLen = float(radiusLenStr)
+    except ValueError:
+        print("Error: radiusLen must be a float.")
+        sys.exit(1)
 
     redisHost = sys.argv[2]
     redisPwd = sys.argv[3]
@@ -155,49 +204,60 @@
 
     startTime = time.perf_counter()
 
-    # 鍒涘缓涓�涓繛鎺ユ睜
-    pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx))
-    r = redis.Redis(connection_pool=pool)
+    try:
+        # 鍒涘缓涓�涓繛鎺ユ睜
+        pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx))
+        r = redis.Redis(connection_pool=pool)
 
-    # 鑾峰彇骞跺姞杞� codeMatrix
-    codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1')
-    if codeMatrixStr is None:
-        print("Error: 'KV.AGV_MAP_ASTAR_CODE_FLAG.1' not found in Redis.")
+        # 鑾峰彇骞跺姞杞� codeMatrix
+        codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1')
+        if codeMatrixStr is None:
+            print("Error: 'KV.AGV_MAP_ASTAR_CODE_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        codeMatrix = np.array(json.loads(codeMatrixStr.decode('utf-8')), dtype=str)
+
+        # 鑾峰彇骞跺姞杞� cdaMatrix
+        cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1')
+        if cdaMatrixStr is None:
+            print("Error: 'KV.AGV_MAP_ASTAR_CDA_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        cdaMatrix = np.array(json.loads(cdaMatrixStr.decode('utf-8')), dtype=object)
+
+        # 鑾峰彇骞跺姞杞� dynamicMatrix
+        dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1')
+        if dynamicMatrixStr is None:
+            print("Error: 'KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1' not found in Redis.")
+            sys.exit(1)
+        dynamicMatrix = np.array(json.loads(dynamicMatrixStr.decode('utf-8')), dtype=object)
+
+        # 鍒濆鍖� waveMatrix
+        waveMatrix = initWaveMatrix(codeMatrix)
+
+        # 璋冪敤骞惰澶勭悊鐨勫嚱鏁�
+        wave_updates = process_dynamic_matrix_parallel(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen)
+
+        # 搴旂敤鎵�鏈夋洿鏂板埌 waveMatrix
+        for (x, y), vehicles in wave_updates.items():
+            originWave = waveMatrix[x][y]
+            for vehicle in vehicles:
+                originWave = mergeWave(originWave, vehicle)
+            waveMatrix[x][y] = originWave
+
+        # 灏� numpy.ndarray 杞崲涓哄祵濂楀垪琛�
+        waveMatrixList = waveMatrix.tolist()
+        # 灏嗗祵濂楀垪琛ㄨ浆鎹负 JSON 瀛楃涓�
+        waveMatrixJsonStr = json.dumps(waveMatrixList)
+
+        # 灏嗙粨鏋滀繚瀛樺洖 Redis
+        r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1", waveMatrixJsonStr)
+
+        end = time.perf_counter()
+        # 鎵撳嵃绋嬪簭杩愯鏃堕棿
+#         print(f"绋嬪簭杩愯鏃堕棿涓�: {end - startTime} Seconds")
+        print("1")
+    except Exception as e:
+        print(f"An error occurred: {e}")
         sys.exit(1)
-    codeMatrix = np.array(json.loads(codeMatrixStr))
-
-    # 鑾峰彇骞跺姞杞� cdaMatrix
-    cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1')
-    if cdaMatrixStr is None:
-        print("Error: 'KV.AGV_MAP_ASTAR_CDA_FLAG.1' not found in Redis.")
-        sys.exit(1)
-    cdaMatrix = np.array(json.loads(cdaMatrixStr))
-
-    # 鑾峰彇骞跺姞杞� dynamicMatrix
-    dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1')
-    if dynamicMatrixStr is None:
-        print("Error: 'KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1' not found in Redis.")
-        sys.exit(1)
-    dynamicMatrix = np.array(json.loads(dynamicMatrixStr))
-
-    # 鍒濆鍖� waveMatrix
-    waveMatrix = initWaveMatrix()
-
-    # 澶勭悊 dynamicMatrix
-    process_dynamic_matrix(dynamicMatrix, codeMatrix)
-
-    # 灏� numpy.ndarray 杞崲涓哄祵濂楀垪琛�
-    waveMatrixList = waveMatrix.tolist()
-    # 灏嗗祵濂楀垪琛ㄨ浆鎹负 JSON 瀛楃涓�
-    waveMatrixJsonStr = json.dumps(waveMatrixList)
-
-    # 灏嗙粨鏋滀繚瀛樺洖 Redis
-    r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1", waveMatrixJsonStr)
-
-    end = time.perf_counter()
-    # 鎵撳嵃绋嬪簭杩愯鏃堕棿
-#     print(f"绋嬪簭杩愯鏃堕棿涓�: {end - startTime} Seconds")
-    print("1")
 
 if __name__ == "__main__":
     main()
\ No newline at end of file

--
Gitblit v1.9.1