from fastapi import FastAPI, HTTPException, Query
|
from fastapi.responses import JSONResponse, FileResponse
|
from pydantic import BaseModel
|
from typing import Optional, List
|
import os
|
import uvicorn
|
from camera_manager import HikCameraManager
|
|
# 创建FastAPI应用
|
app = FastAPI(
|
title="海康相机API服务",
|
description="通过HTTP接口控制海康工业相机拍照",
|
version="1.0.0"
|
)
|
|
# 全局相机管理器
|
camera_manager = HikCameraManager()
|
# 存储地址
|
storageAddress = "D:\\pic\\"
|
|
class CaptureRequest(BaseModel):
|
"""拍照请求参数"""
|
ip: str
|
filename: str
|
save_bmp: bool = True
|
save_jpg: bool = True
|
timeout: int = 10000
|
|
|
class CaptureResponse(BaseModel):
|
"""拍照响应"""
|
success: bool
|
message: str
|
files: List[str] = []
|
camera_info: dict = {}
|
|
|
@app.on_event("startup")
|
async def startup_event():
|
"""启动时初始化SDK"""
|
try:
|
camera_manager.initialize()
|
print("相机API服务启动成功")
|
except Exception as e:
|
print(f"初始化失败: {e}")
|
|
|
@app.on_event("shutdown")
|
async def shutdown_event():
|
"""关闭时清理资源"""
|
try:
|
camera_manager.finalize()
|
print("相机API服务已关闭")
|
except Exception as e:
|
print(f"清理资源失败: {e}")
|
|
|
@app.get("/")
|
async def root():
|
"""根路径"""
|
return {
|
"service": "海康相机API服务",
|
"version": "1.0.0",
|
"endpoints": {
|
"GET /cameras": "列出所有相机",
|
"POST /capture": "拍照(JSON body)",
|
"GET /capture": "拍照(URL参数)",
|
"GET /image/{filename}": "下载图片"
|
}
|
}
|
|
|
@app.get("/cameras")
|
async def list_cameras():
|
"""
|
列出所有可用相机
|
|
返回:
|
相机列表及详细信息
|
"""
|
try:
|
cameras = camera_manager.list_cameras()
|
|
# 格式化返回数据
|
camera_list = []
|
for cam in cameras:
|
camera_list.append({
|
"index": cam['index'],
|
"type": cam['type'],
|
"model": cam['model'],
|
"serial": cam['serial'],
|
"ip": cam['ip'] if cam['ip'] else None,
|
"user_defined_name": cam['user_defined_name'] if cam['user_defined_name'] else None
|
})
|
|
return {
|
"success": True,
|
"count": len(camera_list),
|
"cameras": camera_list
|
}
|
|
except Exception as e:
|
raise HTTPException(status_code=500, detail=f"获取相机列表失败: {str(e)}")
|
|
|
@app.post("/capture")
|
async def capture_image_post(request: CaptureRequest):
|
"""
|
拍照接口(POST方式)
|
|
参数:
|
- ip: 相机IP地址
|
- filename: 保存的文件名(不含扩展名)
|
- save_bmp: 是否保存BMP格式(默认true)
|
- save_jpg: 是否保存JPG格式(默认true)
|
- timeout: 超时时间(毫秒,默认3000)
|
|
返回:
|
拍照结果和保存的文件列表
|
"""
|
try:
|
# 打开相机
|
camera_info = camera_manager.open_camera(ip=request.ip)
|
|
# 拍照
|
saved_files = camera_manager.capture_image(
|
filename=request.filename,
|
storageAddress=storageAddress,
|
save_bmp=request.save_bmp,
|
save_jpg=request.save_jpg,
|
timeout=request.timeout
|
)
|
|
# 关闭相机
|
camera_manager.close_camera()
|
|
return CaptureResponse(
|
success=True,
|
message="拍照成功",
|
files=saved_files,
|
camera_info={
|
"type": camera_info['type'],
|
"model": camera_info['model'],
|
"serial": camera_info['serial'],
|
"ip": camera_info['ip']
|
}
|
)
|
|
except Exception as e:
|
# 确保关闭相机
|
try:
|
camera_manager.close_camera()
|
except:
|
pass
|
|
raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}")
|
|
|
@app.get("/capture")
|
async def capture_image_get(
|
ip: str = Query(..., description="相机IP地址"),
|
filename: str = Query(..., description="保存的文件名(不含扩展名)"),
|
save_bmp: bool = Query(True, description="是否保存BMP格式"),
|
save_jpg: bool = Query(True, description="是否保存JPG格式"),
|
timeout: int = Query(3000, description="超时时间(毫秒)")
|
):
|
"""
|
拍照接口(GET方式)
|
|
参数:
|
- ip: 相机IP地址
|
- filename: 保存的文件名(不含扩展名)
|
- save_bmp: 是否保存BMP格式(默认true)
|
- save_jpg: 是否保存JPG格式(默认true)
|
- timeout: 超时时间(毫秒,默认3000)
|
|
示例:
|
GET /capture?ip=192.168.1.100&filename=test_image
|
"""
|
request = CaptureRequest(
|
ip=ip,
|
filename=filename,
|
save_bmp=save_bmp,
|
save_jpg=save_jpg,
|
timeout=timeout
|
)
|
return await capture_image_post(request)
|
|
|
@app.get("/capture_by_serial")
|
async def capture_by_serial(
|
serial: str = Query(..., description="相机序列号"),
|
filename: str = Query(..., description="保存的文件名(不含扩展名)"),
|
save_bmp: bool = Query(True, description="是否保存BMP格式"),
|
save_jpg: bool = Query(True, description="是否保存JPG格式"),
|
timeout: int = Query(3000, description="超时时间(毫秒)")
|
):
|
"""
|
通过序列号拍照
|
|
示例:
|
GET /capture_by_serial?serial=00J12345678&filename=test_image
|
"""
|
try:
|
camera_info = camera_manager.open_camera(serial=serial)
|
|
saved_files = camera_manager.capture_image(
|
filename=filename,
|
save_bmp=save_bmp,
|
save_jpg=save_jpg,
|
timeout=timeout
|
)
|
|
camera_manager.close_camera()
|
|
return CaptureResponse(
|
success=True,
|
message="拍照成功",
|
files=saved_files,
|
camera_info={
|
"type": camera_info['type'],
|
"model": camera_info['model'],
|
"serial": camera_info['serial'],
|
"ip": camera_info.get('ip', '')
|
}
|
)
|
|
except Exception as e:
|
try:
|
camera_manager.close_camera()
|
except:
|
pass
|
raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}")
|
|
|
@app.get("/capture_by_index")
|
async def capture_by_index(
|
index: int = Query(0, description="相机索引"),
|
filename: str = Query(..., description="保存的文件名(不含扩展名)"),
|
save_bmp: bool = Query(True, description="是否保存BMP格式"),
|
save_jpg: bool = Query(True, description="是否保存JPG格式"),
|
timeout: int = Query(3000, description="超时时间(毫秒)")
|
):
|
"""
|
通过索引拍照
|
|
示例:
|
GET /capture_by_index?index=0&filename=test_image
|
"""
|
try:
|
camera_info = camera_manager.open_camera(index=index)
|
|
saved_files = camera_manager.capture_image(
|
filename=filename,
|
storageAddress=storageAddress,
|
save_bmp=save_bmp,
|
save_jpg=save_jpg,
|
timeout=timeout
|
)
|
|
camera_manager.close_camera()
|
|
return CaptureResponse(
|
success=True,
|
message="拍照成功",
|
files=saved_files,
|
camera_info={
|
"type": camera_info['type'],
|
"model": camera_info['model'],
|
"serial": camera_info['serial'],
|
"ip": camera_info.get('ip', '')
|
}
|
)
|
|
except Exception as e:
|
try:
|
camera_manager.close_camera()
|
except:
|
pass
|
raise HTTPException(status_code=500, detail=f"拍照失败: {str(e)}")
|
|
|
@app.get("/image/{filename}")
|
async def get_image(filename: str):
|
"""
|
下载图片
|
|
参数:
|
filename: 文件名(含扩展名)
|
|
示例:
|
GET /image/test_image.jpg
|
"""
|
|
filepath = storageAddress + filename
|
if not os.path.exists(filepath):
|
raise HTTPException(status_code=404, detail="文件不存在")
|
|
return FileResponse(
|
filepath,
|
media_type="application/octet-stream",
|
filename=filename
|
)
|
|
|
@app.get("/health")
|
async def health_check():
|
"""健康检查"""
|
return {
|
"status": "healthy",
|
"service": "camera-api",
|
"sdk_initialized": camera_manager.is_initialized
|
}
|
|
|
if __name__ == "__main__":
|
# 启动服务
|
uvicorn.run(
|
app,
|
host="0.0.0.0",
|
port=8000,
|
log_level="info"
|
)
|