"""
|
数据模型定义
|
"""
|
from dataclasses import dataclass, field
|
from typing import List, Optional, Dict, Any
|
from enum import Enum
|
import json
|
|
|
class AGVStatusEnum(Enum):
|
# AGV状态
|
IDLE = 0 # 空闲
|
BUSY = 1 # 忙碌
|
CHARGING = 2 # 充电
|
ERROR = 3 # 故障
|
MAINTENANCE = 4 # 维护
|
|
|
class TaskTypeEnum(Enum):
|
# 任务类型
|
PICKUP = "1" # 取货
|
DELIVERY = "2" # 送货
|
TRANSPORT = "3" # 运输
|
|
|
class AGVActionTypeEnum(Enum):
|
# AGV动作类型
|
AVOIDANCE = "1" # 避让
|
TASK = "2" # 任务
|
CHARGING = "3" # 充电
|
STANDBY = "4" # 去待机位
|
|
|
@dataclass
|
class BackpackData:
|
# 背篓数据
|
index: int # 背篓编号
|
loaded: bool # 是否载货
|
execute: bool # 是否在执行
|
taskId: Optional[str] = None # 执行任务编号
|
|
|
@dataclass
|
class AGVStatus:
|
# AGV状态
|
agvId: str # 小车编号
|
status: int # 状态
|
position: str # 小车当前点位
|
empty: int # 空背篓数量
|
direction: str # 小车角度
|
vol: int # 电压
|
error: int # 异常码,0表示正常
|
backpack: List[BackpackData] = field(default_factory=list) # 背篓数据
|
autoCharge: int = 20 # 低电量设定阈值,低于该值可以去自动充电也可以继续做任务
|
lowVol: int = 10 # 最低电量,电量低于该值必须去充电
|
|
|
@dataclass
|
class TaskData:
|
# 任务数据
|
taskId: str # 任务id
|
start: str # 起点
|
end: str # 终点
|
type: str # 任务类型
|
priority: int # 优先级
|
|
|
@dataclass
|
class PathCode:
|
# 路径点
|
code: str # 地图点位id
|
direction: str # 方向
|
type: Optional[str] = None # AGV动作类型(避让、任务、充电、去待机位)
|
taskId: Optional[str] = None # 任务编号,如果是执行任务则必需
|
posType: Optional[str] = None # 动作类型,表示到达某个点位进行的动作,如取、放
|
lev: Optional[int] = None # 表示posType对应的任务所要操作的是第几个背篓
|
|
|
@dataclass
|
class PlannedPath:
|
# 规划路径
|
agvId: str # 小车编号
|
codeList: List[PathCode] # 点位集合
|
segId: Optional[str] = None # 导航重复发送时的去重标识
|
|
|
@dataclass
|
class TaskAssignment:
|
# 任务分配结果
|
taskId: str # 任务ID
|
agvId: str # 分配的AGV ID
|
lev_id: int = 0 # 背篓位置编号
|
|
|
@dataclass
|
class APIResponse:
|
# API响应格式 - 字段顺序固定为 code, msg, data
|
code: int # 状态码
|
msg: str # 消息
|
data: Optional[Any] = None # 数据
|
|
def to_ordered_dict(self) -> Dict:
|
# 转换为有序字典
|
from collections import OrderedDict
|
return OrderedDict([
|
('code', self.code),
|
('msg', self.msg),
|
('data', self.data)
|
])
|
|
|
class ResponseCode:
|
# 响应状态码
|
SUCCESS = 200 # 操作成功
|
NO_DATA = 201 # 暂无数据
|
PARAM_EMPTY = 401 # 参数为空
|
PERMISSION_DENIED = 403 # 权限不足
|
DUPLICATE_SUBMIT = 407 # 请勿重复提交
|
SERVER_ERROR = 500 # 服务器错误
|
|
|
def create_success_response(data: Any = None, msg: str = "操作成功") -> APIResponse:
|
# 创建成功响应
|
return APIResponse(code=ResponseCode.SUCCESS, msg=msg, data=data)
|
|
def create_error_response(code: int, msg: str) -> APIResponse:
|
# 创建错误响应
|
return APIResponse(code=code, msg=msg, data=None)
|
|
def to_dict(obj) -> Dict:
|
# 将数据类转换为字典
|
if hasattr(obj, '__dict__'):
|
if isinstance(obj, APIResponse):
|
return obj.to_ordered_dict()
|
|
result = {}
|
for key, value in obj.__dict__.items():
|
if isinstance(value, list):
|
result[key] = [to_dict(item) if hasattr(item, '__dict__') else item for item in value]
|
elif hasattr(value, '__dict__'):
|
result[key] = to_dict(value)
|
else:
|
result[key] = value
|
return result
|
return obj
|
|
|
def from_dict(data_class, data: Dict):
|
# 从字典创建数据类实例
|
if isinstance(data, dict):
|
field_types = data_class.__annotations__
|
kwargs = {}
|
|
for field_name, field_type in field_types.items():
|
if field_name in data:
|
value = data[field_name]
|
|
if hasattr(field_type, '__origin__') and field_type.__origin__ is list:
|
inner_type = field_type.__args__[0]
|
if hasattr(inner_type, '__annotations__'):
|
kwargs[field_name] = [from_dict(inner_type, item) for item in value]
|
else:
|
kwargs[field_name] = value
|
elif hasattr(field_type, '__origin__'):
|
import typing
|
origin = getattr(field_type, '__origin__', None)
|
|
if origin is typing.Union or str(origin) == 'typing.Union':
|
kwargs[field_name] = value
|
else:
|
kwargs[field_name] = value
|
elif hasattr(field_type, '__annotations__'):
|
kwargs[field_name] = from_dict(field_type, value)
|
else:
|
kwargs[field_name] = value
|
else:
|
field_info = data_class.__dataclass_fields__.get(field_name)
|
if field_info and field_info.default is not field_info.default_factory:
|
pass
|
elif field_info and field_info.default_factory is not field_info.default_factory:
|
pass
|
|
return data_class(**kwargs)
|
return data
|