from fastapi import FastAPI, HTTPException, Query from fastapi.responses import JSONResponse, FileResponse from pydantic import BaseModel from typing import Optional, List import os import uvicorn from camera_manager import HikCameraManager # 创建FastAPI应用 app = FastAPI( title="海康相机API服务", description="通过HTTP接口控制海康工业相机拍照", version="1.0.0" ) # 全局相机管理器 camera_manager = HikCameraManager() # 存储地址 storageAddress = "D:\\pic\\" class CaptureRequest(BaseModel): """拍照请求参数""" ip: str filename: str save_bmp: bool = True save_jpg: bool = True timeout: int = 10000 class CaptureResponse(BaseModel): """拍照响应""" success: bool message: str files: List[str] = [] camera_info: dict = {} @app.on_event("startup") async def startup_event(): """启动时初始化SDK""" try: camera_manager.initialize() print("相机API服务启动成功") except Exception as e: print(f"初始化失败: {e}") @app.on_event("shutdown") async def shutdown_event(): """关闭时清理资源""" try: camera_manager.finalize() print("相机API服务已关闭") except Exception as e: print(f"清理资源失败: {e}") @app.get("/") async def root(): """根路径""" return { "service": "海康相机API服务", "version": "1.0.0", "endpoints": { "GET /cameras": "列出所有相机", "POST /capture": "拍照(JSON body)", "GET /capture": "拍照(URL参数)", "GET /image/{filename}": "下载图片" } } @app.get("/cameras") async def list_cameras(): """ 列出所有可用相机 返回: 相机列表及详细信息 """ try: cameras = camera_manager.list_cameras() # 格式化返回数据 camera_list = [] for cam in cameras: camera_list.append({ "index": cam['index'], "type": cam['type'], "model": cam['model'], "serial": cam['serial'], "ip": cam['ip'] if cam['ip'] else None, "user_defined_name": cam['user_defined_name'] if cam['user_defined_name'] else None }) return { "success": True, "count": len(camera_list), "cameras": camera_list } except Exception as e: raise HTTPException(status_code=500, detail=f"获取相机列表失败: {str(e)}") @app.post("/capture") async def capture_image_post(request: CaptureRequest): """ 拍照接口(POST方式) 参数: - ip: 相机IP地址 - filename: 保存的文件名(不含扩展名) - save_bmp: 是否保存BMP格式(默认true) - save_jpg: 是否保存JPG格式(默认true) - timeout: 超时时间(毫秒,默认3000) 返回: 拍照结果和保存的文件列表 """ try: # 打开相机 camera_info = camera_manager.open_camera(ip=request.ip) # 拍照 saved_files = camera_manager.capture_image( filename=request.filename, storageAddress=storageAddress, save_bmp=request.save_bmp, save_jpg=request.save_jpg, timeout=request.timeout ) # 关闭相机 camera_manager.close_camera() return CaptureResponse( success=True, message="拍照成功", files=saved_files, camera_info={ "type": camera_info['type'], "model": camera_info['model'], "serial": camera_info['serial'], "ip": camera_info['ip'] } ) except Exception as e: # 确保关闭相机 try: camera_manager.close_camera() except: pass raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}") @app.get("/capture") async def capture_image_get( ip: str = Query(..., description="相机IP地址"), filename: str = Query(..., description="保存的文件名(不含扩展名)"), save_bmp: bool = Query(True, description="是否保存BMP格式"), save_jpg: bool = Query(True, description="是否保存JPG格式"), timeout: int = Query(3000, description="超时时间(毫秒)") ): """ 拍照接口(GET方式) 参数: - ip: 相机IP地址 - filename: 保存的文件名(不含扩展名) - save_bmp: 是否保存BMP格式(默认true) - save_jpg: 是否保存JPG格式(默认true) - timeout: 超时时间(毫秒,默认3000) 示例: GET /capture?ip=192.168.1.100&filename=test_image """ request = CaptureRequest( ip=ip, filename=filename, save_bmp=save_bmp, save_jpg=save_jpg, timeout=timeout ) return await capture_image_post(request) @app.get("/capture_by_serial") async def capture_by_serial( serial: str = Query(..., description="相机序列号"), filename: str = Query(..., description="保存的文件名(不含扩展名)"), save_bmp: bool = Query(True, description="是否保存BMP格式"), save_jpg: bool = Query(True, description="是否保存JPG格式"), timeout: int = Query(3000, description="超时时间(毫秒)") ): """ 通过序列号拍照 示例: GET /capture_by_serial?serial=00J12345678&filename=test_image """ try: camera_info = camera_manager.open_camera(serial=serial) saved_files = camera_manager.capture_image( filename=filename, save_bmp=save_bmp, save_jpg=save_jpg, timeout=timeout ) camera_manager.close_camera() return CaptureResponse( success=True, message="拍照成功", files=saved_files, camera_info={ "type": camera_info['type'], "model": camera_info['model'], "serial": camera_info['serial'], "ip": camera_info.get('ip', '') } ) except Exception as e: try: camera_manager.close_camera() except: pass raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}") @app.get("/capture_by_index") async def capture_by_index( index: int = Query(0, description="相机索引"), filename: str = Query(..., description="保存的文件名(不含扩展名)"), save_bmp: bool = Query(True, description="是否保存BMP格式"), save_jpg: bool = Query(True, description="是否保存JPG格式"), timeout: int = Query(3000, description="超时时间(毫秒)") ): """ 通过索引拍照 示例: GET /capture_by_index?index=0&filename=test_image """ try: camera_info = camera_manager.open_camera(index=index) saved_files = camera_manager.capture_image( filename=filename, storageAddress=storageAddress, save_bmp=save_bmp, save_jpg=save_jpg, timeout=timeout ) camera_manager.close_camera() return CaptureResponse( success=True, message="拍照成功", files=saved_files, camera_info={ "type": camera_info['type'], "model": camera_info['model'], "serial": camera_info['serial'], "ip": camera_info.get('ip', '') } ) except Exception as e: try: camera_manager.close_camera() except: pass raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}") @app.get("/image/{filename}") async def get_image(filename: str): """ 下载图片 参数: filename: 文件名(含扩展名) 示例: GET /image/test_image.jpg """ filepath = storageAddress + filename if not os.path.exists(filepath): raise HTTPException(status_code=404, detail="文件不存在") return FileResponse( filepath, media_type="application/octet-stream", filename=filename ) @app.get("/health") async def health_check(): """健康检查""" return { "status": "healthy", "service": "camera-api", "sdk_initialized": camera_manager.is_initialized } if __name__ == "__main__": # 启动服务 uvicorn.run( app, host="0.0.0.0", port=8000, log_level="info" )