""" 与RCS系统通信 """ import requests import json import logging import time from typing import Dict, Optional, Any from .data_models import ( APIResponse, create_error_response, ResponseCode ) try: from config.settings import ( RCS_SERVER_HOST, RCS_SERVER_PORT, REQUEST_TIMEOUT, AGV_STATUS_API_ENDPOINT ) except ImportError: RCS_SERVER_HOST = "10.10.10.156" RCS_SERVER_PORT = 8088 REQUEST_TIMEOUT = 30 AGV_STATUS_API_ENDPOINT = "/api/open/algorithm/getAgv" logging.warning("无法从config.settings导入配置,使用默认值") class APIClient: """HTTP API客户端基类""" def __init__(self, base_url: str, timeout: int = 30): """初始化API客户端""" self.base_url = base_url.rstrip('/') self.timeout = timeout self.logger = logging.getLogger(__name__) self.session = requests.Session() self.session.headers.update({ 'Content-Type': 'application/json', 'Accept': 'application/json' }) def _make_request(self, method: str, endpoint: str, data: Any = None, params: Dict = None) -> APIResponse: """发送HTTP请求""" url = f"{self.base_url}{endpoint}" try: start_time = time.time() request_kwargs = { 'timeout': self.timeout, 'params': params } if data is not None: if isinstance(data, (dict, list)): request_kwargs['json'] = data else: request_kwargs['data'] = json.dumps(data) response = self.session.request(method, url, **request_kwargs) end_time = time.time() duration = (end_time - start_time) * 1000 # 转换为毫秒 self.logger.info(f"{method} {url} - {response.status_code} - {duration:.2f}ms") try: response_data = response.json() except ValueError: response_data = {"text": response.text} if response.status_code == 200: if isinstance(response_data, dict) and 'code' in response_data: return APIResponse( code=response_data.get('code', ResponseCode.SUCCESS), msg=response_data.get('msg', '操作成功'), data=response_data.get('data') ) else: return APIResponse( code=ResponseCode.SUCCESS, msg='操作成功', data=response_data ) else: return create_error_response( ResponseCode.SERVER_ERROR, f"HTTP {response.status_code}: {response.text}" ) except requests.exceptions.Timeout: self.logger.error(f"请求超时: {url}") return create_error_response(ResponseCode.SERVER_ERROR, "请求超时") except requests.exceptions.ConnectionError: self.logger.error(f"连接错误: {url}") return create_error_response(ResponseCode.SERVER_ERROR, "连接错误") except Exception as e: self.logger.error(f"请求异常: {url} - {str(e)}") return create_error_response(ResponseCode.SERVER_ERROR, f"请求异常: {str(e)}") class RCSAPIClient(APIClient): def __init__(self, rcs_host: str = None, rcs_port: int = None, timeout: int = None): """初始化RCS API客户端""" rcs_host = rcs_host or RCS_SERVER_HOST rcs_port = rcs_port or RCS_SERVER_PORT timeout = timeout or REQUEST_TIMEOUT base_url = f"http://{rcs_host}:{rcs_port}" super().__init__(base_url, timeout) def get_agv_status(self, agv_id: Optional[str] = None, map_id: Optional[str] = None) -> APIResponse: """获取AGV状态""" data = {} if agv_id: data['agvId'] = agv_id if map_id: data['mapId'] = map_id return self._make_request('POST', AGV_STATUS_API_ENDPOINT, data=data)