zhang
昨天 2fa19599467263dcf582bb12906e03328e03b4a4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
"""
AGV路径碰撞检测和解决模块
"""
import math
import logging
from typing import Dict, List, Tuple, Optional, Set
from dataclasses import dataclass
from collections import defaultdict
 
from common.data_models import PlannedPath, PathCode, AGVActionTypeEnum
from common.utils import get_coordinate_from_path_id
 
 
@dataclass
class SpaceTimeNode:
    """Space-time node: the state of one AGV at a given time and position."""
    agv_id: str
    # Position code of the path point.
    position: str
    # Coordinates of the path point.
    coordinates: Tuple[int, int]
    # Discrete time step.
    time_step: int
    # Direction (heading) at this step.
    direction: str
 
 
@dataclass
class Conflict:
    """Description of one conflict between the paths of two AGVs."""
    # Conflict kind: "vertex", "edge" or "follow".
    type: str
    agv1: str
    agv2: str
    time_step: int
    position1: str
    position2: str
    description: str
 
 
@dataclass
class AGVPriority:
    """Result of evaluating an AGV's priority."""
    agv_id: str
    # Priority score; the higher the score, the higher the priority.
    priority_score: float
    # Task status.
    task_status: str
    # Action type.
    action_type: str
    # Task priority.
    task_priority: int
    # Battery level.
    voltage: int
    # Human-readable explanation of how the score was obtained.
    explanation: str
 
 
