From 0ecd4a0ec8c4c5585cbd8975d7786c5618814381 Mon Sep 17 00:00:00 2001
From: Junjie <DELL@qq.com>
Date: 星期三, 03 十二月 2025 08:28:59 +0800
Subject: [PATCH] #
---
camera_manager.py | 368 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 368 insertions(+), 0 deletions(-)
diff --git a/camera_manager.py b/camera_manager.py
new file mode 100644
index 0000000..cc0b28e
--- /dev/null
+++ b/camera_manager.py
@@ -0,0 +1,368 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+娴峰悍宸ヤ笟鐩告満绠$悊绫� - 鏀寔澶氱浉鏈洪�夋嫨鍜屾搷浣�
+"""
+import time
+from MvImport.MvCameraControl_class import *
+
+
+class HikCameraManager:
+ """娴峰悍鐩告満绠$悊鍣�"""
+
+ def __init__(self):
+ self.cam = None
+ self.is_initialized = False
+
+ def initialize(self):
+ """鍒濆鍖朣DK"""
+ ret = MvCamera.MV_CC_Initialize()
+ if ret != 0:
+ raise Exception(f"鍒濆鍖朣DK澶辫触! 閿欒鐮�: {hex(ret)}")
+ self.is_initialized = True
+ print("SDK鍒濆鍖栨垚鍔�")
+
+ def list_cameras(self):
+ """
+ 鍒楀嚭鎵�鏈夊彲鐢ㄧ浉鏈�
+ 杩斿洖: 鐩告満淇℃伅鍒楄〃
+ """
+ if not self.is_initialized:
+ self.initialize()
+
+ deviceList = MV_CC_DEVICE_INFO_LIST()
+ ret = MvCamera.MV_CC_EnumDevices(MV_GIGE_DEVICE | MV_USB_DEVICE, deviceList)
+
+ if ret != 0:
+ raise Exception(f"鏋氫妇璁惧澶辫触! 閿欒鐮�: {hex(ret)}")
+
+ if deviceList.nDeviceNum == 0:
+ print("鏈壘鍒扮浉鏈鸿澶�")
+ return []
+
+ cameras = []
+ for i in range(deviceList.nDeviceNum):
+ mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
+
+ camera_info = {
+ 'index': i,
+ 'device_info': mvcc_dev_info,
+ 'type': None,
+ 'model': '',
+ 'serial': '',
+ 'ip': '',
+ 'user_defined_name': ''
+ }
+
+ if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
+ camera_info['type'] = 'GigE'
+
+ # 鍨嬪彿
+ model_name = ""
+ for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
+ if per == 0:
+ break
+ model_name += chr(per)
+ camera_info['model'] = model_name.strip()
+
+ # 搴忓垪鍙�
+ serial = ""
+ for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chSerialNumber:
+ if per == 0:
+ break
+ serial += chr(per)
+ camera_info['serial'] = serial.strip()
+
+ # IP鍦板潃
+ nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
+ nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
+ nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
+ nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
+ camera_info['ip'] = f"{nip1}.{nip2}.{nip3}.{nip4}"
+
+ # 鐢ㄦ埛鑷畾涔夊悕绉�
+ user_name = ""
+ for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName:
+ if per == 0:
+ break
+ user_name += chr(per)
+ camera_info['user_defined_name'] = user_name.strip()
+
+ elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
+ camera_info['type'] = 'USB'
+
+ # 鍨嬪彿
+ model_name = ""
+ for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
+ if per == 0:
+ break
+ model_name += chr(per)
+ camera_info['model'] = model_name.strip()
+
+ # 搴忓垪鍙�
+ serial = ""
+ for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
+ if per == 0:
+ break
+ serial += chr(per)
+ camera_info['serial'] = serial.strip()
+
+ # 鐢ㄦ埛鑷畾涔夊悕绉�
+ user_name = ""
+ for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName:
+ if per == 0:
+ break
+ user_name += chr(per)
+ camera_info['user_defined_name'] = user_name.strip()
+
+ cameras.append(camera_info)
+
+ return cameras
+
+ def print_cameras(self):
+ """鎵撳嵃鎵�鏈夌浉鏈轰俊鎭�"""
+ cameras = self.list_cameras()
+
+ if not cameras:
+ print("鏈壘鍒扮浉鏈�")
+ return
+
+ print(f"\n鎵惧埌 {len(cameras)} 涓浉鏈�:")
+ print("=" * 80)
+
+ for cam in cameras:
+ print(f"\n[{cam['index']}] {cam['type']} 鐩告満")
+ print(f" 鍨嬪彿: {cam['model']}")
+ print(f" 搴忓垪鍙�: {cam['serial']}")
+ if cam['ip']:
+ print(f" IP鍦板潃: {cam['ip']}")
+ if cam['user_defined_name']:
+ print(f" 鑷畾涔夊悕绉�: {cam['user_defined_name']}")
+
+ print("=" * 80)
+
+ def open_camera(self, index=None, serial=None, ip=None, model=None):
+ """
+ 鎵撳紑鎸囧畾鐩告満
+
+ 鍙傛暟:
+ index: 鐩告満绱㈠紩锛�0, 1, 2...锛�
+ serial: 鐩告満搴忓垪鍙�
+ ip: 鐩告満IP鍦板潃锛堜粎GigE鐩告満锛�
+ model: 鐩告満鍨嬪彿
+
+ 浼樺厛绾�: serial > ip > model > index
+ """
+ cameras = self.list_cameras()
+
+ if not cameras:
+ raise Exception("鏈壘鍒扮浉鏈鸿澶�")
+
+ # 鏍规嵁鏉′欢閫夋嫨鐩告満
+ selected_camera = None
+
+ if serial:
+ # 鎸夊簭鍒楀彿鏌ユ壘
+ for cam in cameras:
+ if cam['serial'] == serial:
+ selected_camera = cam
+ print(f"閫氳繃搴忓垪鍙烽�夋嫨鐩告満: {serial}")
+ break
+ if not selected_camera:
+ raise Exception(f"鏈壘鍒板簭鍒楀彿涓� {serial} 鐨勭浉鏈�")
+
+ elif ip:
+ # 鎸塈P鍦板潃鏌ユ壘锛堜粎GigE锛�
+ for cam in cameras:
+ if cam['ip'] == ip:
+ selected_camera = cam
+ print(f"閫氳繃IP鍦板潃閫夋嫨鐩告満: {ip}")
+ break
+ if not selected_camera:
+ raise Exception(f"鏈壘鍒癐P鍦板潃涓� {ip} 鐨勭浉鏈�")
+
+ elif model:
+ # 鎸夊瀷鍙锋煡鎵�
+ for cam in cameras:
+ if model in cam['model']:
+ selected_camera = cam
+ print(f"閫氳繃鍨嬪彿閫夋嫨鐩告満: {model}")
+ break
+ if not selected_camera:
+ raise Exception(f"鏈壘鍒板瀷鍙峰寘鍚� {model} 鐨勭浉鏈�")
+
+ else:
+ # 鎸夌储寮曢�夋嫨锛堥粯璁ょ涓�涓級
+ if index is None:
+ index = 0
+ if index >= len(cameras):
+ raise Exception(f"鐩告満绱㈠紩 {index} 瓒呭嚭鑼冨洿锛堝叡 {len(cameras)} 涓浉鏈猴級")
+ selected_camera = cameras[index]
+ print(f"閫氳繃绱㈠紩閫夋嫨鐩告満: {index}")
+
+ # 鎵撳嵃閫変腑鐨勭浉鏈轰俊鎭�
+ print(f"閫変腑鐩告満: [{selected_camera['type']}] {selected_camera['model']} (搴忓垪鍙�: {selected_camera['serial']})")
+
+ # 鍒涘缓鍙ユ焺
+ self.cam = MvCamera()
+ ret = self.cam.MV_CC_CreateHandle(selected_camera['device_info'])
+ if ret != 0:
+ raise Exception(f"鍒涘缓鍙ユ焺澶辫触! 閿欒鐮�: {hex(ret)}")
+
+ # 鎵撳紑璁惧
+ ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
+ if ret != 0:
+ # 灏濊瘯鎺у埗妯″紡
+ ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Control, 0)
+ if ret != 0:
+ self.cam.MV_CC_DestroyHandle()
+ raise Exception(f"鎵撳紑璁惧澶辫触! 閿欒鐮�: {hex(ret)}")
+
+ print("鐩告満鎵撳紑鎴愬姛")
+ return selected_camera
+
+ def capture_image(self, filename="captured_image", storageAddress = "D:pic/", save_bmp=True, save_jpg=True, timeout=3000):
+ """
+ 閲囬泦涓�甯у浘鍍�
+
+ 鍙傛暟:
+ filename: 鏂囦欢鍚嶏紙涓嶅惈鎵╁睍鍚嶏級
+ save_bmp: 鏄惁淇濆瓨BMP鏍煎紡
+ save_jpg: 鏄惁淇濆瓨JPG鏍煎紡
+ timeout: 瓒呮椂鏃堕棿锛堟绉掞級
+ """
+ if not self.cam:
+ raise Exception("鐩告満鏈墦寮�")
+
+ # 璁剧疆瑙﹀彂妯″紡涓鸿繛缁噰闆�
+ ret = self.cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
+ if ret != 0:
+ print(f"璀﹀憡: 璁剧疆瑙﹀彂妯″紡澶辫触! 閿欒鐮�: {hex(ret)}")
+
+ # 寮�濮嬪彇娴�
+ ret = self.cam.MV_CC_StartGrabbing()
+ if ret != 0:
+ raise Exception(f"寮�濮嬪彇娴佸け璐�! 閿欒鐮�: {hex(ret)}")
+
+ print("寮�濮嬮噰闆�...")
+ time.sleep(1) # 绛夊緟鐩告満绋冲畾
+
+ # 鑾峰彇鍥惧儚
+ stFrameInfo = MV_FRAME_OUT()
+ memset(byref(stFrameInfo), 0, sizeof(stFrameInfo))
+
+ ret = self.cam.MV_CC_GetImageBuffer(stFrameInfo, timeout)
+ if ret != 0:
+ self.cam.MV_CC_StopGrabbing()
+ raise Exception(f"鑾峰彇鍥惧儚澶辫触! 閿欒鐮�: {hex(ret)}")
+
+ print(f"鎴愬姛鑾峰彇鍥惧儚: {stFrameInfo.stFrameInfo.nWidth}x{stFrameInfo.stFrameInfo.nHeight}")
+
+ # 杞崲骞朵繚瀛樺浘鍍�
+ saved_files = []
+
+ if save_bmp:
+ bmp_file = f"{storageAddress}{filename}.bmp"
+ if self._save_image(stFrameInfo, bmp_file, MV_Image_Bmp):
+ saved_files.append(bmp_file)
+
+ if save_jpg:
+ jpg_file = f"{storageAddress}{filename}.jpg"
+ if self._save_image(stFrameInfo, jpg_file, MV_Image_Jpeg):
+ saved_files.append(jpg_file)
+
+ # 閲婃斁鍥惧儚缂撳瓨
+ self.cam.MV_CC_FreeImageBuffer(stFrameInfo)
+
+ # 鍋滄鍙栨祦
+ self.cam.MV_CC_StopGrabbing()
+
+ return saved_files
+
+ def _save_image(self, stFrameInfo, filename, image_type):
+ """鍐呴儴鏂规硶锛氫繚瀛樺浘鍍�"""
+ stConvertParam = MV_SAVE_IMAGE_PARAM_EX()
+ memset(byref(stConvertParam), 0, sizeof(stConvertParam))
+
+ nBufSize = stFrameInfo.stFrameInfo.nWidth * stFrameInfo.stFrameInfo.nHeight * 3 + 2048
+ pBuf = (c_ubyte * nBufSize)()
+
+ stConvertParam.nWidth = stFrameInfo.stFrameInfo.nWidth
+ stConvertParam.nHeight = stFrameInfo.stFrameInfo.nHeight
+ stConvertParam.pData = stFrameInfo.pBufAddr
+ stConvertParam.nDataLen = stFrameInfo.stFrameInfo.nFrameLen
+ stConvertParam.enPixelType = stFrameInfo.stFrameInfo.enPixelType
+ stConvertParam.pImageBuffer = cast(pBuf, POINTER(c_ubyte))
+ stConvertParam.nBufferSize = nBufSize
+ stConvertParam.enImageType = image_type
+ stConvertParam.nJpgQuality = 90
+
+ ret = self.cam.MV_CC_SaveImageEx2(stConvertParam)
+ if ret != 0:
+ print(f"鍥惧儚杞崲澶辫触! 閿欒鐮�: {hex(ret)}")
+ return False
+
+ try:
+ with open(filename, "wb") as f:
+ f.write(bytearray(pBuf[0:stConvertParam.nImageLen]))
+ print(f"鉁� 鍥惧儚宸蹭繚瀛�: {filename}")
+ return True
+ except Exception as e:
+ print(f"淇濆瓨鏂囦欢澶辫触: {e}")
+ return False
+
+ def close_camera(self):
+ """鍏抽棴鐩告満"""
+ if self.cam:
+ self.cam.MV_CC_CloseDevice()
+ self.cam.MV_CC_DestroyHandle()
+ self.cam = None
+ print("鐩告満宸插叧闂�")
+
+ def finalize(self):
+ """娓呯悊SDK璧勬簮"""
+ self.close_camera()
+ if self.is_initialized:
+ MvCamera.MV_CC_Finalize()
+ self.is_initialized = False
+ print("SDK宸叉竻鐞�")
+
+ def __enter__(self):
+ """鏀寔with璇彞"""
+ self.initialize()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """鏀寔with璇彞"""
+ self.finalize()
+
+
+# 浣跨敤绀轰緥
+if __name__ == "__main__":
+ try:
+ with HikCameraManager() as manager:
+ # 1. 鍒楀嚭鎵�鏈夌浉鏈�
+ manager.print_cameras()
+
+ # 2. 閫夋嫨鐩告満鐨勫嚑绉嶆柟寮忥細
+
+ # 鏂瑰紡1: 閫氳繃绱㈠紩閫夋嫨锛堥粯璁ょ涓�涓級
+ # manager.open_camera(index=0)
+
+ # 鏂瑰紡2: 閫氳繃搴忓垪鍙烽�夋嫨
+ # manager.open_camera(serial="DA7598570")
+
+ # 鏂瑰紡3: 閫氳繃IP鍦板潃閫夋嫨锛圙igE鐩告満锛�
+ manager.open_camera(ip="192.168.4.22")
+
+ # 鏂瑰紡4: 閫氳繃鍨嬪彿閫夋嫨
+ # manager.open_camera(model="MV-CS200")
+
+ # 3. 閲囬泦鍥惧儚
+ saved_files = manager.capture_image("test")
+ print(f"\n淇濆瓨鐨勬枃浠�: {saved_files}")
+
+ except Exception as e:
+ print(f"閿欒: {e}")
+ import traceback
+ traceback.print_exc()
--
Gitblit v1.9.1