zhang
昨天 2fa19599467263dcf582bb12906e03328e03b4a4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""
数据模型定义
"""
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from enum import Enum
import json
 
 
class AGVStatusEnum(Enum):
    # AGV状态
    IDLE = 0        # 空闲
    BUSY = 1        # 忙碌
    CHARGING = 2    # 充电
    ERROR = 3       # 故障
    MAINTENANCE = 4 # 维护
 
 
class TaskTypeEnum(Enum):
    # 任务类型
    PICKUP = "1"    # 取货
    DELIVERY = "2"  # 送货
    TRANSPORT = "3" # 运输
 
 
class AGVActionTypeEnum(Enum):
    # AGV动作类型
    AVOIDANCE = "1"    # 避让
    TASK = "2"         # 任务
    CHARGING = "3"     # 充电
    STANDBY = "4"      # 去待机位
 
 
@dataclass
class BackpackData:
    # 背篓数据
    index: int                    # 背篓编号
    loaded: bool                  # 是否载货
    execute: bool                 # 是否在执行
    taskId: Optional[str] = None  # 执行任务编号
 
 
@dataclass
class AGVStatus:
    # AGV状态
    agvId: str                          # 小车编号
    status: int                         # 状态
    position: str                       # 小车当前点位
    empty: int                          # 空背篓数量
    direction: str                      # 小车角度
    vol: int                           # 电压
    error: int                         # 异常码,0表示正常
    backpack: List[BackpackData] = field(default_factory=list)  # 背篓数据
    autoCharge: int = 20               # 低电量设定阈值,低于该值可以去自动充电也可以继续做任务
    lowVol: int = 10                   # 最低电量,电量低于该值必须去充电
 
 
@dataclass
class TaskData:
    # 任务数据
    taskId: str         # 任务id
    start: str          # 起点
    end: str            # 终点
    type: str          # 任务类型
    priority: int      # 优先级
 
 
@dataclass
class PathCode:
    # 路径点
    code: str                          # 地图点位id
    direction: str                     # 方向
    type: Optional[str] = None         # AGV动作类型(避让、任务、充电、去待机位)
    taskId: Optional[str] = None       # 任务编号,如果是执行任务则必需
    posType: Optional[str] = None      # 动作类型,表示到达某个点位进行的动作,如取、放
    lev: Optional[int] = None          # 表示posType对应的任务所要操作的是第几个背篓
 
 
@dataclass
class PlannedPath:
    # 规划路径
    agvId: str                      # 小车编号
    codeList: List[PathCode]        # 点位集合
    segId: Optional[str] = None     # 导航重复发送时的去重标识
 
 
@dataclass
class TaskAssignment:
    # 任务分配结果
    taskId: str        # 任务ID
    agvId: str         # 分配的AGV ID
    lev_id: int = 0    # 背篓位置编号
 
 
@dataclass
class APIResponse:
    # API响应格式 - 字段顺序固定为 code, msg, data
    code: int                       # 状态码
    msg: str                        # 消息
    data: Optional[Any] = None      # 数据
    
    def to_ordered_dict(self) -> Dict:
        # 转换为有序字典
        from collections import OrderedDict
        return OrderedDict([
            ('code', self.code),
            ('msg', self.msg),
            ('data', self.data)
        ])
 
 
class ResponseCode:
    # 响应状态码
    SUCCESS = 200           # 操作成功
    NO_DATA = 201          # 暂无数据
    PARAM_EMPTY = 401      # 参数为空
    PERMISSION_DENIED = 403 # 权限不足
    DUPLICATE_SUBMIT = 407  # 请勿重复提交
    SERVER_ERROR = 500     # 服务器错误
 
 
def create_success_response(data: Any = None, msg: str = "操作成功") -> APIResponse:
    # 创建成功响应
    return APIResponse(code=ResponseCode.SUCCESS, msg=msg, data=data)
 
def create_error_response(code: int, msg: str) -> APIResponse:
    # 创建错误响应
    return APIResponse(code=code, msg=msg, data=None)
 
def to_dict(obj) -> Dict:
    # 将数据类转换为字典
    if hasattr(obj, '__dict__'):
        if isinstance(obj, APIResponse):
            return obj.to_ordered_dict()
        
        result = {}
        for key, value in obj.__dict__.items():
            if isinstance(value, list):
                result[key] = [to_dict(item) if hasattr(item, '__dict__') else item for item in value]
            elif hasattr(value, '__dict__'):
                result[key] = to_dict(value)
            else:
                result[key] = value
        return result
    return obj
 
 
def from_dict(data_class, data: Dict):
    # 从字典创建数据类实例
    if isinstance(data, dict):
        field_types = data_class.__annotations__
        kwargs = {}
        
        for field_name, field_type in field_types.items():
            if field_name in data:
                value = data[field_name]
                
                if hasattr(field_type, '__origin__') and field_type.__origin__ is list:
                    inner_type = field_type.__args__[0]
                    if hasattr(inner_type, '__annotations__'):
                        kwargs[field_name] = [from_dict(inner_type, item) for item in value]
                    else:
                        kwargs[field_name] = value
                elif hasattr(field_type, '__origin__'):
                    import typing
                    origin = getattr(field_type, '__origin__', None)
                    
                    if origin is typing.Union or str(origin) == 'typing.Union':
                        kwargs[field_name] = value
                    else:
                        kwargs[field_name] = value
                elif hasattr(field_type, '__annotations__'):
                    kwargs[field_name] = from_dict(field_type, value)
                else:
                    kwargs[field_name] = value
            else:
                field_info = data_class.__dataclass_fields__.get(field_name)
                if field_info and field_info.default is not field_info.default_factory:
                    pass
                elif field_info and field_info.default_factory is not field_info.default_factory:
                    pass
        
        return data_class(**kwargs)
    return data