zhang
昨天 2fa19599467263dcf582bb12906e03328e03b4a4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
"""
监控RCS系统状态并生成路径
"""
import time
import threading
import logging
from typing import Dict, List, Optional, Any
import queue
 
from common.data_models import AGVStatus, PlannedPath, from_dict
from common.api_client import RCSAPIClient
from algorithm_system.algorithms.path_planning import BatchPathPlanner
from algorithm_system.models.agv_model import AGVModelManager
from common.utils import load_path_mapping, generate_segment_id
 
try:
    from config.settings import RCS_SERVER_HOST, RCS_SERVER_PORT, MONITOR_POLLING_INTERVAL
except ImportError:
    RCS_SERVER_HOST = "10.10.10.156"
    RCS_SERVER_PORT = 8088
    MONITOR_POLLING_INTERVAL = 5.0
    logging.warning("无法从config.settings导入配置,使用默认值")
 
 
class PathMonitorService:
    """监控RCS状态并生成路径"""
    
    def __init__(self, 
                 rcs_host: str = None, 
                 rcs_port: int = None,
                 poll_interval: float = None,
                 path_algorithm: str = "A_STAR",
                 auto_send_paths: bool = True):
        #初始化路径监控服务
        self.logger = logging.getLogger(__name__)
        
        self.rcs_host = rcs_host or RCS_SERVER_HOST
        self.rcs_port = rcs_port or RCS_SERVER_PORT
        self.poll_interval = poll_interval or MONITOR_POLLING_INTERVAL
        self.path_algorithm = path_algorithm
        self.auto_send_paths = auto_send_paths
        
        self.rcs_client = RCSAPIClient(self.rcs_host, self.rcs_port, timeout=10)
        self.path_mapping = load_path_mapping()
        self.agv_manager = AGVModelManager(self.path_mapping)
        
        self.path_planner = BatchPathPlanner(
            self.path_mapping, 
            self.path_algorithm,
            self.agv_manager
        )
        
        self.is_running = False
        self.monitor_thread = None
        self._stop_event = threading.Event()
        
        self.stats = {
            'poll_count': 0,           # 轮询次数
            'successful_polls': 0,     # 成功轮询次数  
            'path_generations': 0,     # 路径生成次数
            'last_poll_time': None,    # 最后轮询时间
            'last_generation_time': None  # 最后生成时间
        }
        
        self.logger.info(f"路径监控服务初始化完成 - 轮询间隔: {poll_interval}s, 算法: {path_algorithm}")
    
    def start_monitoring(self):
        """启动监控服务"""
        if self.is_running:
            self.logger.warning("监控服务已在运行中")
            return
        
        self.is_running = True
        self._stop_event.clear()
        
        self.monitor_thread = threading.Thread(
            target=self._monitoring_loop,
            name="PathMonitorThread",
            daemon=True
        )
        self.monitor_thread.start()
        
        self.logger.info("监控服务已启动")
    
    def stop_monitoring(self):
        """停止监控服务"""
        if not self.is_running:
            self.logger.warning("监控服务未在运行")
            return
        
        self.is_running = False
        self._stop_event.set()
        
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=5.0)
        
        self.logger.info("监控服务已停止")
    
    def _monitoring_loop(self):
        """监控主循环"""
        self.logger.info("开始监控循环...")
        
        while self.is_running and not self._stop_event.is_set():
            try:
                self._perform_monitoring_cycle()
                
                if not self._stop_event.wait(self.poll_interval):
                    continue
                else:
                    break
                    
            except Exception as e:
                self.logger.error(f"监控循环异常: {e}")
                self._stop_event.wait(min(self.poll_interval * 2, 10.0))
        
        self.logger.info("监控循环已结束")
    
    def _perform_monitoring_cycle(self):
        """执行一次监控周期"""
        start_time = time.time()
        
        self.stats['poll_count'] += 1
        self.stats['last_poll_time'] = start_time
        
        try:
            self.logger.info(f"开始路径监控周期 #{self.stats['poll_count']}")
            
            # 1. 获取当前AGV状态和任务状态
            current_agv_status, current_task_status = self._fetch_rcs_status()
            
            if not current_agv_status:
                self.logger.debug("未获取到AGV状态数据,跳过路径生成")
                return
            
            # 2. 直接根据当前状态生成免碰撞路径
            self.logger.info(f"[路径生成] 开始为 {len(current_agv_status)} 个AGV生成路径")
            
            generated_paths = self._generate_collision_free_paths(current_agv_status)
            
            # 3. 发送路径到RCS(根据配置决定)
            if generated_paths:
                if self.auto_send_paths:
                    self._send_paths_to_rcs(generated_paths)
                else:
                    self.logger.debug(f"路径生成完成但未发送到RCS - AGV数量: {len(current_agv_status)}, 路径数: {len(generated_paths)}")
                
                self.logger.info(f"路径生成完成 - AGV数量: {len(current_agv_status)}, 路径数: {len(generated_paths)}, 自动发送: {self.auto_send_paths}")
            else:
                self.logger.debug("未生成任何路径(可能所有AGV都处于空闲状态)")
            
            # 更新成功统计
            self.stats['successful_polls'] += 1
            self.stats['path_generations'] += 1
            self.stats['last_generation_time'] = time.time()
            
            generation_time = (time.time() - start_time) * 1000
            self.logger.debug(f"监控周期完成 - 耗时: {generation_time:.2f}ms")
            
        except Exception as e:
            self.logger.error(f"监控周期执行失败: {e}")
    
    def _generate_unique_seg_id(self, agv_id: str, task_id: str = '') -> str:
        """生成segId"""
        try:
            if task_id:
                # 有任务时使用task_id
                return generate_segment_id(agv_id=agv_id, task_id=task_id, action_type="2")
            else:
                # 无任务时使用时间戳作为目标位置,确保不同时间有不同segId
                import time
                time_target = f"IDLE_{int(time.time() / 3600)}"  # 按小时分组
                return generate_segment_id(agv_id=agv_id, target_position=time_target, action_type="4")
            
        except Exception as e:
            self.logger.error(f"生成segId失败: {e}")
            # 降级方案:使用common.utils的函数生成备用ID
            import time
            fallback_target = f"FALLBACK_{int(time.time())}"
            return generate_segment_id(agv_id=agv_id, target_position=fallback_target, action_type="1")
    
    def _fetch_rcs_status(self) -> tuple[List[AGVStatus], Dict]:
        """从RCS获取当前状态"""
        agv_status_list = []
        task_status = {}
        
        try:
            # 获取AGV状态 - 使用空参数获取所有AGV状态和任务状态
            self.logger.info(" 轮询目标: 使用空参数(agvId=None, mapId=None)获取RCS所有AGV状态和任务状态")
            agv_response = self.rcs_client.get_agv_status(agv_id=None, map_id=None)
            
            if agv_response.code == 200 and agv_response.data:
                self.logger.info(f"成功获取AGV状态 - 数量: {len(agv_response.data)}")
                
                # 统计信息
                total_tasks = 0
                executing_tasks = 0
                loaded_tasks = 0
                
                self.logger.info("=" * 80)
                self.logger.info("[详细AGV状态信息]")
                self.logger.info("=" * 80)
                
                for i, agv_data in enumerate(agv_response.data, 1):
                    try:
                        # 转换为AGVStatus对象
                        agv_status = from_dict(AGVStatus, agv_data)
                        agv_status_list.append(agv_status)
                        
                        # 打印AGV基本信息
                        self.logger.info(f"[AGV #{i}] {agv_status.agvId}")
                        self.logger.info(f"   状态: {agv_status.status} | 位置: {agv_status.position} | 方向: {agv_status.direction}")
                        self.logger.info(f"   电压: {agv_status.vol} | 错误码: {agv_status.error} | 空背篓: {agv_status.empty}")
                        
                        # 分析背篓信息
                        if agv_status.backpack:
                            self.logger.info(f"   [背篓信息] ({len(agv_status.backpack)} 个):")
                            for backpack_item in agv_status.backpack:
                                status_text = "执行中" if backpack_item.execute else ("已装载" if backpack_item.loaded else "空闲")
                                task_info = f"任务: {backpack_item.taskId}" if backpack_item.taskId else "无任务"
                                
                                self.logger.info(f"     [位置{backpack_item.index}] {task_info} | "
                                                f"状态: {status_text} | 已装载: {backpack_item.loaded} | 执行中: {backpack_item.execute}")
                                
                                # 统计任务
                                if backpack_item.taskId:
                                    total_tasks += 1
                                    if backpack_item.execute:
                                        executing_tasks += 1
                                    if backpack_item.loaded:
                                        loaded_tasks += 1
                                    
                                    # 提取任务状态信息
                                    task_status[backpack_item.taskId] = {
                                        'agvId': agv_status.agvId,
                                        'loaded': backpack_item.loaded,
                                        'execute': backpack_item.execute,
                                        'index': backpack_item.index
                                    }
                        else:
                            self.logger.info(f"   [背篓信息] 无背篓信息")
                        
                        self.logger.info("-" * 60)
                    except Exception as e:
                        self.logger.warning(f"解析AGV状态数据失败: {agv_data} - {e}")
                
                # 打印统计摘要
                self.logger.info("[任务统计摘要]")
                self.logger.info(f"   总AGV数量: {len(agv_status_list)}")
                self.logger.info(f"   总任务数量: {total_tasks}")
                self.logger.info(f"   执行中任务: {executing_tasks}")
                self.logger.info(f"   已装载任务: {loaded_tasks}")
                self.logger.info(f"   未装载任务: {total_tasks - loaded_tasks}")
                self.logger.info("=" * 80)
                
            else:
                self.logger.warning(f"获取AGV状态失败: {agv_response.msg if agv_response else '无响应'}")
        
        except Exception as e:
            self.logger.error(f"从RCS获取状态失败: {e}")
        
        return agv_status_list, task_status
    
 
    
    def _generate_collision_free_paths(self, agv_status_list: List[AGVStatus]) -> List[Dict]:
        """为所有AGV生成免碰撞路径"""
        try:
            self.logger.debug(f"开始为 {len(agv_status_list)} 个AGV生成免碰撞路径")
            
            # 使用批量路径规划器生成路径
            result = self.path_planner.plan_all_agv_paths(
                agv_status_list=agv_status_list,
                include_idle_agv=False,  # 只为执行任务的AGV生成路径
                constraints=None
            )
 
            planned_paths = result.get('plannedPaths', result.get('planned_paths', []))
            
            # 诊断:显示路径规划结果的所有字段
            self.logger.debug(f"路径规划器返回的字段: {list(result.keys())}")
            self.logger.debug(f"planned_paths字段存在: {'planned_paths' in result}")
            self.logger.debug(f"plannedPaths字段存在: {'plannedPaths' in result}")
            self.logger.debug(f"最终获取到的路径数量: {len(planned_paths)}")
            
            if planned_paths:
                self.logger.info(f"成功生成 {len(planned_paths)} 条免碰撞路径")
                
                # 记录每个AGV的路径信息
                for path_info in planned_paths:
                    agv_id = path_info.get('agvId', 'unknown')
                    code_list = path_info.get('codeList', [])
                    self.logger.debug(f"AGV {agv_id} 路径长度: {len(code_list)}")
            else:
                self.logger.info("未生成任何路径(可能所有AGV都处于空闲状态)")
            
            return planned_paths
            
        except Exception as e:
            self.logger.error(f"生成免碰撞路径失败: {e}")
            return []
    
 
    
    def _send_paths_to_rcs(self, generated_paths: List[Dict]):
        """将生成的路径批量发送到RCS系统"""
        try:
            self.logger.info(f"开始批量发送 {len(generated_paths)} 条路径到RCS系统")
            
            # 构建批量路径数据
            agv_paths = []
            import uuid
            import time as time_module
            
            for path_info in generated_paths:
                agv_id = path_info.get('agvId')
                code_list = path_info.get('codeList', [])
                
                if agv_id and code_list:
                    path_task_id = ''
                    for path_code in code_list:
                        if path_code.get('taskId'):
                            path_task_id = path_code.get('taskId')
                            break
                    
                    navigation_data = []
                    for path_code in code_list:
                        nav_item = {
                            'code': path_code.get('code', ''),
                            'direction': path_code.get('direction', '90'),
                            'type': path_code.get('type')
                        }
                        
                        if path_code.get('taskId'):
                            nav_item['taskId'] = path_code.get('taskId')
                        
                        if path_code.get('posType'):
                            nav_item['posType'] = path_code.get('posType')
                        
                        if path_code.get('lev') is not None:
                            nav_item['lev'] = path_code.get('lev')
                            
                        navigation_data.append(nav_item)
                    
                    auto_seg_id = self._generate_unique_seg_id(agv_id, path_task_id)
                    
                    agv_path_data = {
                        'agvId': agv_id,
                        'segId': auto_seg_id,
                        'codeList': navigation_data
                    }
                    
                    agv_paths.append(agv_path_data)
            
            if not agv_paths:
                self.logger.warning("没有有效的路径数据需要发送")
                return
            
            try:
                import requests
                import json
                
                rcs_url = f"http://{self.rcs_host}:{self.rcs_port}/api/open/algorithm/zkd/navigation/v1"
                
                self.logger.info(f"算法系统发送路径规划结果到RCS:")
                self.logger.info(f"   AGV数量: {len(agv_paths)}")
                self.logger.info(f"   目标URL: {rcs_url}")
                
                successful_sends = 0
                
                for agv_path in agv_paths:
                    agv_id = agv_path['agvId']
                    seg_id = agv_path['segId']
                    code_list = agv_path['codeList']
                    
                    # 构造标准导航请求数据(agvId, segId, codeList格式)
                    navigation_data = {
                        'agvId': agv_id,
                        'segId': seg_id,
                        'codeList': code_list
                    }
                    
                    # 显示路径摘要
                    start_code = code_list[0].get('code', '') if code_list else ''
                    end_code = code_list[-1].get('code', '') if code_list else ''
                    task_codes = [nc for nc in code_list if nc.get('taskId')]
                    task_id = task_codes[0].get('taskId', '') if task_codes else ''
                    
                    self.logger.info(f"    AGV: {agv_id}, 任务: {task_id}, 路径: {start_code} -> {end_code} ({len(code_list)}点)")
                    
                    # 详细显示路径JSON数据
                    self.logger.info(f"发送路径JSON数据:")
                    self.logger.info(json.dumps(navigation_data, ensure_ascii=False, indent=2))
                    
                    try:
                        response = requests.post(
                            rcs_url,
                            json=navigation_data,
                            timeout=10,
                            headers={'Content-Type': 'application/json'}
                        )
                        
                        if response.status_code == 200:
                            response_data = response.json()
                            self.logger.info(f" 成功发送路径到RCS - AGV: {agv_id}, 响应: {response_data.get('msg', 'OK')}")
                            successful_sends += 1
                        else:
                            self.logger.warning(f" 发送路径到RCS失败 - AGV: {agv_id}, HTTP状态: {response.status_code}")
                            self.logger.warning(f"   响应内容: {response.text}")
                            
                    except Exception as e:
                        self.logger.warning(f" 发送导航指令异常 - AGV: {agv_id}, 错误: {e}")
                
                self.logger.info(f" 路径发送完成 - 成功: {successful_sends}/{len(agv_paths)}")
                    
            except Exception as e:
                self.logger.error(f" 发送导航指令异常: {e}")
        
        except Exception as e:
            self.logger.error(f"发送路径到RCS异常: {e}")
    
    def get_monitoring_status(self) -> Dict[str, Any]:
        """获取监控服务状态"""
        return {
            'is_running': self.is_running,
            'rcs_host': self.rcs_host,
            'rcs_port': self.rcs_port,
            'poll_interval': self.poll_interval,
            'path_algorithm': self.path_algorithm,
            'auto_send_paths': self.auto_send_paths,
            'stats': self.stats.copy()
        }
    
    def reset_stats(self):
        """重置统计信息"""
        self.stats = {
            'poll_count': 0,
            'successful_polls': 0,
            'path_generations': 0,
            'last_poll_time': None,
            'last_generation_time': None
        }
        self.logger.info("监控统计信息已重置")