zhang
2 天以前 2fa19599467263dcf582bb12906e03328e03b4a4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
与RCS系统通信
"""
import requests
import json
import logging
import time
from typing import Dict, Optional, Any
from .data_models import (
    APIResponse, create_error_response, ResponseCode
)
 
try:
    from config.settings import (
        RCS_SERVER_HOST, RCS_SERVER_PORT, 
        REQUEST_TIMEOUT, AGV_STATUS_API_ENDPOINT
    )
except ImportError:
    RCS_SERVER_HOST = "10.10.10.156"
    RCS_SERVER_PORT = 8088
    REQUEST_TIMEOUT = 30
    AGV_STATUS_API_ENDPOINT = "/api/open/algorithm/getAgv"
    logging.warning("无法从config.settings导入配置,使用默认值")
 
 
class APIClient:
    """HTTP API客户端基类"""
    
    def __init__(self, base_url: str, timeout: int = 30):
        """初始化API客户端"""
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)
        self.session = requests.Session()
 
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })
    
    def _make_request(self, method: str, endpoint: str, data: Any = None, 
                     params: Dict = None) -> APIResponse:
        """发送HTTP请求"""
        url = f"{self.base_url}{endpoint}"
        
        try:
            start_time = time.time()
 
            request_kwargs = {
                'timeout': self.timeout,
                'params': params
            }
            
            if data is not None:
                if isinstance(data, (dict, list)):
                    request_kwargs['json'] = data
                else:
                    request_kwargs['data'] = json.dumps(data)
 
            response = self.session.request(method, url, **request_kwargs)
            
            end_time = time.time()
            duration = (end_time - start_time) * 1000  # 转换为毫秒
            
            self.logger.info(f"{method} {url} - {response.status_code} - {duration:.2f}ms")
 
            try:
                response_data = response.json()
            except ValueError:
                response_data = {"text": response.text}
 
            if response.status_code == 200:
                if isinstance(response_data, dict) and 'code' in response_data:
                    return APIResponse(
                        code=response_data.get('code', ResponseCode.SUCCESS),
                        msg=response_data.get('msg', '操作成功'),
                        data=response_data.get('data')
                    )
                else:
                    return APIResponse(
                        code=ResponseCode.SUCCESS,
                        msg='操作成功',
                        data=response_data
                    )
            else:
                return create_error_response(
                    ResponseCode.SERVER_ERROR,
                    f"HTTP {response.status_code}: {response.text}"
                )
                
        except requests.exceptions.Timeout:
            self.logger.error(f"请求超时: {url}")
            return create_error_response(ResponseCode.SERVER_ERROR, "请求超时")
            
        except requests.exceptions.ConnectionError:
            self.logger.error(f"连接错误: {url}")
            return create_error_response(ResponseCode.SERVER_ERROR, "连接错误")
            
        except Exception as e:
            self.logger.error(f"请求异常: {url} - {str(e)}")
            return create_error_response(ResponseCode.SERVER_ERROR, f"请求异常: {str(e)}")
 
 
class RCSAPIClient(APIClient):
    
    def __init__(self, rcs_host: str = None, rcs_port: int = None, timeout: int = None):
        """初始化RCS API客户端"""
        rcs_host = rcs_host or RCS_SERVER_HOST
        rcs_port = rcs_port or RCS_SERVER_PORT
        timeout = timeout or REQUEST_TIMEOUT
        
        base_url = f"http://{rcs_host}:{rcs_port}"
        super().__init__(base_url, timeout)
    
    def get_agv_status(self, agv_id: Optional[str] = None, 
                      map_id: Optional[str] = None) -> APIResponse:
        """获取AGV状态"""
        data = {}
        if agv_id:
            data['agvId'] = agv_id
        if map_id:
            data['mapId'] = map_id
            
        return self._make_request('POST', AGV_STATUS_API_ENDPOINT, data=data)