From 0ecd4a0ec8c4c5585cbd8975d7786c5618814381 Mon Sep 17 00:00:00 2001
From: Junjie <DELL@qq.com>
Date: 星期三, 03 十二月 2025 08:28:59 +0800
Subject: [PATCH] #

---
 camera_manager.py |  368 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 368 insertions(+), 0 deletions(-)

diff --git a/camera_manager.py b/camera_manager.py
new file mode 100644
index 0000000..cc0b28e
--- /dev/null
+++ b/camera_manager.py
@@ -0,0 +1,368 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+娴峰悍宸ヤ笟鐩告満绠$悊绫� - 鏀寔澶氱浉鏈洪�夋嫨鍜屾搷浣�
+"""
+import time
+from MvImport.MvCameraControl_class import *
+
+
+class HikCameraManager:
+    """娴峰悍鐩告満绠$悊鍣�"""
+    
+    def __init__(self):
+        self.cam = None
+        self.is_initialized = False
+        
+    def initialize(self):
+        """鍒濆鍖朣DK"""
+        ret = MvCamera.MV_CC_Initialize()
+        if ret != 0:
+            raise Exception(f"鍒濆鍖朣DK澶辫触! 閿欒鐮�: {hex(ret)}")
+        self.is_initialized = True
+        print("SDK鍒濆鍖栨垚鍔�")
+        
+    def list_cameras(self):
+        """
+        鍒楀嚭鎵�鏈夊彲鐢ㄧ浉鏈�
+        杩斿洖: 鐩告満淇℃伅鍒楄〃
+        """
+        if not self.is_initialized:
+            self.initialize()
+            
+        deviceList = MV_CC_DEVICE_INFO_LIST()
+        ret = MvCamera.MV_CC_EnumDevices(MV_GIGE_DEVICE | MV_USB_DEVICE, deviceList)
+        
+        if ret != 0:
+            raise Exception(f"鏋氫妇璁惧澶辫触! 閿欒鐮�: {hex(ret)}")
+            
+        if deviceList.nDeviceNum == 0:
+            print("鏈壘鍒扮浉鏈鸿澶�")
+            return []
+        
+        cameras = []
+        for i in range(deviceList.nDeviceNum):
+            mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
+            
+            camera_info = {
+                'index': i,
+                'device_info': mvcc_dev_info,
+                'type': None,
+                'model': '',
+                'serial': '',
+                'ip': '',
+                'user_defined_name': ''
+            }
+            
+            if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
+                camera_info['type'] = 'GigE'
+                
+                # 鍨嬪彿
+                model_name = ""
+                for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chModelName:
+                    if per == 0:
+                        break
+                    model_name += chr(per)
+                camera_info['model'] = model_name.strip()
+                
+                # 搴忓垪鍙�
+                serial = ""
+                for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chSerialNumber:
+                    if per == 0:
+                        break
+                    serial += chr(per)
+                camera_info['serial'] = serial.strip()
+                
+                # IP鍦板潃
+                nip1 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0xff000000) >> 24)
+                nip2 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x00ff0000) >> 16)
+                nip3 = ((mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x0000ff00) >> 8)
+                nip4 = (mvcc_dev_info.SpecialInfo.stGigEInfo.nCurrentIp & 0x000000ff)
+                camera_info['ip'] = f"{nip1}.{nip2}.{nip3}.{nip4}"
+                
+                # 鐢ㄦ埛鑷畾涔夊悕绉�
+                user_name = ""
+                for per in mvcc_dev_info.SpecialInfo.stGigEInfo.chUserDefinedName:
+                    if per == 0:
+                        break
+                    user_name += chr(per)
+                camera_info['user_defined_name'] = user_name.strip()
+                
+            elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
+                camera_info['type'] = 'USB'
+                
+                # 鍨嬪彿
+                model_name = ""
+                for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chModelName:
+                    if per == 0:
+                        break
+                    model_name += chr(per)
+                camera_info['model'] = model_name.strip()
+                
+                # 搴忓垪鍙�
+                serial = ""
+                for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chSerialNumber:
+                    if per == 0:
+                        break
+                    serial += chr(per)
+                camera_info['serial'] = serial.strip()
+                
+                # 鐢ㄦ埛鑷畾涔夊悕绉�
+                user_name = ""
+                for per in mvcc_dev_info.SpecialInfo.stUsb3VInfo.chUserDefinedName:
+                    if per == 0:
+                        break
+                    user_name += chr(per)
+                camera_info['user_defined_name'] = user_name.strip()
+            
+            cameras.append(camera_info)
+        
+        return cameras
+    
+    def print_cameras(self):
+        """鎵撳嵃鎵�鏈夌浉鏈轰俊鎭�"""
+        cameras = self.list_cameras()
+        
+        if not cameras:
+            print("鏈壘鍒扮浉鏈�")
+            return
+        
+        print(f"\n鎵惧埌 {len(cameras)} 涓浉鏈�:")
+        print("=" * 80)
+        
+        for cam in cameras:
+            print(f"\n[{cam['index']}] {cam['type']} 鐩告満")
+            print(f"  鍨嬪彿: {cam['model']}")
+            print(f"  搴忓垪鍙�: {cam['serial']}")
+            if cam['ip']:
+                print(f"  IP鍦板潃: {cam['ip']}")
+            if cam['user_defined_name']:
+                print(f"  鑷畾涔夊悕绉�: {cam['user_defined_name']}")
+        
+        print("=" * 80)
+    
+    def open_camera(self, index=None, serial=None, ip=None, model=None):
+        """
+        鎵撳紑鎸囧畾鐩告満
+        
+        鍙傛暟:
+            index: 鐩告満绱㈠紩锛�0, 1, 2...锛�
+            serial: 鐩告満搴忓垪鍙�
+            ip: 鐩告満IP鍦板潃锛堜粎GigE鐩告満锛�
+            model: 鐩告満鍨嬪彿
+            
+        浼樺厛绾�: serial > ip > model > index
+        """
+        cameras = self.list_cameras()
+        
+        if not cameras:
+            raise Exception("鏈壘鍒扮浉鏈鸿澶�")
+        
+        # 鏍规嵁鏉′欢閫夋嫨鐩告満
+        selected_camera = None
+        
+        if serial:
+            # 鎸夊簭鍒楀彿鏌ユ壘
+            for cam in cameras:
+                if cam['serial'] == serial:
+                    selected_camera = cam
+                    print(f"閫氳繃搴忓垪鍙烽�夋嫨鐩告満: {serial}")
+                    break
+            if not selected_camera:
+                raise Exception(f"鏈壘鍒板簭鍒楀彿涓� {serial} 鐨勭浉鏈�")
+                
+        elif ip:
+            # 鎸塈P鍦板潃鏌ユ壘锛堜粎GigE锛�
+            for cam in cameras:
+                if cam['ip'] == ip:
+                    selected_camera = cam
+                    print(f"閫氳繃IP鍦板潃閫夋嫨鐩告満: {ip}")
+                    break
+            if not selected_camera:
+                raise Exception(f"鏈壘鍒癐P鍦板潃涓� {ip} 鐨勭浉鏈�")
+                
+        elif model:
+            # 鎸夊瀷鍙锋煡鎵�
+            for cam in cameras:
+                if model in cam['model']:
+                    selected_camera = cam
+                    print(f"閫氳繃鍨嬪彿閫夋嫨鐩告満: {model}")
+                    break
+            if not selected_camera:
+                raise Exception(f"鏈壘鍒板瀷鍙峰寘鍚� {model} 鐨勭浉鏈�")
+                
+        else:
+            # 鎸夌储寮曢�夋嫨锛堥粯璁ょ涓�涓級
+            if index is None:
+                index = 0
+            if index >= len(cameras):
+                raise Exception(f"鐩告満绱㈠紩 {index} 瓒呭嚭鑼冨洿锛堝叡 {len(cameras)} 涓浉鏈猴級")
+            selected_camera = cameras[index]
+            print(f"閫氳繃绱㈠紩閫夋嫨鐩告満: {index}")
+        
+        # 鎵撳嵃閫変腑鐨勭浉鏈轰俊鎭�
+        print(f"閫変腑鐩告満: [{selected_camera['type']}] {selected_camera['model']} (搴忓垪鍙�: {selected_camera['serial']})")
+        
+        # 鍒涘缓鍙ユ焺
+        self.cam = MvCamera()
+        ret = self.cam.MV_CC_CreateHandle(selected_camera['device_info'])
+        if ret != 0:
+            raise Exception(f"鍒涘缓鍙ユ焺澶辫触! 閿欒鐮�: {hex(ret)}")
+        
+        # 鎵撳紑璁惧
+        ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
+        if ret != 0:
+            # 灏濊瘯鎺у埗妯″紡
+            ret = self.cam.MV_CC_OpenDevice(MV_ACCESS_Control, 0)
+            if ret != 0:
+                self.cam.MV_CC_DestroyHandle()
+                raise Exception(f"鎵撳紑璁惧澶辫触! 閿欒鐮�: {hex(ret)}")
+        
+        print("鐩告満鎵撳紑鎴愬姛")
+        return selected_camera
+    
+    def capture_image(self, filename="captured_image", storageAddress = "D:pic/", save_bmp=True, save_jpg=True, timeout=3000):
+        """
+        閲囬泦涓�甯у浘鍍�
+        
+        鍙傛暟:
+            filename: 鏂囦欢鍚嶏紙涓嶅惈鎵╁睍鍚嶏級
+            save_bmp: 鏄惁淇濆瓨BMP鏍煎紡
+            save_jpg: 鏄惁淇濆瓨JPG鏍煎紡
+            timeout: 瓒呮椂鏃堕棿锛堟绉掞級
+        """
+        if not self.cam:
+            raise Exception("鐩告満鏈墦寮�")
+        
+        # 璁剧疆瑙﹀彂妯″紡涓鸿繛缁噰闆�
+        ret = self.cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_OFF)
+        if ret != 0:
+            print(f"璀﹀憡: 璁剧疆瑙﹀彂妯″紡澶辫触! 閿欒鐮�: {hex(ret)}")
+        
+        # 寮�濮嬪彇娴�
+        ret = self.cam.MV_CC_StartGrabbing()
+        if ret != 0:
+            raise Exception(f"寮�濮嬪彇娴佸け璐�! 閿欒鐮�: {hex(ret)}")
+        
+        print("寮�濮嬮噰闆�...")
+        time.sleep(1)  # 绛夊緟鐩告満绋冲畾
+        
+        # 鑾峰彇鍥惧儚
+        stFrameInfo = MV_FRAME_OUT()
+        memset(byref(stFrameInfo), 0, sizeof(stFrameInfo))
+        
+        ret = self.cam.MV_CC_GetImageBuffer(stFrameInfo, timeout)
+        if ret != 0:
+            self.cam.MV_CC_StopGrabbing()
+            raise Exception(f"鑾峰彇鍥惧儚澶辫触! 閿欒鐮�: {hex(ret)}")
+        
+        print(f"鎴愬姛鑾峰彇鍥惧儚: {stFrameInfo.stFrameInfo.nWidth}x{stFrameInfo.stFrameInfo.nHeight}")
+        
+        # 杞崲骞朵繚瀛樺浘鍍�
+        saved_files = []
+        
+        if save_bmp:
+            bmp_file = f"{storageAddress}{filename}.bmp"
+            if self._save_image(stFrameInfo, bmp_file, MV_Image_Bmp):
+                saved_files.append(bmp_file)
+        
+        if save_jpg:
+            jpg_file = f"{storageAddress}{filename}.jpg"
+            if self._save_image(stFrameInfo, jpg_file, MV_Image_Jpeg):
+                saved_files.append(jpg_file)
+        
+        # 閲婃斁鍥惧儚缂撳瓨
+        self.cam.MV_CC_FreeImageBuffer(stFrameInfo)
+        
+        # 鍋滄鍙栨祦
+        self.cam.MV_CC_StopGrabbing()
+        
+        return saved_files
+    
+    def _save_image(self, stFrameInfo, filename, image_type):
+        """鍐呴儴鏂规硶锛氫繚瀛樺浘鍍�"""
+        stConvertParam = MV_SAVE_IMAGE_PARAM_EX()
+        memset(byref(stConvertParam), 0, sizeof(stConvertParam))
+        
+        nBufSize = stFrameInfo.stFrameInfo.nWidth * stFrameInfo.stFrameInfo.nHeight * 3 + 2048
+        pBuf = (c_ubyte * nBufSize)()
+        
+        stConvertParam.nWidth = stFrameInfo.stFrameInfo.nWidth
+        stConvertParam.nHeight = stFrameInfo.stFrameInfo.nHeight
+        stConvertParam.pData = stFrameInfo.pBufAddr
+        stConvertParam.nDataLen = stFrameInfo.stFrameInfo.nFrameLen
+        stConvertParam.enPixelType = stFrameInfo.stFrameInfo.enPixelType
+        stConvertParam.pImageBuffer = cast(pBuf, POINTER(c_ubyte))
+        stConvertParam.nBufferSize = nBufSize
+        stConvertParam.enImageType = image_type
+        stConvertParam.nJpgQuality = 90
+        
+        ret = self.cam.MV_CC_SaveImageEx2(stConvertParam)
+        if ret != 0:
+            print(f"鍥惧儚杞崲澶辫触! 閿欒鐮�: {hex(ret)}")
+            return False
+        
+        try:
+            with open(filename, "wb") as f:
+                f.write(bytearray(pBuf[0:stConvertParam.nImageLen]))
+            print(f"鉁� 鍥惧儚宸蹭繚瀛�: {filename}")
+            return True
+        except Exception as e:
+            print(f"淇濆瓨鏂囦欢澶辫触: {e}")
+            return False
+    
+    def close_camera(self):
+        """鍏抽棴鐩告満"""
+        if self.cam:
+            self.cam.MV_CC_CloseDevice()
+            self.cam.MV_CC_DestroyHandle()
+            self.cam = None
+            print("鐩告満宸插叧闂�")
+    
+    def finalize(self):
+        """娓呯悊SDK璧勬簮"""
+        self.close_camera()
+        if self.is_initialized:
+            MvCamera.MV_CC_Finalize()
+            self.is_initialized = False
+            print("SDK宸叉竻鐞�")
+    
+    def __enter__(self):
+        """鏀寔with璇彞"""
+        self.initialize()
+        return self
+    
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """鏀寔with璇彞"""
+        self.finalize()
+
+
+# 浣跨敤绀轰緥
+if __name__ == "__main__":
+    try:
+        with HikCameraManager() as manager:
+            # 1. 鍒楀嚭鎵�鏈夌浉鏈�
+            manager.print_cameras()
+            
+            # 2. 閫夋嫨鐩告満鐨勫嚑绉嶆柟寮忥細
+            
+            # 鏂瑰紡1: 閫氳繃绱㈠紩閫夋嫨锛堥粯璁ょ涓�涓級
+            # manager.open_camera(index=0)
+            
+            # 鏂瑰紡2: 閫氳繃搴忓垪鍙烽�夋嫨
+            # manager.open_camera(serial="DA7598570")
+            
+            # 鏂瑰紡3: 閫氳繃IP鍦板潃閫夋嫨锛圙igE鐩告満锛�
+            manager.open_camera(ip="192.168.4.22")
+            
+            # 鏂瑰紡4: 閫氳繃鍨嬪彿閫夋嫨
+            # manager.open_camera(model="MV-CS200")
+            
+            # 3. 閲囬泦鍥惧儚
+            saved_files = manager.capture_image("test")
+            print(f"\n淇濆瓨鐨勬枃浠�: {saved_files}")
+            
+    except Exception as e:
+        print(f"閿欒: {e}")
+        import traceback
+        traceback.print_exc()

--
Gitblit v1.9.1