class CollisionDetector:
    """Detect space-time conflicts between planned AGV paths.

    Index ``i`` of an AGV's ``codeList`` is treated as that AGV's location at
    time step ``i``.  Three conflict kinds are reported:

    * ``"vertex"`` - two AGVs occupy the same position code at the same step;
    * ``"edge"``   - two AGVs swap positions between two consecutive steps;
    * ``"follow"`` - two AGVs on different spots are closer than
      ``min_distance`` at the same step.
    """

    def __init__(self, path_mapping: Dict[str, Dict[str, int]],
                 min_distance: float = 3.0, time_buffer: int = 1):
        """Initialise the collision detector.

        Args:
            path_mapping: Path-point mapping; passed to
                ``get_coordinate_from_path_id`` to resolve position codes.
            min_distance: Minimum safe distance between two AGVs.
            time_buffer: Time buffer in time steps.  Stored on the instance
                but not consulted by any detection routine of this class.
        """
        self.path_mapping = path_mapping
        self.min_distance = min_distance
        self.time_buffer = time_buffer
        self.logger = logging.getLogger(__name__)

    def detect_conflicts(self, planned_paths: List[Dict]) -> List["Conflict"]:
        """Detect every conflict between the given AGV paths.

        Args:
            planned_paths: Planned paths; each item is a dict with ``agvId``
                and ``codeList`` keys.  ``None`` is treated as no paths.

        Returns:
            All vertex conflicts, followed by all edge conflicts, followed by
            all follow conflicts.
        """
        space_time_table = self._build_space_time_table(planned_paths)

        conflicts = [
            *self._detect_vertex_conflicts(space_time_table),
            *self._detect_edge_conflicts(space_time_table),
            *self._detect_following_conflicts(space_time_table),
        ]

        self.logger.info("检测到 %s 个路径冲突", len(conflicts))
        return conflicts

    def _build_space_time_table(self, planned_paths: List[Dict]) -> Dict[int, List["SpaceTimeNode"]]:
        """Build the space-time table.

        Args:
            planned_paths: Planned paths (see ``detect_conflicts``).  Path
                codes may be dicts (``code`` / ``direction`` keys) or objects
                exposing ``code`` and ``direction`` attributes.

        Returns:
            Mapping of time step -> nodes of every AGV at that step.  Path
            codes for which ``get_coordinate_from_path_id`` returns a falsy
            value are left out of the table.
        """
        space_time_table = defaultdict(list)

        for path_data in planned_paths or []:
            agv_id = path_data.get('agvId', '')
            code_list = path_data.get('codeList') or []

            for time_step, path_code in enumerate(code_list):
                if isinstance(path_code, dict):
                    position = path_code.get('code', '')
                    direction = path_code.get('direction', '90')
                else:
                    position = path_code.code
                    direction = path_code.direction

                coordinates = get_coordinate_from_path_id(position, self.path_mapping)
                if not coordinates:
                    continue

                space_time_table[time_step].append(
                    SpaceTimeNode(
                        agv_id=agv_id,
                        position=position,
                        coordinates=coordinates,
                        time_step=time_step,
                        direction=direction,
                    )
                )

        return space_time_table

    def _detect_vertex_conflicts(self, space_time_table: Dict[int, List["SpaceTimeNode"]]) -> List["Conflict"]:
        """Detect vertex conflicts (same position at the same time step).

        One conflict is emitted for every unordered pair of nodes that share
        a position code within a time step.
        """
        conflicts = []

        for time_step, nodes in space_time_table.items():
            # Group the nodes of this step by position code.
            nodes_by_position = defaultdict(list)
            for node in nodes:
                nodes_by_position[node.position].append(node)

            # Every pair inside a group is a conflict.
            for position, occupants in nodes_by_position.items():
                for index, first in enumerate(occupants):
                    for second in occupants[index + 1:]:
                        conflicts.append(Conflict(
                            type="vertex",
                            agv1=first.agv_id,
                            agv2=second.agv_id,
                            time_step=time_step,
                            position1=position,
                            position2=position,
                            description=f"AGV {first.agv_id} 和 {second.agv_id} 在时间 {time_step} 占用同一位置 {position}"
                        ))

        return conflicts

    def _detect_edge_conflicts(self, space_time_table: Dict[int, List["SpaceTimeNode"]]) -> List["Conflict"]:
        """Detect edge conflicts (two AGVs swapping positions).

        A conflict at time step ``t`` means that, between ``t`` and ``t + 1``,
        each AGV moves onto the position the other one occupied at ``t``.
        Two AGVs that sit on the *same* position at both steps are not a swap
        and are skipped here; they are vertex conflicts.
        """
        if not space_time_table:
            return []

        conflicts = []
        last_step = max(space_time_table)

        for time_step in range(last_step):
            # .get() keeps a defaultdict table from growing on missing steps.
            current_positions = {
                node.agv_id: node.position
                for node in space_time_table.get(time_step, [])
            }
            next_positions = {
                node.agv_id: node.position
                for node in space_time_table.get(time_step + 1, [])
            }

            for agv1, pos1_current in current_positions.items():
                for agv2, pos2_current in current_positions.items():
                    if agv1 >= agv2:  # visit each unordered pair once
                        continue
                    if pos1_current == pos2_current:
                        # Same node: nothing to swap (vertex conflict instead).
                        continue

                    pos1_next = next_positions.get(agv1)
                    pos2_next = next_positions.get(agv2)

                    if (pos1_next and pos2_next and
                            pos1_current == pos2_next and pos2_current == pos1_next):
                        conflicts.append(Conflict(
                            type="edge",
                            agv1=agv1,
                            agv2=agv2,
                            time_step=time_step,
                            position1=pos1_current,
                            position2=pos2_current,
                            description=f"AGV {agv1} 和 {agv2} 在时间 {time_step}-{time_step+1} 交换位置 {pos1_current}<->{pos2_current}"
                        ))

        return conflicts

    def _detect_following_conflicts(self, space_time_table: Dict[int, List["SpaceTimeNode"]]) -> List["Conflict"]:
        """Detect follow conflicts (two AGVs too close to each other).

        A pair is reported when its Euclidean distance at a time step is
        greater than zero and below ``min_distance``.  A distance of exactly
        zero is excluded here.
        """
        conflicts = []

        for time_step, nodes in space_time_table.items():
            for index, node1 in enumerate(nodes):
                for node2 in nodes[index + 1:]:
                    distance = math.hypot(
                        node1.coordinates[0] - node2.coordinates[0],
                        node1.coordinates[1] - node2.coordinates[1],
                    )

                    if 0 < distance < self.min_distance:
                        conflicts.append(Conflict(
                            type="follow",
                            agv1=node1.agv_id,
                            agv2=node2.agv_id,
                            time_step=time_step,
                            position1=node1.position,
                            position2=node2.position,
                            description=f"AGV {node1.agv_id} 和 {node2.agv_id} 在时间 {time_step} 距离过近 ({distance:.2f} < {self.min_distance})"
                        ))

        return conflicts
 
 
