#
luxiaotao1123
2024-12-27 c4227d8a899eae8eb497c2cde0c9924ef7ecf010
zy-acs-manager/src/main/resources/agv1.py
@@ -1,160 +1,242 @@
import ast
import sys
import numpy as np
import json
import time
import redis
from collections import deque, defaultdict
radiusLen = None
# 将字符串转换为浮点型数组
##########################
# 工具函数
##########################
def convert_to_float_array(str_array):
    """
    将字符串或可迭代对象转换为浮点型数组。
    """
    if str_array == "-":
        return np.array([], dtype=float)
    if isinstance(str_array, str):
        return np.array(ast.literal_eval(str_array), dtype=float)
    return str_array
    elif isinstance(str_array, list) or isinstance(str_array, np.ndarray):
        return np.array(str_array, dtype=float)
    return np.array([], dtype=float)
def getWaveScopeByCode(x, y):
    code = codeMatrix[x, y]
    includeList = []
    existNodes = set()
    spreadWaveNode({"x": x, "y": y}, {"x": x, "y": y}, existNodes, includeList)
    return includeList
def spreadWaveNode(originNode, currNode, existNodes, includeList):
    x, y = currNode['x'], currNode['y']
    neighbors = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]
    for neighbor in neighbors:
        extendNeighborNodes(originNode, {"x": neighbor[0], "y": neighbor[1]}, existNodes, includeList)
def extendNeighborNodes(originNode, nextNode, existNodes, includeList):
    x, y = nextNode['x'], nextNode['y']
    if (x < 0 or x >= codeMatrix.shape[0] or y < 0 or y >= codeMatrix.shape[1]):
        return
    if (x, y) in existNodes:
        return
    existNodes.add((x, y))
    nextNodeCodeData = codeMatrix[x, y]
    if nextNodeCodeData == 'NONE':
        spreadWaveNode(originNode, nextNode, existNodes, includeList)
    else:
        o1Cda = convert_to_float_array(cdaMatrix[originNode['x'], originNode['y']])
        o2Cda = convert_to_float_array(cdaMatrix[x, y])
        num1 = (o1Cda[0] - o2Cda[0]) ** 2
        num2 = (o1Cda[1] - o2Cda[1]) ** 2
        if num1 + num2 <= radiusLen ** 2:
            includeList.append({"x": int(x), "y": int(y), "code": str(codeMatrix[x, y])})
            spreadWaveNode(originNode, nextNode, existNodes, includeList)
# 找到某个值对应的 x, y 下标
def find_value_in_matrix(value):
    indices = np.where(codeMatrix == value)
    return list(zip(indices[0], indices[1]))
def initWaveMatrix():
    lev = 1
def initWaveMatrix(codeMatrix):
    """
    根据 codeMatrix 初始化 waveMatrix。
    若格子为 'NONE',则 waveMatrix[x][y] = "-"
    否则为 "[]"
    """
    waveMatrix = np.empty_like(codeMatrix, dtype=object)
    for x in range(codeMatrix.shape[0]):
        for y in range(codeMatrix.shape[1]):
            if codeMatrix[x][y] == 'NONE':
                waveMatrix[x][y] = "-"
            else:
                waveMatrix[x][y] = '[]'
                waveMatrix[x][y] = "[]"
    return waveMatrix
def mergeWave(originWave, vehicle):
    """
    originWave 是 waveMatrix[x][y] 中存储的字符串(内部是一个 JSON 数组),
    将新车辆 vehicle 合并进去(用 set 去重),
    并返回更新后的 JSON 字符串。
    """
    try:
        set_data = set(ast.literal_eval(originWave))
    except (ValueError, SyntaxError):
        set_data = set()
    set_data.add(vehicle)
    return json.dumps(list(set_data))
def convert_to_structured_array(dynamicMatrix):
    """
    将 dynamicMatrix(嵌套列表,里边是 dict)转换为 numpy 结构化数组,方便之后进行掩码筛选。
    """
    # 定义结构化数组的 dtype
    dtype = [('serial', int), ('vehicle', 'U2'), ('time', int)]
    structured_list = []
    for row in dynamicMatrix:
        for d in row:
            serial = d.get('serial', 0)
            vehicle = d.get('vehicle', '0')
            time_val = d.get('time', 0)
            structured_list.append((serial, vehicle, time_val))
    # 转换为结构化数组,并重塑为原始二维形状
    structured_array = np.array(structured_list, dtype=dtype)
    return structured_array.reshape(len(dynamicMatrix), -1)
