#
Junjie
4 天以前 0ecd4a0ec8c4c5585cbd8975d7786c5618814381
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse, FileResponse
from pydantic import BaseModel
from typing import Optional, List
import os
import uvicorn
from camera_manager import HikCameraManager
 
# 创建FastAPI应用
app = FastAPI(
    title="海康相机API服务",
    description="通过HTTP接口控制海康工业相机拍照",
    version="1.0.0"
)
 
# 全局相机管理器
camera_manager = HikCameraManager()
# 存储地址
storageAddress = "D:\\pic\\"
 
class CaptureRequest(BaseModel):
    """拍照请求参数"""
    ip: str
    filename: str
    save_bmp: bool = True
    save_jpg: bool = True
    timeout: int = 10000
 
 
class CaptureResponse(BaseModel):
    """拍照响应"""
    success: bool
    message: str
    files: List[str] = []
    camera_info: dict = {}
 
 
@app.on_event("startup")
async def startup_event():
    """启动时初始化SDK"""
    try:
        camera_manager.initialize()
        print("相机API服务启动成功")
    except Exception as e:
        print(f"初始化失败: {e}")
 
 
@app.on_event("shutdown")
async def shutdown_event():
    """关闭时清理资源"""
    try:
        camera_manager.finalize()
        print("相机API服务已关闭")
    except Exception as e:
        print(f"清理资源失败: {e}")
 
 
@app.get("/")
async def root():
    """根路径"""
    return {
        "service": "海康相机API服务",
        "version": "1.0.0",
        "endpoints": {
            "GET /cameras": "列出所有相机",
            "POST /capture": "拍照(JSON body)",
            "GET /capture": "拍照(URL参数)",
            "GET /image/{filename}": "下载图片"
        }
    }
 
 
@app.get("/cameras")
async def list_cameras():
    """
    列出所有可用相机
    
    返回:
        相机列表及详细信息
    """
    try:
        cameras = camera_manager.list_cameras()
        
        # 格式化返回数据
        camera_list = []
        for cam in cameras:
            camera_list.append({
                "index": cam['index'],
                "type": cam['type'],
                "model": cam['model'],
                "serial": cam['serial'],
                "ip": cam['ip'] if cam['ip'] else None,
                "user_defined_name": cam['user_defined_name'] if cam['user_defined_name'] else None
            })
        
        return {
            "success": True,
            "count": len(camera_list),
            "cameras": camera_list
        }
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取相机列表失败: {str(e)}")
 
 
@app.post("/capture")
async def capture_image_post(request: CaptureRequest):
    """
    拍照接口(POST方式)
    
    参数:
        - ip: 相机IP地址
        - filename: 保存的文件名(不含扩展名)
        - save_bmp: 是否保存BMP格式(默认true)
        - save_jpg: 是否保存JPG格式(默认true)
        - timeout: 超时时间(毫秒,默认3000)
    
    返回:
        拍照结果和保存的文件列表
    """
    try:
        # 打开相机
        camera_info = camera_manager.open_camera(ip=request.ip)
        
        # 拍照
        saved_files = camera_manager.capture_image(
            filename=request.filename,
            storageAddress=storageAddress,
            save_bmp=request.save_bmp,
            save_jpg=request.save_jpg,
            timeout=request.timeout
        )
        
        # 关闭相机
        camera_manager.close_camera()
        
        return CaptureResponse(
            success=True,
            message="拍照成功",
            files=saved_files,
            camera_info={
                "type": camera_info['type'],
                "model": camera_info['model'],
                "serial": camera_info['serial'],
                "ip": camera_info['ip']
            }
        )
    
    except Exception as e:
        # 确保关闭相机
        try:
            camera_manager.close_camera()
        except:
            pass
        
        raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}")
 
 
