#!/usr/bin/env python
|
# -*- coding: utf-8 -*-
|
"""
|
海康工业相机管理类 - 支持多相机选择和操作
|
"""
|
import time
|
from MvImport.MvCameraControl_class import *
|
|
|
class HikCameraManager:
|
"""海康相机管理器"""
|
|
def __init__(self):
|
self.cam = None
|
self.is_initialized = False
|
|
def initialize(self):
|
"""初始化SDK"""
|
ret = MvCamera.MV_CC_Initialize()
|
if ret != 0:
|
raise Exception(f"初始化SDK失败! 错误码: {hex(ret)}")
|
self.is_initialized = True
|
print("SDK初始化成功")
|
|
def list_cameras(self):
|
"""
|
列出所有可用相机
|
返回: 相机信息列表
|
"""
|
if not self.is_initialized:
|
self.initialize()
|
|
deviceList = MV_CC_DEVICE_INFO_LIST()
|
ret = MvCamera.MV_CC_EnumDevices(MV_GIGE_DEVICE | MV_USB_DEVICE, deviceList)
|
|
if ret != 0:
|
raise Exception(f"枚举设备失败! 错误码: {hex(ret)}")
|
|
if deviceList.nDeviceNum == 0:
|
print("未找到相机设备")
|
return []
|
|
cameras = []
|
for i in range(deviceList.nDeviceNum):
|
mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
|
|
camera_info = {
|
'index': i,
|
'device_info': mvcc_dev_info,
|
'type': None,
|
'model': '',
|
'serial': '',
|
'ip': '',
|
'user_defined_name': ''
|
}
|
|
if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
|
camera_info['type'] = 'GigE'
|
|
# 型号
|
model_name = ""
|
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
|
if per == 0:
|
break
|
model_name += chr(per)
|
camera_info['model'] = model_name.strip()
|
|
# 序列号
|
serial = ""
|
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chSerialNumber:
|
if per == 0:
|
break
|
serial += chr(per)
|
camera_info['serial'] = serial.strip()
|
|
# IP地址
|
nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
|
nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
|
nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
|
nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
|
camera_info['ip'] = f"{nip1}.{nip2}.{nip3}.{nip4}"
|
|
# 用户自定义名称
|
user_name = ""
|
for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName:
|
if per == 0:
|
break
|
user_name += chr(per)
|
camera_info['user_defined_name'] = user_name.strip()
|
|
elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
|
camera_info['type'] = 'USB'
|
|
# 型号
|
model_name = ""
|
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
|
if per == 0:
|
break
|
model_name += chr(per)
|
camera_info['model'] = model_name.strip()
|
|
# 序列号
|
serial = ""
|
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
|
if per == 0:
|
break
|
serial += chr(per)
|
camera_info['serial'] = serial.strip()
|
|
# 用户自定义名称
|
user_name = ""
|
for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName:
|
if per == 0:
|
break
|
user_name += chr(per)
|
camera_info['user_defined_name'] = user_name.strip()
|
|
cameras.append(camera_info)
|
|
return cameras
|
|
def print_cameras(self):
|
"""打印所有相机信息"""
|
cameras = self.list_cameras()
|
|
if not cameras:
|
print("未找到相机")
|
return
|
|
print(f"\n找到 {len(cameras)} 个相机:")
|
print("=" * 80)
|
|
for cam in cameras:
|
print(f"\n[{cam['index']}] {cam['type']} 相机")
|
print(f" 型号: {cam['model']}")
|
print(f" 序列号: {cam['serial']}")
|
if cam['ip']:
|
print(f" IP地址: {cam['ip']}")
|
if cam['user_defined_name']:
|
print(f" 自定义名称: {cam['user_defined_name']}")
|
|
print("=" * 80)
|
|
def open_camera(self, index=None, serial=None, ip=None, model=None):
|
"""
|
打开指定相机
|
|
参数:
|
index: 相机索引(0, 1, 2...)
|
serial: 相机序列号
|
ip: 相机IP地址(仅GigE相机)
|
model: 相机型号
|
|
优先级: serial > ip > model > index
|
"""
|
cameras = self.list_cameras()
|
|
if not cameras:
|
raise Exception("未找到相机设备")
|
|
# 根据条件选择相机
|
selected_camera = None
|
|
if serial:
|
# 按序列号查找
|
for cam in cameras:
|
if cam['serial'] == serial:
|
selected_camera = cam
|
print(f"通过序列号选择相机: {serial}")
|
break
|
if not selected_camera:
|
raise Exception(f"未找到序列号为 {serial} 的相机")
|
|
elif ip:
|
# 按IP地址查找(仅GigE)
|
for cam in cameras:
|
if cam['ip'] == ip:
|
selected_camera = cam
|
print(f"通过IP地址选择相机: {ip}")
|
break
|
if not selected_camera:
|
raise Exception(f"未找到IP地址为 {ip} 的相机")
|
|
elif model:
|
# 按型号查找
|
for cam in cameras:
|
if model in cam['model']:
|
selected_camera = cam
|
print(f"通过型号选择相机: {model}")
|
break
|
if not selected_camera:
|
raise Exception(f"未找到型号包含 {model} 的相机")
|
|
else:
|
# 按索引选择(默认第一个)
|
if index is None:
|
index = 0
|
if index >= len(cameras):
|
raise Exception(f"相机索引 {index} 超出范围(共 {len(cameras)} 个相机)")
|
selected_camera = cameras[index]
|
print(f"通过索引选择相机: {index}")
|
|
# 打印选中的相机信息
|
print(f"选中相机: [{selected_camera['type']}] {selected_camera['model']} (序列号: {selected_camera['serial']})")
|
|
# 创建句柄
|
self.cam = MvCamera()
|
ret = self.cam.MV_CC_CreateHandle(selected_camera['device_info'])
|
if ret != 0:
|
raise Exception(f"创建句柄失败! 错误码: {hex(ret)}")
|
|
# 打开设备
|
ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
|
if ret != 0:
|
# 尝试控制模式
|
ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Control, 0)
|
if ret != 0:
|
self.cam.MV_CC_DestroyHandle()
|
raise Exception(f"打开设备失败! 错误码: {hex(ret)}")
|
|
print("相机打开成功")
|
return selected_camera
|
|
def capture_image(self, filename="captured_image", storageAddress = "D:pic/", save_bmp=True, save_jpg=True, timeout=3000):
|
"""
|
采集一帧图像
|
|
参数:
|
filename: 文件名(不含扩展名)
|
save_bmp: 是否保存BMP格式
|
save_jpg: 是否保存JPG格式
|
timeout: 超时时间(毫秒)
|
"""
|
if not self.cam:
|
raise Exception("相机未打开")
|
|
# 设置触发模式为连续采集
|
ret = self.cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
|
if ret != 0:
|
print(f"警告: 设置触发模式失败! 错误码: {hex(ret)}")
|
|
# 开始取流
|
ret = self.cam.MV_CC_StartGrabbing()
|
if ret != 0:
|
raise Exception(f"开始取流失败! 错误码: {hex(ret)}")
|
|
print("开始采集...")
|
time.sleep(1) # 等待相机稳定
|
|
# 获取图像
|
stFrameInfo = MV_FRAME_OUT()
|
memset(byref(stFrameInfo), 0, sizeof(stFrameInfo))
|
|
ret = self.cam.MV_CC_GetImageBuffer(stFrameInfo, timeout)
|
if ret != 0:
|
self.cam.MV_CC_StopGrabbing()
|
raise Exception(f"获取图像失败! 错误码: {hex(ret)}")
|
|
print(f"成功获取图像: {stFrameInfo.stFrameInfo.nWidth}x{stFrameInfo.stFrameInfo.nHeight}")
|
|
# 转换并保存图像
|
saved_files = []
|
|
if save_bmp:
|
bmp_file = f"{storageAddress}{filename}.bmp"
|
if self._save_image(stFrameInfo, bmp_file, MV_Image_Bmp):
|
saved_files.append(bmp_file)
|
|
if save_jpg:
|
jpg_file = f"{storageAddress}{filename}.jpg"
|
if self._save_image(stFrameInfo, jpg_file, MV_Image_Jpeg):
|
saved_files.append(jpg_file)
|
|
# 释放图像缓存
|
self.cam.MV_CC_FreeImageBuffer(stFrameInfo)
|
|
# 停止取流
|
self.cam.MV_CC_StopGrabbing()
|
|
return saved_files
|
|
def _save_image(self, stFrameInfo, filename, image_type):
|
"""内部方法:保存图像"""
|
stConvertParam = MV_SAVE_IMAGE_PARAM_EX()
|
memset(byref(stConvertParam), 0, sizeof(stConvertParam))
|
|
nBufSize = stFrameInfo.stFrameInfo.nWidth * stFrameInfo.stFrameInfo.nHeight * 3 + 2048
|
pBuf = (c_ubyte * nBufSize)()
|
|
stConvertParam.nWidth = stFrameInfo.stFrameInfo.nWidth
|
stConvertParam.nHeight = stFrameInfo.stFrameInfo.nHeight
|
stConvertParam.pData = stFrameInfo.pBufAddr
|
stConvertParam.nDataLen = stFrameInfo.stFrameInfo.nFrameLen
|
stConvertParam.enPixelType = stFrameInfo.stFrameInfo.enPixelType
|
stConvertParam.pImageBuffer = cast(pBuf, POINTER(c_ubyte))
|
stConvertParam.nBufferSize = nBufSize
|
stConvertParam.enImageType = image_type
|
stConvertParam.nJpgQuality = 90
|
|
ret = self.cam.MV_CC_SaveImageEx2(stConvertParam)
|
if ret != 0:
|
print(f"图像转换失败! 错误码: {hex(ret)}")
|
return False
|
|
try:
|
with open(filename, "wb") as f:
|
f.write(bytearray(pBuf[0:stConvertParam.nImageLen]))
|
print(f"✓ 图像已保存: {filename}")
|
return True
|
except Exception as e:
|
print(f"保存文件失败: {e}")
|
return False
|
|
def close_camera(self):
|
"""关闭相机"""
|
if self.cam:
|
self.cam.MV_CC_CloseDevice()
|
self.cam.MV_CC_DestroyHandle()
|
self.cam = None
|
print("相机已关闭")
|
|
def finalize(self):
|
"""清理SDK资源"""
|
self.close_camera()
|
if self.is_initialized:
|
MvCamera.MV_CC_Finalize()
|
self.is_initialized = False
|
print("SDK已清理")
|
|
def __enter__(self):
|
"""支持with语句"""
|
self.initialize()
|
return self
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
"""支持with语句"""
|
self.finalize()
|
|
|
# 使用示例
|
if __name__ == "__main__":
|
try:
|
with HikCameraManager() as manager:
|
# 1. 列出所有相机
|
manager.print_cameras()
|
|
# 2. 选择相机的几种方式:
|
|
# 方式1: 通过索引选择(默认第一个)
|
# manager.open_camera(index=0)
|
|
# 方式2: 通过序列号选择
|
# manager.open_camera(serial="DA7598570")
|
|
# 方式3: 通过IP地址选择(GigE相机)
|
manager.open_camera(ip="192.168.4.22")
|
|
# 方式4: 通过型号选择
|
# manager.open_camera(model="MV-CS200")
|
|
# 3. 采集图像
|
saved_files = manager.capture_image("test")
|
print(f"\n保存的文件: {saved_files}")
|
|
except Exception as e:
|
print(f"错误: {e}")
|
import traceback
|
traceback.print_exc()
|