##########################
# 多源 BFS 核心函数
##########################
def process_dynamic_matrix_multi_source_bfs(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen):
    """
    使用多源 BFS,一次性将所有含有车辆的格子作为起点同时进入队列。
    - 如果格子为 'NONE',则沿同一方向继续直线扩散;
    - 若遇到非 'NONE' 的格子,则进行欧几里得距离判定 <= radiusLen 时可到达,并且从此处再次朝四方向扩散(方向重置为 None)。
    """
    # 1. 转换 dynamicMatrix 为结构化数组
    dynamicMatrix = convert_to_structured_array(dynamicMatrix)
    rows, cols = dynamicMatrix.shape
    # 2. 初始化 waveMatrix(最终要存储回 Redis 的矩阵)
    waveMatrix = initWaveMatrix(codeMatrix)
    # 3. 建立一个与 waveMatrix 等大小的 2D 数组 waveSets,用于保存车辆集合(用 set 存储)
    waveSets = np.empty((rows, cols), dtype=object)
    for i in range(rows):
        for j in range(cols):
            waveSets[i, j] = set()
    # 4. 准备 BFS 队列,队列元素格式: (x, y, direction, vehicle)
    queue = deque()
    # 5. 找到所有有车辆的格子,统一入队列
    mask = (dynamicMatrix['vehicle'] != '0') & (dynamicMatrix['vehicle'] != '-1')
    x_indices, y_indices = np.where(mask)
    for x, y in zip(x_indices, y_indices):
        v = dynamicMatrix[x][y]['vehicle']
        waveSets[x, y].add(v)
        queue.append((x, y, None, v))  # 起始时 direction=None
    # 6. 建立 visited 来避免重复访问
    #    因为一个位置 (x, y) 可能被多个车辆用多种方向访问,需要记录 (x, y, direction, vehicle)
    visited = set()
    # 7. BFS 主循环
    while queue:
        x, y, direction, vehicle = queue.popleft()
        if (x, y, direction, vehicle) in visited:
            continue
        visited.add((x, y, direction, vehicle))
        # 判断下一个要扩展的方向
        if direction is None:
            # 无方向时,四个方向同时扩散
            neighbors_info = [
                (x + 1, y, 'right'),
                (x - 1, y, 'left'),
                (x, y + 1, 'down'),
                (x, y - 1, 'up')
            ]
        else:
            # 有方向时,只往同一个方向扩展
            if direction == 'right':
                neighbors_info = [(x + 1, y, 'right')]
            elif direction == 'left':
                neighbors_info = [(x - 1, y, 'left')]
            elif direction == 'down':
                neighbors_info = [(x, y + 1, 'down')]
            elif direction == 'up':
                neighbors_info = [(x, y - 1, 'up')]
            else:
                neighbors_info = []
        # 遍历邻居
        for nx, ny, ndir in neighbors_info:
            # 边界检查
            if nx < 0 or nx >= rows or ny < 0 or ny >= cols:
                continue
            neighbor_code = codeMatrix[nx, ny]
            if neighbor_code == 'NONE':
                # 如果是 'NONE',则沿当前方向继续扩散
                if vehicle not in waveSets[nx, ny]:
                    waveSets[nx, ny].add(vehicle)
                    queue.append((nx, ny, ndir, vehicle))
            else:
                # 非 'NONE',需要用欧几里得距离判定
                c1 = convert_to_float_array(cdaMatrix[x, y])   # 当前坐标
                c2 = convert_to_float_array(cdaMatrix[nx, ny]) # 邻居坐标
                if c1.size < 2 or c2.size < 2:
                    continue
                dist_sqr = (c1[0] - c2[0])**2 + (c1[1] - c2[1])**2
                if dist_sqr <= radiusLen**2:
                    # 可以到达,则把车辆加入 waveSets
                    if vehicle not in waveSets[nx, ny]:
                        waveSets[nx, ny].add(vehicle)
                        # 方向重置为 None,表示四向扩散
                        queue.append((nx, ny, None, vehicle))
    # 8. BFS 完成后,将 waveSets 的信息写入 waveMatrix(字符串形式)
    for i in range(rows):
        for j in range(cols):
            if waveSets[i, j]:
                origin_str = waveMatrix[i][j]
                for v in waveSets[i, j]:
                    origin_str = mergeWave(origin_str, v)
                waveMatrix[i][j] = origin_str
    return waveMatrix
# 优化版本:使用集合来提高性能
def mergeWave(originWave, vehicle):
    # 将字符串解析为集合
    set_data = set(ast.literal_eval(originWave))
    # 如果 vehicle 不在集合中,则添加
    set_data.add(vehicle)
    # 返回序列化后的字符串
    return json.dumps(list(set_data))
# 将 dynamicMatrix 转换为 numpy 结构化数组
def convert_to_structured_array(dynamicMatrix):
    # 定义结构化数组的 dtype
    dtype = [('serial', int), ('vehicle', 'U2')]
    # 将嵌套的列表转换为结构化数组
    structured_array = np.array([tuple(d.values()) for row in dynamicMatrix for d in row], dtype=dtype)
    # 重塑为原始的二维形状
    return structured_array.reshape(len(dynamicMatrix), -1)
##########################
# 主函数入口
##########################
def main():
    global radiusLen, codeMatrix, cdaMatrix, waveMatrix  # 声明为全局变量
