#!/usr/bin/env python # -*- coding: utf-8 -*- """ 海康工业相机管理类 - 支持多相机选择和操作 """ import time from MvImport.MvCameraControl_class import * class HikCameraManager: """海康相机管理器""" def __init__(self): self.cam = None self.is_initialized = False def initialize(self): """初始化SDK""" ret = MvCamera.MV_CC_Initialize() if ret != 0: raise Exception(f"初始化SDK失败! 错误码: {hex(ret)}") self.is_initialized = True print("SDK初始化成功") def list_cameras(self): """ 列出所有可用相机 返回: 相机信息列表 """ if not self.is_initialized: self.initialize() deviceList = MV_CC_DEVICE_INFO_LIST() ret = MvCamera.MV_CC_EnumDevices(MV_GIGE_DEVICE | MV_USB_DEVICE, deviceList) if ret != 0: raise Exception(f"枚举设备失败! 错误码: {hex(ret)}") if deviceList.nDeviceNum == 0: print("未找到相机设备") return [] cameras = [] for i in range(deviceList.nDeviceNum): mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents camera_info = { 'index': i, 'device_info': mvcc_dev_info, 'type': None, 'model': '', 'serial': '', 'ip': '', 'user_defined_name': '' } if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE: camera_info['type'] = 'GigE' # 型号 model_name = "" for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName: if per == 0: break model_name += chr(per) camera_info['model'] = model_name.strip() # 序列号 serial = "" for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chSerialNumber: if per == 0: break serial += chr(per) camera_info['serial'] = serial.strip() # IP地址 nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24) nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16) nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8) nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff) camera_info['ip'] = f"{nip1}.{nip2}.{nip3}.{nip4}" # 用户自定义名称 user_name = "" for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName: if per == 0: break user_name += chr(per) camera_info['user_defined_name'] = user_name.strip() elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE: camera_info['type'] = 'USB' # 型号 model_name = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName: if per == 0: break model_name += chr(per) camera_info['model'] = model_name.strip() # 序列号 serial = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber: if per == 0: break serial += chr(per) camera_info['serial'] = serial.strip() # 用户自定义名称 user_name = "" for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName: if per == 0: break user_name += chr(per) camera_info['user_defined_name'] = user_name.strip() cameras.append(camera_info) return cameras def print_cameras(self): """打印所有相机信息""" cameras = self.list_cameras() if not cameras: print("未找到相机") return print(f"\n找到 {len(cameras)} 个相机:") print("=" * 80) for cam in cameras: print(f"\n[{cam['index']}] {cam['type']} 相机") print(f" 型号: {cam['model']}") print(f" 序列号: {cam['serial']}") if cam['ip']: print(f" IP地址: {cam['ip']}") if cam['user_defined_name']: print(f" 自定义名称: {cam['user_defined_name']}") print("=" * 80) def open_camera(self, index=None, serial=None, ip=None, model=None): """ 打开指定相机 参数: index: 相机索引(0, 1, 2...) serial: 相机序列号 ip: 相机IP地址(仅GigE相机) model: 相机型号 优先级: serial > ip > model > index """ cameras = self.list_cameras() if not cameras: raise Exception("未找到相机设备") # 根据条件选择相机 selected_camera = None if serial: # 按序列号查找 for cam in cameras: if cam['serial'] == serial: selected_camera = cam print(f"通过序列号选择相机: {serial}") break if not selected_camera: raise Exception(f"未找到序列号为 {serial} 的相机") elif ip: # 按IP地址查找(仅GigE) for cam in cameras: if cam['ip'] == ip: selected_camera = cam print(f"通过IP地址选择相机: {ip}") break if not selected_camera: raise Exception(f"未找到IP地址为 {ip} 的相机") elif model: # 按型号查找 for cam in cameras: if model in cam['model']: selected_camera = cam print(f"通过型号选择相机: {model}") break if not selected_camera: raise Exception(f"未找到型号包含 {model} 的相机") else: # 按索引选择(默认第一个) if index is None: index = 0 if index >= len(cameras): raise Exception(f"相机索引 {index} 超出范围(共 {len(cameras)} 个相机)") selected_camera = cameras[index] print(f"通过索引选择相机: {index}") # 打印选中的相机信息 print(f"选中相机: [{selected_camera['type']}] {selected_camera['model']} (序列号: {selected_camera['serial']})") # 创建句柄 self.cam = MvCamera() ret = self.cam.MV_CC_CreateHandle(selected_camera['device_info']) if ret != 0: raise Exception(f"创建句柄失败! 错误码: {hex(ret)}") # 打开设备 ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0) if ret != 0: # 尝试控制模式 ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Control, 0) if ret != 0: self.cam.MV_CC_DestroyHandle() raise Exception(f"打开设备失败! 错误码: {hex(ret)}") print("相机打开成功") return selected_camera def capture_image(self, filename="captured_image", storageAddress = "D:pic/", save_bmp=True, save_jpg=True, timeout=3000): """ 采集一帧图像 参数: filename: 文件名(不含扩展名) save_bmp: 是否保存BMP格式 save_jpg: 是否保存JPG格式 timeout: 超时时间(毫秒) """ if not self.cam: raise Exception("相机未打开") # 设置触发模式为连续采集 ret = self.cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF) if ret != 0: print(f"警告: 设置触发模式失败! 错误码: {hex(ret)}") # 开始取流 ret = self.cam.MV_CC_StartGrabbing() if ret != 0: raise Exception(f"开始取流失败! 错误码: {hex(ret)}") print("开始采集...") time.sleep(1) # 等待相机稳定 # 获取图像 stFrameInfo = MV_FRAME_OUT() memset(byref(stFrameInfo), 0, sizeof(stFrameInfo)) ret = self.cam.MV_CC_GetImageBuffer(stFrameInfo, timeout) if ret != 0: self.cam.MV_CC_StopGrabbing() raise Exception(f"获取图像失败! 错误码: {hex(ret)}") print(f"成功获取图像: {stFrameInfo.stFrameInfo.nWidth}x{stFrameInfo.stFrameInfo.nHeight}") # 转换并保存图像 saved_files = [] if save_bmp: bmp_file = f"{storageAddress}{filename}.bmp" if self._save_image(stFrameInfo, bmp_file, MV_Image_Bmp): saved_files.append(bmp_file) if save_jpg: jpg_file = f"{storageAddress}{filename}.jpg" if self._save_image(stFrameInfo, jpg_file, MV_Image_Jpeg): saved_files.append(jpg_file) # 释放图像缓存 self.cam.MV_CC_FreeImageBuffer(stFrameInfo) # 停止取流 self.cam.MV_CC_StopGrabbing() return saved_files def _save_image(self, stFrameInfo, filename, image_type): """内部方法:保存图像""" stConvertParam = MV_SAVE_IMAGE_PARAM_EX() memset(byref(stConvertParam), 0, sizeof(stConvertParam)) nBufSize = stFrameInfo.stFrameInfo.nWidth * stFrameInfo.stFrameInfo.nHeight * 3 + 2048 pBuf = (c_ubyte * nBufSize)() stConvertParam.nWidth = stFrameInfo.stFrameInfo.nWidth stConvertParam.nHeight = stFrameInfo.stFrameInfo.nHeight stConvertParam.pData = stFrameInfo.pBufAddr stConvertParam.nDataLen = stFrameInfo.stFrameInfo.nFrameLen stConvertParam.enPixelType = stFrameInfo.stFrameInfo.enPixelType stConvertParam.pImageBuffer = cast(pBuf, POINTER(c_ubyte)) stConvertParam.nBufferSize = nBufSize stConvertParam.enImageType = image_type stConvertParam.nJpgQuality = 90 ret = self.cam.MV_CC_SaveImageEx2(stConvertParam) if ret != 0: print(f"图像转换失败! 错误码: {hex(ret)}") return False try: with open(filename, "wb") as f: f.write(bytearray(pBuf[0:stConvertParam.nImageLen])) print(f"✓ 图像已保存: {filename}") return True except Exception as e: print(f"保存文件失败: {e}") return False def close_camera(self): """关闭相机""" if self.cam: self.cam.MV_CC_CloseDevice() self.cam.MV_CC_DestroyHandle() self.cam = None print("相机已关闭") def finalize(self): """清理SDK资源""" self.close_camera() if self.is_initialized: MvCamera.MV_CC_Finalize() self.is_initialized = False print("SDK已清理") def __enter__(self): """支持with语句""" self.initialize() return self def __exit__(self, exc_type, exc_val, exc_tb): """支持with语句""" self.finalize() # 使用示例 if __name__ == "__main__": try: with HikCameraManager() as manager: # 1. 列出所有相机 manager.print_cameras() # 2. 选择相机的几种方式: # 方式1: 通过索引选择(默认第一个) # manager.open_camera(index=0) # 方式2: 通过序列号选择 # manager.open_camera(serial="DA7598570") # 方式3: 通过IP地址选择(GigE相机) manager.open_camera(ip="192.168.4.22") # 方式4: 通过型号选择 # manager.open_camera(model="MV-CS200") # 3. 采集图像 saved_files = manager.capture_image("test") print(f"\n保存的文件: {saved_files}") except Exception as e: print(f"错误: {e}") import traceback traceback.print_exc()