class CollisionResolver:
    """Resolve AGV path conflicts by making the lower-priority AGV wait.

    For every conflict both AGVs are scored with ``evaluate_agv_priority``;
    the one with the lower score (the first AGV on a tie) gets a wait step
    inserted into its ``codeList``.  The class also offers helpers that force
    the ``direction`` of path codes onto the 0/90/180/270 grid.
    """

    def __init__(self, path_mapping: Dict[str, Dict[str, int]], detector: "CollisionDetector", agv_manager=None):
        """Initialise the collision resolver.

        Args:
            path_mapping: Path-point mapping; passed to
                ``get_coordinate_from_path_id`` to resolve position codes.
            detector: Collision detector (stored on the instance).
            agv_manager: Optional AGV manager; when given, its
                ``get_agv_by_id`` is used to read the AGV's ``voltage``.
        """
        self.path_mapping = path_mapping
        self.detector = detector
        self.agv_manager = agv_manager
        self.logger = logging.getLogger(__name__)

    def evaluate_agv_priority(self, agv_id: str, path: Dict, executing_tasks: Optional[List[Dict]] = None) -> "AGVPriority":
        """Score how strongly an AGV should keep its path in a conflict.

        The score is the sum of four parts: task status (up to 40), action
        type (up to 30), task priority (up to 20) and battery level (up to
        10).  A higher score means the AGV is less likely to give way.

        Args:
            agv_id: AGV id.
            path: Path dict of the AGV; the ``type`` of its first path code
                (dict form only) is used as the action type.
            executing_tasks: Tasks being executed; the first entry whose
                ``agvId`` matches supplies the task status and ``taskId``.

        Returns:
            AGVPriority: The score together with the inputs that produced it
            and a textual explanation.
        """
        # Defaults used when no information is available.
        task_status = "idle"
        action_type = "unknown"
        task_priority = 1
        voltage = 100
        explanation_parts = []

        # Battery level from the AGV manager, when there is one.
        if self.agv_manager:
            agv_model = self.agv_manager.get_agv_by_id(agv_id)
            # A missing reading keeps the default instead of breaking the
            # numeric comparisons further down.
            if agv_model and agv_model.voltage is not None:
                voltage = agv_model.voltage

        # Action type from the first path code (only read from dict codes).
        code_list = path.get('codeList') or []
        if code_list and isinstance(code_list[0], dict):
            action_type = code_list[0].get('type', 'unknown')

        # Task status and priority from the first matching executing task.
        for task in executing_tasks or []:
            if task.get('agvId') != agv_id:
                continue
            task_status = task.get('status', 'idle')
            # Simplification: the priority is derived from the task id text.
            task_id = str(task.get('taskId') or '')
            if 'HIGH_PRIORITY' in task_id:
                task_priority = 10
            elif 'PRIORITY' in task_id:
                task_priority = 8
            else:
                task_priority = 5
            break

        priority_score = 0.0

        # 1. Task status (up to 40 points).
        if task_status == "executing":
            priority_score += 40.0
            explanation_parts.append("执行任务中(+40)")
        elif task_status == "assigned":
            priority_score += 20.0
            explanation_parts.append("已分配任务(+20)")
        else:
            priority_score += 5.0
            explanation_parts.append("空闲状态(+5)")

        # 2. Action type (up to 30 points).
        if action_type == AGVActionTypeEnum.TASK.value:
            priority_score += 30.0
            explanation_parts.append("任务行为(+30)")
        elif action_type == AGVActionTypeEnum.CHARGING.value:
            priority_score += 25.0
            explanation_parts.append("充电行为(+25)")
        elif action_type == AGVActionTypeEnum.AVOIDANCE.value:
            priority_score += 5.0
            explanation_parts.append("避让行为(+5)")
        elif action_type == AGVActionTypeEnum.STANDBY.value:
            priority_score += 10.0
            explanation_parts.append("待机行为(+10)")
        else:
            priority_score += 15.0
            explanation_parts.append("未知行为(+15)")

        # 3. Task priority (up to 20 points).
        task_priority_score = (task_priority / 10.0) * 20.0
        priority_score += task_priority_score
        explanation_parts.append(f"任务优先级{task_priority}(+{task_priority_score:.1f})")

        # 4. Battery level (up to 10 points); a lower battery scores higher.
        if voltage <= 10:  # must charge
            priority_score += 10.0
            explanation_parts.append("电量危险(+10)")
        elif voltage <= 20:  # may charge automatically
            priority_score += 8.0
            explanation_parts.append("电量偏低(+8)")
        elif voltage <= 50:
            priority_score += 5.0
            explanation_parts.append("电量一般(+5)")
        else:
            priority_score += 2.0
            explanation_parts.append("电量充足(+2)")

        explanation = f"总分{priority_score:.1f}: " + ", ".join(explanation_parts)

        return AGVPriority(
            agv_id=agv_id,
            priority_score=priority_score,
            task_status=task_status,
            action_type=action_type,
            task_priority=task_priority,
            voltage=voltage,
            explanation=explanation
        )

    def resolve_conflicts(self, planned_paths: List[Dict], conflicts: List["Conflict"], executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Resolve the given conflicts.

        Conflicts are handled in ascending ``time_step`` order.  The path
        dicts are shallow-copied first and wait steps are written to the
        copies as new lists, so the input paths are left as they were.

        NOTE(review): conflicts are not re-detected between resolutions, so
        the time steps of later conflicts refer to the paths as they were
        before earlier wait steps were inserted.

        Args:
            planned_paths: Originally planned paths.
            conflicts: Conflicts to resolve.
            executing_tasks: Tasks being executed (used for prioritisation).

        Returns:
            List[Dict]: The paths after resolution; ``planned_paths`` itself
            when there are no conflicts.
        """
        if not conflicts:
            return planned_paths

        resolved_paths = [path.copy() for path in planned_paths]

        # Earliest conflicts first.
        for conflict in sorted(conflicts, key=lambda c: c.time_step):
            resolved_paths = self._resolve_single_conflict(resolved_paths, conflict, executing_tasks)

        self.logger.info("解决了 %s 个路径冲突", len(conflicts))
        return resolved_paths

    def _resolve_single_conflict(self, paths: List[Dict], conflict: "Conflict", executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Dispatch one conflict to the resolver for its type.

        Args:
            paths: Current path list.
            conflict: Conflict to resolve.
            executing_tasks: Tasks being executed.

        Returns:
            List[Dict]: Updated path list; ``paths`` unchanged for an unknown
            conflict type.
        """
        if conflict.type == "vertex":
            return self._resolve_vertex_conflict(paths, conflict, executing_tasks)
        if conflict.type == "edge":
            return self._resolve_edge_conflict(paths, conflict, executing_tasks)
        if conflict.type == "follow":
            return self._resolve_following_conflict(paths, conflict, executing_tasks)
        return paths

    def _resolve_vertex_conflict(self, paths: List[Dict], conflict: "Conflict", executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Resolve a vertex conflict: the lower-priority AGV waits."""
        headline = f"顶点冲突避让决策 - 位置: {conflict.position1}, 时间: {conflict.time_step}"
        return self._yield_lower_priority_agv(paths, conflict, executing_tasks, headline)

    def _resolve_edge_conflict(self, paths: List[Dict], conflict: "Conflict", executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Resolve an edge conflict: the lower-priority AGV waits."""
        headline = f"边冲突避让决策 - 位置交换: {conflict.position1}<->{conflict.position2}, 时间: {conflict.time_step}"
        return self._yield_lower_priority_agv(paths, conflict, executing_tasks, headline)

    def _resolve_following_conflict(self, paths: List[Dict], conflict: "Conflict", executing_tasks: Optional[List[Dict]] = None) -> List[Dict]:
        """Resolve a follow conflict: the lower-priority AGV waits."""
        headline = f"跟随冲突避让决策 - 距离过近, 时间: {conflict.time_step}"
        return self._yield_lower_priority_agv(
            paths, conflict, executing_tasks, headline, show_positions=True
        )

    def _yield_lower_priority_agv(self, paths: List[Dict], conflict: "Conflict",
                                  executing_tasks: Optional[List[Dict]],
                                  headline: str, show_positions: bool = False) -> List[Dict]:
        """Make the lower-priority AGV of a conflict wait one step.

        Shared implementation of the three ``_resolve_*_conflict`` methods.

        Args:
            paths: Current path list (copied shallowly, dicts are reused).
            conflict: Conflict to resolve.
            executing_tasks: Tasks being executed.
            headline: First log line describing the decision.
            show_positions: Whether the per-AGV log lines include the
                conflict positions.

        Returns:
            List[Dict]: Updated path list; an unchanged copy when either AGV
            has no path in ``paths``.
        """
        updated_paths = paths.copy()

        # Locate both AGVs' paths (the last match wins, as paths are scanned in order).
        agv1_path = agv2_path = None
        agv1_index = agv2_index = -1
        for index, path in enumerate(updated_paths):
            path_agv = path.get('agvId')
            if path_agv == conflict.agv1:
                agv1_path, agv1_index = path, index
            elif path_agv == conflict.agv2:
                agv2_path, agv2_index = path, index

        if not agv1_path or not agv2_path:
            return updated_paths

        agv1_priority = self.evaluate_agv_priority(conflict.agv1, agv1_path, executing_tasks)
        agv2_priority = self.evaluate_agv_priority(conflict.agv2, agv2_path, executing_tasks)

        # The lower score waits; on a tie the first AGV gives way.
        if agv1_priority.priority_score <= agv2_priority.priority_score:
            waiting_agv_path, waiting_agv_index = agv1_path, agv1_index
            waiting_agv_id, higher_priority_agv_id = conflict.agv1, conflict.agv2
        else:
            waiting_agv_path, waiting_agv_index = agv2_path, agv2_index
            waiting_agv_id, higher_priority_agv_id = conflict.agv2, conflict.agv1

        # Log the decision in detail.
        agv1_label = f"AGV {conflict.agv1}"
        agv2_label = f"AGV {conflict.agv2}"
        if show_positions:
            agv1_label += f" (位置: {conflict.position1})"
            agv2_label += f" (位置: {conflict.position2})"
        self.logger.info(headline)
        self.logger.info(f"  {agv1_label}: {agv1_priority.explanation}")
        self.logger.info(f"  {agv2_label}: {agv2_priority.explanation}")
        self.logger.info(f"  决策: AGV {waiting_agv_id} 让步给 AGV {higher_priority_agv_id}")

        # Insert the wait step in front of the conflicting step.
        if not self._insert_wait_step(waiting_agv_path, conflict.time_step):
            self.logger.warning(
                f"无法在时间步 {conflict.time_step} 为 AGV {waiting_agv_id} 插入等待步骤, 冲突未解决"
            )
        updated_paths[waiting_agv_index] = waiting_agv_path

        return updated_paths

    def _insert_wait_step(self, path: Dict, time_step: int) -> bool:
        """Insert a wait step at the given time step.

        The wait step repeats the path code at ``time_step - 1``.  Nothing is
        inserted when ``time_step`` is 0 or not inside the code list.  The
        lengthened code list is stored on ``path`` as a new list, so a list
        object shared with another path dict is not modified.

        Args:
            path: AGV path dict; its ``codeList`` entry is replaced.
            time_step: Index at which the wait step is inserted.

        Returns:
            bool: True when a wait step was inserted, False otherwise.
        """
        code_list = path.get('codeList') or []

        if not 0 < time_step < len(code_list):
            return False

        prev_code = code_list[time_step - 1]

        # Dict codes keep all their fields; object codes are reduced to the
        # two basic ones.
        if isinstance(prev_code, dict):
            wait_code = prev_code.copy()
        else:
            wait_code = {
                'code': prev_code.code,
                'direction': prev_code.direction
            }

        path['codeList'] = [*code_list[:time_step], wait_code, *code_list[time_step:]]
        return True

    def validate_four_direction_movement(self, planned_paths: List[Dict]) -> List[Dict]:
        """Validate and correct paths so AGVs only head 0, 90, 180 or 270 degrees.

        Args:
            planned_paths: Planned paths.

        Returns:
            List[Dict]: Corrected copies of the paths, in the same order.
        """
        return [
            self._validate_single_path_directions(path_data)
            for path_data in planned_paths
        ]

    def _validate_single_path_directions(self, path_data: Dict) -> Dict:
        """Validate the movement directions of one path.

        Paths with fewer than two codes are returned as a plain copy.
        Otherwise every code becomes a dict whose ``direction`` is the
        movement direction from the previous code when one can be computed,
        else its own direction, normalised to 0/90/180/270.

        Args:
            path_data: Path dict of one AGV.

        Returns:
            Dict: Corrected copy of the path.
        """
        validated_path = path_data.copy()
        code_list = path_data.get('codeList') or []

        if len(code_list) < 2:
            return validated_path

        validated_code_list = []
        prev_position = None

        for index, current_code in enumerate(code_list):
            if isinstance(current_code, dict):
                position = current_code.get('code', '')
                direction = current_code.get('direction', '90')
                # Keep every original field.
                validated_code = current_code.copy()
            else:
                position = current_code.code
                direction = current_code.direction
                # Convert to dict form, keeping the basic fields.
                validated_code = {
                    'code': position,
                    'direction': direction
                }

            # From the second code on, prefer the actual movement direction.
            if index > 0:
                correct_direction = self._calculate_movement_direction(prev_position, position)
                if correct_direction:
                    direction = correct_direction

            validated_code['direction'] = self._normalize_direction(direction)
            validated_code_list.append(validated_code)
            prev_position = position

        validated_path['codeList'] = validated_code_list
        return validated_path

    def _calculate_movement_direction(self, from_position: str, to_position: str) -> Optional[str]:
        """Compute the movement direction from one position to another.

        Args:
            from_position: Start position code.
            to_position: Target position code.

        Returns:
            Optional[str]: ``"0"``, ``"90"``, ``"180"`` or ``"270"``; ``None``
            when a coordinate cannot be resolved or when both positions have
            the same coordinates (a wait step has no movement direction, so
            the caller keeps the existing heading).
        """
        from_coord = get_coordinate_from_path_id(from_position, self.path_mapping)
        to_coord = get_coordinate_from_path_id(to_position, self.path_mapping)

        if not from_coord or not to_coord:
            return None

        dx = to_coord[0] - from_coord[0]
        dy = to_coord[1] - from_coord[1]

        if dx == 0 and dy == 0:
            # Stationary step: no direction can be derived from the movement.
            return None
        if dy == 0:
            return "0" if dx > 0 else "180"   # east / west
        if dx == 0:
            return "90" if dy > 0 else "270"  # south / north
        # Not a standard four-direction move: fall back to the default.
        return "90"

    def _normalize_direction(self, direction: str) -> str:
        """Normalise a direction value to one of 0, 90, 180 and 270.

        Args:
            direction: Raw direction value (anything ``int()`` accepts).

        Returns:
            str: The four-direction value the angle maps to; ``"90"`` when
            the value cannot be converted to an integer.
        """
        try:
            angle = int(direction) % 360
        except (TypeError, ValueError, OverflowError):
            return "90"  # default direction

        # Map the angle onto the four directions.
        if angle <= 45 or angle > 315:
            return "0"
        if angle <= 135:
            return "90"
        if angle <= 225:
            return "180"
        return "270"