# 使用 numpy 加速的代码
def process_dynamic_matrix(dynamicMatrix, codeMatrix):
    # 将 dynamicMatrix 转换为结构化数组
    dynamicMatrix = convert_to_structured_array(dynamicMatrix)
    if len(sys.argv) != 6:
        print("Usage: python script.py <radiusLen> <redisHost> <redisPwd> <redisPort> <redisIdx>")
        sys.exit(1)
    # 获取 dynamicMatrix 的形状
    rows, cols = dynamicMatrix.shape
    radiusLenStr = sys.argv[1]
    try:
        radiusLen = float(radiusLenStr)
    except ValueError:
        print("Error: radiusLen must be a float.")
        sys.exit(1)
    # 创建一个布尔掩码,用于筛选出 vehicle 不为 '0' 和 '-1' 的元素
    mask = (dynamicMatrix['vehicle'] != '0') & (dynamicMatrix['vehicle'] != '-1')
    redisHost = sys.argv[2]
    redisPwd = sys.argv[3]
    redisPort = sys.argv[4]
    redisIdx = sys.argv[5]
    # 获取满足条件的 x 和 y 坐标
    x_indices, y_indices = np.where(mask)
    startTime = time.perf_counter()
    # 遍历满足条件的坐标
    for x, y in zip(x_indices, y_indices):
        # print(code)
        data = dynamicMatrix[x][y]
        vehicle = data['vehicle']
        includeList = getWaveScopeByCode(x,y)
        for include in includeList:
            originWave = waveMatrix[include['x']][include['y']]
            waveMatrix[include['x']][include['y']] = mergeWave(originWave, vehicle)
    try:
        # 1) 连接 Redis
        pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx))
        r = redis.Redis(connection_pool=pool)
radiusLenStr = sys.argv[1]
radiusLen = float(radiusLenStr)
        # 2) 获取并加载 codeMatrix
        codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1')
        if codeMatrixStr is None:
            print("Error: 'KV.AGV_MAP_ASTAR_CODE_FLAG.1' not found in Redis.")
            sys.exit(1)
        codeMatrix = np.array(json.loads(codeMatrixStr.decode('utf-8')), dtype=str)
redisHost = sys.argv[2]
redisPwd = sys.argv[3]
redisPort = sys.argv[4]
redisIdx = sys.argv[5]
        # 3) 获取并加载 cdaMatrix
        cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1')
        if cdaMatrixStr is None:
            print("Error: 'KV.AGV_MAP_ASTAR_CDA_FLAG.1' not found in Redis.")
            sys.exit(1)
        cdaMatrix = np.array(json.loads(cdaMatrixStr.decode('utf-8')), dtype=object)
startTime = time.perf_counter()
        # 4) 获取并加载 dynamicMatrix
        dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1')
        if dynamicMatrixStr is None:
            print("Error: 'KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1' not found in Redis.")
            sys.exit(1)
        dynamicMatrix = np.array(json.loads(dynamicMatrixStr.decode('utf-8')), dtype=object)
waveMatrix = initWaveMatrix()
        # 5) 使用多源 BFS 计算 waveMatrix
        waveMatrix = process_dynamic_matrix_multi_source_bfs(dynamicMatrix, codeMatrix, cdaMatrix, radiusLen)
# 创建一个连接池
pool = redis.ConnectionPool(host=redisHost, port=int(redisPort), password=redisPwd, db=int(redisIdx))
r = redis.Redis(connection_pool=pool)
        # 6) 将 waveMatrix 转为 JSON 并写回 Redis
        waveMatrixList = waveMatrix.tolist()
        waveMatrixJsonStr = json.dumps(waveMatrixList)
        r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1", waveMatrixJsonStr)
codeMatrixStr = r.get('KV.AGV_MAP_ASTAR_CODE_FLAG.1')
codeMatrix = np.array(json.loads(codeMatrixStr))
        end = time.perf_counter()
        print(f"程序运行时间为: {end - startTime} Seconds")
        print("1")
    except Exception as e:
        print(f"An error occurred: {e}")
        sys.exit(1)
cdaMatrixStr = r.get('KV.AGV_MAP_ASTAR_CDA_FLAG.1')
cdaMatrix = np.array(json.loads(cdaMatrixStr))
dynamicMatrixStr = r.get('KV.AGV_MAP_ASTAR_DYNAMIC_FLAG.1')
dynamicMatrix = np.array(json.loads(dynamicMatrixStr))
# # 使用 numpy 加速的代码
process_dynamic_matrix(dynamicMatrix, codeMatrix)
# for x in range(dynamicMatrix.shape[0]):
#     for y in range(dynamicMatrix.shape[1]):
#         data = dynamicMatrix[x, y]
#         vehicle = data['vehicle']
#         if vehicle != '0' and vehicle != '-1':
#             getWaveScopeByCode(x, y)
# 将 numpy.ndarray 转换为嵌套列表
waveMatrixList = waveMatrix.tolist()
# 将嵌套列表转换为 JSON 字符串
waveMatrixJsonStr = json.dumps(waveMatrixList)
r.set("KV.AGV_MAP_ASTAR_WAVE_FLAG.1",waveMatrixJsonStr)
end = time.perf_counter()
# print('程序运行时间为: %s Seconds' % (end - startTime))
print("1")
if __name__ == "__main__":
    main()