""" 数据模型定义 """ from dataclasses import dataclass, field from typing import List, Optional, Dict, Any from enum import Enum import json class AGVStatusEnum(Enum): # AGV状态 IDLE = 0 # 空闲 BUSY = 1 # 忙碌 CHARGING = 2 # 充电 ERROR = 3 # 故障 MAINTENANCE = 4 # 维护 class TaskTypeEnum(Enum): # 任务类型 PICKUP = "1" # 取货 DELIVERY = "2" # 送货 TRANSPORT = "3" # 运输 class AGVActionTypeEnum(Enum): # AGV动作类型 AVOIDANCE = "1" # 避让 TASK = "2" # 任务 CHARGING = "3" # 充电 STANDBY = "4" # 去待机位 @dataclass class BackpackData: # 背篓数据 index: int # 背篓编号 loaded: bool # 是否载货 execute: bool # 是否在执行 taskId: Optional[str] = None # 执行任务编号 @dataclass class AGVStatus: # AGV状态 agvId: str # 小车编号 status: int # 状态 position: str # 小车当前点位 empty: int # 空背篓数量 direction: str # 小车角度 vol: int # 电压 error: int # 异常码,0表示正常 backpack: List[BackpackData] = field(default_factory=list) # 背篓数据 autoCharge: int = 20 # 低电量设定阈值,低于该值可以去自动充电也可以继续做任务 lowVol: int = 10 # 最低电量,电量低于该值必须去充电 @dataclass class TaskData: # 任务数据 taskId: str # 任务id start: str # 起点 end: str # 终点 type: str # 任务类型 priority: int # 优先级 @dataclass class PathCode: # 路径点 code: str # 地图点位id direction: str # 方向 type: Optional[str] = None # AGV动作类型(避让、任务、充电、去待机位) taskId: Optional[str] = None # 任务编号,如果是执行任务则必需 posType: Optional[str] = None # 动作类型,表示到达某个点位进行的动作,如取、放 lev: Optional[int] = None # 表示posType对应的任务所要操作的是第几个背篓 @dataclass class PlannedPath: # 规划路径 agvId: str # 小车编号 codeList: List[PathCode] # 点位集合 segId: Optional[str] = None # 导航重复发送时的去重标识 @dataclass class TaskAssignment: # 任务分配结果 taskId: str # 任务ID agvId: str # 分配的AGV ID lev_id: int = 0 # 背篓位置编号 @dataclass class APIResponse: # API响应格式 - 字段顺序固定为 code, msg, data code: int # 状态码 msg: str # 消息 data: Optional[Any] = None # 数据 def to_ordered_dict(self) -> Dict: # 转换为有序字典 from collections import OrderedDict return OrderedDict([ ('code', self.code), ('msg', self.msg), ('data', self.data) ]) class ResponseCode: # 响应状态码 SUCCESS = 200 # 操作成功 NO_DATA = 201 # 暂无数据 PARAM_EMPTY = 401 # 参数为空 PERMISSION_DENIED = 403 # 权限不足 DUPLICATE_SUBMIT = 407 # 请勿重复提交 SERVER_ERROR = 500 # 服务器错误 def create_success_response(data: Any = None, msg: str = "操作成功") -> APIResponse: # 创建成功响应 return APIResponse(code=ResponseCode.SUCCESS, msg=msg, data=data) def create_error_response(code: int, msg: str) -> APIResponse: # 创建错误响应 return APIResponse(code=code, msg=msg, data=None) def to_dict(obj) -> Dict: # 将数据类转换为字典 if hasattr(obj, '__dict__'): if isinstance(obj, APIResponse): return obj.to_ordered_dict() result = {} for key, value in obj.__dict__.items(): if isinstance(value, list): result[key] = [to_dict(item) if hasattr(item, '__dict__') else item for item in value] elif hasattr(value, '__dict__'): result[key] = to_dict(value) else: result[key] = value return result return obj def from_dict(data_class, data: Dict): # 从字典创建数据类实例 if isinstance(data, dict): field_types = data_class.__annotations__ kwargs = {} for field_name, field_type in field_types.items(): if field_name in data: value = data[field_name] if hasattr(field_type, '__origin__') and field_type.__origin__ is list: inner_type = field_type.__args__[0] if hasattr(inner_type, '__annotations__'): kwargs[field_name] = [from_dict(inner_type, item) for item in value] else: kwargs[field_name] = value elif hasattr(field_type, '__origin__'): import typing origin = getattr(field_type, '__origin__', None) if origin is typing.Union or str(origin) == 'typing.Union': kwargs[field_name] = value else: kwargs[field_name] = value elif hasattr(field_type, '__annotations__'): kwargs[field_name] = from_dict(field_type, value) else: kwargs[field_name] = value else: field_info = data_class.__dataclass_fields__.get(field_name) if field_info and field_info.default is not field_info.default_factory: pass elif field_info and field_info.default_factory is not field_info.default_factory: pass return data_class(**kwargs) return data