@app.get("/capture")
async def capture_image_get(
    ip: str = Query(..., description="相机IP地址"),
    filename: str = Query(..., description="保存的文件名(不含扩展名)"),
    save_bmp: bool = Query(True, description="是否保存BMP格式"),
    save_jpg: bool = Query(True, description="是否保存JPG格式"),
    timeout: int = Query(3000, description="超时时间(毫秒)")
):
    """
    拍照接口(GET方式)
    
    参数:
        - ip: 相机IP地址
        - filename: 保存的文件名(不含扩展名)
        - save_bmp: 是否保存BMP格式(默认true)
        - save_jpg: 是否保存JPG格式(默认true)
        - timeout: 超时时间(毫秒,默认3000)
    
    示例:
        GET /capture?ip=192.168.1.100&filename=test_image
    """
    request = CaptureRequest(
        ip=ip,
        filename=filename,
        save_bmp=save_bmp,
        save_jpg=save_jpg,
        timeout=timeout
    )
    return await capture_image_post(request)
 
 
@app.get("/capture_by_serial")
async def capture_by_serial(
    serial: str = Query(..., description="相机序列号"),
    filename: str = Query(..., description="保存的文件名(不含扩展名)"),
    save_bmp: bool = Query(True, description="是否保存BMP格式"),
    save_jpg: bool = Query(True, description="是否保存JPG格式"),
    timeout: int = Query(3000, description="超时时间(毫秒)")
):
    """
    通过序列号拍照
    
    示例:
        GET /capture_by_serial?serial=00J12345678&filename=test_image
    """
    try:
        camera_info = camera_manager.open_camera(serial=serial)
        
        saved_files = camera_manager.capture_image(
            filename=filename,
            save_bmp=save_bmp,
            save_jpg=save_jpg,
            timeout=timeout
        )
        
        camera_manager.close_camera()
        
        return CaptureResponse(
            success=True,
            message="拍照成功",
            files=saved_files,
            camera_info={
                "type": camera_info['type'],
                "model": camera_info['model'],
                "serial": camera_info['serial'],
                "ip": camera_info.get('ip', '')
            }
        )
    
    except Exception as e:
        try:
            camera_manager.close_camera()
        except:
            pass
        raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}")
 
 
@app.get("/capture_by_index")
async def capture_by_index(
    index: int = Query(0, description="相机索引"),
    filename: str = Query(..., description="保存的文件名(不含扩展名)"),
    save_bmp: bool = Query(True, description="是否保存BMP格式"),
    save_jpg: bool = Query(True, description="是否保存JPG格式"),
    timeout: int = Query(3000, description="超时时间(毫秒)")
):
    """
    通过索引拍照
    
    示例:
        GET /capture_by_index?index=0&filename=test_image
    """
    try:
        camera_info = camera_manager.open_camera(index=index)
        
        saved_files = camera_manager.capture_image(
            filename=filename,
            storageAddress=storageAddress,
            save_bmp=save_bmp,
            save_jpg=save_jpg,
            timeout=timeout
        )
        
        camera_manager.close_camera()
        
        return CaptureResponse(
            success=True,
            message="拍照成功",
            files=saved_files,
            camera_info={
                "type": camera_info['type'],
                "model": camera_info['model'],
                "serial": camera_info['serial'],
                "ip": camera_info.get('ip', '')
            }
        )
    
    except Exception as e:
        try:
            camera_manager.close_camera()
        except:
            pass
        raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}")
 
 
@app.get("/image/{filename}")
async def get_image(filename: str):
    """
    下载图片
    
    参数:
        filename: 文件名(含扩展名)
    
    示例:
        GET /image/test_image.jpg
    """
 
    filepath = storageAddress + filename
    if not os.path.exists(filepath):
        raise HTTPException(status_code=404, detail="文件不存在")
    
    return FileResponse(
        filepath,
        media_type="application/octet-stream",
        filename=filename
    )
 
 
@app.get("/health")
async def health_check():
    """健康检查"""
    return {
        "status": "healthy",
        "service": "camera-api",
        "sdk_initialized": camera_manager.is_initialized
    }
 
 
if __name__ == "__main__":
    # 启动服务
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )