Industrial Camera
Industrial Camera

GxiPy Implementation Guide – Daheng Imaging Galaxy SDK

Table of Contents

  1. Overview
  2. System Requirements
  3. Installation and Setup
  4. Basic Concepts
  5. Quick Start Guide
  6. Camera Discovery and Connection
  7. Image Acquisition Fundamentals
  8. Camera Configuration
  9. Advanced Features
  10. Error Handling and Best Practices
  11. Complete Working Example
  12. Performance Optimization
  13. Troubleshooting
  14. API Reference Summary

Overview

The gxipy library is the Python interface for Daheng Imaging’s Galaxy SDK, providing comprehensive control over Daheng Imaging cameras. This library enables developers to integrate machine vision cameras into their Python applications with full access to camera features, image acquisition, and advanced processing capabilities.

Key Features

  • Multi-interface Support: GigE Vision, USB3 Vision, USB2.0, CXP
  • Real-time Image Acquisition: High-speed streaming with callback mechanisms
  • Comprehensive Camera Control: Exposure, gain, triggering, ROI, pixel format
  • Image Processing: Built-in color correction, format conversion, and enhancement
  • Multi-camera Support: Simultaneous control of multiple cameras
  • Cross-platform: Windows and Linux support

System Requirements

Hardware Requirements

  • Operating System: Windows 7/8/10/11 (32/64-bit) or Linux Ubuntu 16.04+
  • Memory: Minimum 4GB RAM (8GB+ recommended for multi-camera applications)
  • USB: USB 3.0 ports for USB3 Vision cameras
  • Network: Gigabit Ethernet for GigE Vision cameras
  • CPU: Intel Core i5 or equivalent (multi-core recommended)

Software Requirements

  • Python: 3.6 or higher
  • Galaxy SDK: Must be installed before using gxipy
  • Dependencies: NumPy, PIL (for image processing examples)

Installation and Setup

Step 1: Install Galaxy SDK

  1. Download the Galaxy SDK from Daheng Imaging’s official website
  2. Run the installer with administrator privileges
  3. Ensure all camera drivers are properly installed

Step 2: Install Python Dependencies

pip install numpy pillow

Step 3: Verify gxipy Installation

The gxipy library is typically located in the SDK installation directory:

  • Windows: C:\Program Files\Daheng Imaging\GalaxySDK\Development\Samples\Python\gxipy
  • Linux: Galaxy_Linux_Python_X.X.X.X/api/gxipy

Step 4: Setup Python Path

Add the gxipy directory to your Python path or copy it to your project directory.

Basic Concepts

DeviceManager

The DeviceManager is the central class for camera discovery and management:

  • Handles SDK initialization and cleanup
  • Enumerates available cameras
  • Creates device connections
  • Manages multiple cameras

Device Classes

Different camera interfaces have specialized device classes:

  • GEVDevice: GigE Vision cameras
  • U3VDevice: USB3 Vision cameras
  • U2Device: USB2 cameras
  • Device: Base class for all devices

DataStream

Handles image acquisition and buffering:

  • Manages acquisition buffers
  • Provides callback mechanisms
  • Controls streaming parameters

Image Processing

Built-in processing capabilities:

  • ImageFormatConvert: Pixel format conversion
  • ImageProcess: Color correction and enhancement
  • ImageProcessConfig: Processing parameter management

Quick Start Guide

Here’s a minimal example to capture an image:

import gxipy as gx
import numpy as np
from PIL import Image

# Initialize device manager
device_manager = gx.DeviceManager()

# Update device list
device_num, device_info_list = device_manager.update_device_list()
if device_num == 0:
    print("No camera found")
    exit()

# Open first camera
cam = device_manager.open_device_by_index(1)

# Start acquisition
cam.stream_on()

# Get an image
raw_image = cam.data_stream[0].get_image()
if raw_image is None:
    print("Failed to get image")
else:
    # Convert to numpy array
    numpy_image = raw_image.get_numpy_array()
    print(f"Image captured: {numpy_image.shape}")

# Stop acquisition and cleanup
cam.stream_off()
cam.close_device()

Camera Discovery and Connection

Discovering Cameras

import gxipy as gx

# Initialize device manager
device_manager = gx.DeviceManager()

# Update device list (with timeout)
device_num, device_info_list = device_manager.update_device_list(timeout=2000)

print(f"Found {device_num} cameras:")
for i, device_info in enumerate(device_info_list):
    print(f"Camera {i+1}:")
    print(f"  Model: {device_info.get('model_name', 'Unknown')}")
    print(f"  Serial: {device_info.get('sn', 'Unknown')}")
    print(f"  IP: {device_info.get('ip', 'N/A')}")
    print(f"  Status: {device_info.get('access_status', 'Unknown')}")

Connection Methods

Connect by Index

# Connect to first available camera
cam = device_manager.open_device_by_index(1)  # 1-based indexing

Connect by Serial Number

# Connect by serial number (most reliable)
cam = device_manager.open_device_by_sn("SERIAL_NUMBER_HERE")

Connect by IP Address (GigE only)

# Connect by IP address
cam = device_manager.open_device_by_ip("192.168.1.100")

Connect by User-defined Name

# Connect by user ID
cam = device_manager.open_device_by_user_id("Camera1")

Connection Parameters

# Open with specific access mode
cam = device_manager.open_device_by_index(1, 
                                         access_mode=gx.GxAccessMode.EXCLUSIVE)

Image Acquisition Fundamentals

Basic Acquisition Setup

# Configure basic acquisition parameters
cam.TriggerMode.set(gx.GxTriggerModeEntry.OFF)  # Free-running mode
cam.AcquisitionMode.set(gx.GxAcquisitionModeEntry.CONTINUOUS)

# Set exposure time (microseconds)
cam.ExposureTime.set(10000)  # 10ms

# Set pixel format
cam.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)

# Start acquisition
cam.stream_on()

Image Acquisition Methods

Method 1: Synchronous Acquisition

# Get single image (blocking)
raw_image = cam.data_stream[0].get_image(timeout=1000)  # 1 second timeout
if raw_image is None:
    print("Timeout: No image received")
else:
    numpy_array = raw_image.get_numpy_array()
    print(f"Image size: {numpy_array.shape}")

Method 2: Asynchronous Acquisition with Callbacks

def image_callback(raw_image):
    """Callback function for image processing"""
    numpy_array = raw_image.get_numpy_array()
    print(f"Callback: Image {numpy_array.shape} received")
    # Process image here

# Register callback
cam.data_stream[0].register_capture_callback(image_callback)

# Start acquisition
cam.stream_on()

# Let it run for a while
import time
time.sleep(5)

# Stop and cleanup
cam.data_stream[0].unregister_capture_callback()
cam.stream_off()

Image Data Handling

# Get image and metadata
raw_image = cam.data_stream[0].get_image()
if raw_image is not None:
    # Get image properties
    width = raw_image.get_width()
    height = raw_image.get_height()
    pixel_format = raw_image.get_pixel_format()
    frame_id = raw_image.get_frame_id()
    timestamp = raw_image.get_timestamp()

    print(f"Image: {width}x{height}, Format: {pixel_format}")
    print(f"Frame ID: {frame_id}, Timestamp: {timestamp}")

    # Convert to numpy array
    numpy_image = raw_image.get_numpy_array()

    # Save as image file
    if pixel_format in [gx.GxPixelFormatEntry.MONO8]:
        img = Image.fromarray(numpy_image, mode='L')
    elif pixel_format in [gx.GxPixelFormatEntry.BGR8]:
        img = Image.fromarray(numpy_image, mode='RGB')
    else:
        print(f"Unsupported pixel format for direct save: {pixel_format}")
        img = None

    if img:
        img.save("captured_image.jpg")

Camera Configuration

Essential Camera Settings

Exposure Control

# Manual exposure
cam.ExposureAuto.set(gx.GxAutoEntry.OFF)
cam.ExposureTime.set(50000)  # 50ms in microseconds

# Auto exposure
cam.ExposureAuto.set(gx.GxAutoEntry.CONTINUOUS)

# Get exposure range
exp_min = cam.ExposureTime.get_min()
exp_max = cam.ExposureTime.get_max()
print(f"Exposure range: {exp_min} - {exp_max} microseconds")

Gain Control

# Check if gain is available
if cam.Gain.is_implemented():
    # Set manual gain
    cam.GainAuto.set(gx.GxAutoEntry.OFF)
    cam.Gain.set(10.0)  # dB

    # Auto gain
    cam.GainAuto.set(gx.GxAutoEntry.CONTINUOUS)

Region of Interest (ROI)

# Get sensor dimensions
sensor_width = cam.SensorWidth.get()
sensor_height = cam.SensorHeight.get()
max_width = cam.WidthMax.get()
max_height = cam.HeightMax.get()

print(f"Sensor: {sensor_width}x{sensor_height}")
print(f"Max ROI: {max_width}x{max_height}")

# Set ROI (ensure values are within bounds and follow increments)
roi_x = 100
roi_y = 100
roi_width = 800
roi_height = 600

# Check and align to increments
width_inc = cam.Width.get_inc()
height_inc = cam.Height.get_inc()
x_inc = cam.OffsetX.get_inc()
y_inc = cam.OffsetY.get_inc()

# Align values
roi_x = (roi_x // x_inc) * x_inc
roi_y = (roi_y // y_inc) * y_inc
roi_width = (roi_width // width_inc) * width_inc
roi_height = (roi_height // height_inc) * height_inc

# Set ROI
cam.OffsetX.set(roi_x)
cam.OffsetY.set(roi_y)
cam.Width.set(roi_width)
cam.Height.set(roi_height)

print(f"ROI set to: {roi_x},{roi_y} {roi_width}x{roi_height}")

Pixel Format Configuration

# Check available pixel formats
if cam.PixelFormat.is_implemented():
    format_list = cam.PixelFormat.get_enum_description()
    print("Available pixel formats:")
    for fmt in format_list:
        print(f"  {fmt}")

    # Set specific format
    cam.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)
    current_format = cam.PixelFormat.get()
    print(f"Current format: {current_format}")

Frame Rate Control

# Enable frame rate control
if cam.AcquisitionFrameRateMode.is_implemented():
    cam.AcquisitionFrameRateMode.set(gx.GxAcquisitionFrameRateModeEntry.ON)

    # Set frame rate
    max_fps = cam.AcquisitionFrameRate.get_max()
    cam.AcquisitionFrameRate.set(min(30.0, max_fps))

    actual_fps = cam.CurrentAcquisitionFrameRate.get()
    print(f"Frame rate set to {actual_fps} fps")

Triggering Configuration

Hardware Triggering

# Configure hardware trigger
cam.TriggerMode.set(gx.GxTriggerModeEntry.ON)
cam.TriggerSource.set(gx.GxTriggerSourceEntry.LINE2)  # External trigger input
cam.TriggerActivation.set(gx.GxTriggerActivationEntry.RISINGEDGE)

# Optional: Set trigger delay
if cam.TriggerDelay.is_implemented():
    cam.TriggerDelay.set(100.0)  # 100 microseconds delay

print("Hardware trigger configured")

Software Triggering

# Configure software trigger
cam.TriggerMode.set(gx.GxTriggerModeEntry.ON)
cam.TriggerSource.set(gx.GxTriggerSourceEntry.SOFTWARE)

# Start acquisition
cam.stream_on()

# Send software trigger
cam.TriggerSoftware.send_command()

# Get triggered image
raw_image = cam.data_stream[0].get_image(timeout=5000)
if raw_image:
    print("Software trigger successful")

Advanced Features

Multi-Camera Setup

class MultiCameraManager:
    def __init__(self):
        self.device_manager = gx.DeviceManager()
        self.cameras = []

    def discover_cameras(self):
        device_num, device_info_list = self.device_manager.update_device_list()
        print(f"Found {device_num} cameras")
        return device_info_list

    def connect_cameras(self, serial_numbers):
        """Connect to cameras by serial number"""
        for sn in serial_numbers:
            try:
                cam = self.device_manager.open_device_by_sn(sn)
                self.cameras.append({
                    'camera': cam,
                    'serial': sn,
                    'active': False
                })
                print(f"Connected to camera: {sn}")
            except Exception as e:
                print(f"Failed to connect to camera {sn}: {e}")

    def configure_all_cameras(self):
        """Configure all cameras with same settings"""
        for cam_info in self.cameras:
            cam = cam_info['camera']
            try:
                # Basic configuration
                cam.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)
                cam.ExposureTime.set(10000)
                cam.TriggerMode.set(gx.GxTriggerModeEntry.OFF)
                cam.AcquisitionMode.set(gx.GxAcquisitionModeEntry.CONTINUOUS)
                print(f"Configured camera: {cam_info['serial']}")
            except Exception as e:
                print(f"Configuration failed for {cam_info['serial']}: {e}")

    def start_all_acquisition(self):
        """Start acquisition on all cameras"""
        for cam_info in self.cameras:
            try:
                cam_info['camera'].stream_on()
                cam_info['active'] = True
                print(f"Started acquisition: {cam_info['serial']}")
            except Exception as e:
                print(f"Start failed for {cam_info['serial']}: {e}")

    def capture_from_all(self, timeout=1000):
        """Capture images from all cameras"""
        images = {}
        for cam_info in self.cameras:
            if cam_info['active']:
                try:
                    raw_image = cam_info['camera'].data_stream[0].get_image(timeout)
                    if raw_image:
                        images[cam_info['serial']] = raw_image.get_numpy_array()
                except Exception as e:
                    print(f"Capture failed for {cam_info['serial']}: {e}")
        return images

    def cleanup(self):
        """Stop all cameras and cleanup"""
        for cam_info in self.cameras:
            try:
                if cam_info['active']:
                    cam_info['camera'].stream_off()
                cam_info['camera'].close_device()
            except Exception as e:
                print(f"Cleanup failed for {cam_info['serial']}: {e}")

# Usage example
multi_cam = MultiCameraManager()
devices = multi_cam.discover_cameras()
serial_numbers = [dev['sn'] for dev in devices[:2]]  # Use first 2 cameras
multi_cam.connect_cameras(serial_numbers)
multi_cam.configure_all_cameras()
multi_cam.start_all_acquisition()

# Capture synchronized images
images = multi_cam.capture_from_all()
for serial, image in images.items():
    print(f"Camera {serial}: Image shape {image.shape}")

multi_cam.cleanup()

Image Processing Integration

# Create image processing objects
image_process = device_manager.create_image_process()
color_converter = device_manager.create_image_format_convert()

def process_raw_image(raw_image):
    """Process raw image with built-in functions"""
    try:
        # Get basic info
        width = raw_image.get_width()
        height = raw_image.get_height()
        pixel_format = raw_image.get_pixel_format()

        # Convert pixel format if needed
        if pixel_format == gx.GxPixelFormatEntry.BAYER_RG8:
            # Convert Bayer to RGB
            output_image = color_converter.convert(raw_image, 
                                                  gx.GxPixelFormatEntry.RGB8)
            return output_image.get_numpy_array()
        else:
            return raw_image.get_numpy_array()

    except Exception as e:
        print(f"Image processing error: {e}")
        return None

# Apply processing during acquisition
cam.stream_on()
for i in range(10):
    raw_image = cam.data_stream[0].get_image()
    if raw_image:
        processed_image = process_raw_image(raw_image)
        if processed_image is not None:
            print(f"Processed image {i}: {processed_image.shape}")

Callback-based High-Speed Acquisition

import threading
import queue
import time

class HighSpeedAcquisition:
    def __init__(self, camera):
        self.camera = camera
        self.image_queue = queue.Queue(maxsize=100)
        self.acquisition_active = False
        self.stats = {
            'frames_received': 0,
            'frames_processed': 0,
            'frames_dropped': 0,
            'start_time': None
        }

    def image_callback(self, raw_image):
        """High-speed image callback"""
        self.stats['frames_received'] += 1

        try:
            # Non-blocking queue put
            self.image_queue.put_nowait({
                'image': raw_image.get_numpy_array(),
                'timestamp': raw_image.get_timestamp(),
                'frame_id': raw_image.get_frame_id()
            })
        except queue.Full:
            self.stats['frames_dropped'] += 1

    def processing_thread(self):
        """Separate thread for image processing"""
        while self.acquisition_active:
            try:
                # Get image with timeout
                image_data = self.image_queue.get(timeout=1.0)

                # Process image here
                self.process_image(image_data)
                self.stats['frames_processed'] += 1

            except queue.Empty:
                continue

    def process_image(self, image_data):
        """Override this method for custom processing"""
        # Example: Calculate mean intensity
        mean_intensity = image_data['image'].mean()
        print(f"Frame {image_data['frame_id']}: Mean={mean_intensity:.1f}")

    def start_acquisition(self):
        """Start high-speed acquisition"""
        self.stats['start_time'] = time.time()
        self.acquisition_active = True

        # Start processing thread
        self.process_thread = threading.Thread(target=self.processing_thread)
        self.process_thread.start()

        # Register callback and start camera
        self.camera.data_stream[0].register_capture_callback(self.image_callback)
        self.camera.stream_on()

        print("High-speed acquisition started")

    def stop_acquisition(self):
        """Stop acquisition and cleanup"""
        self.camera.stream_off()
        self.camera.data_stream[0].unregister_capture_callback()

        self.acquisition_active = False
        self.process_thread.join()

        # Print statistics
        duration = time.time() - self.stats['start_time']
        fps = self.stats['frames_received'] / duration

        print(f"Acquisition stopped:")
        print(f"  Duration: {duration:.1f}s")
        print(f"  Frames received: {self.stats['frames_received']}")
        print(f"  Frames processed: {self.stats['frames_processed']}")
        print(f"  Frames dropped: {self.stats['frames_dropped']}")
        print(f"  Average FPS: {fps:.1f}")

# Usage
high_speed_acq = HighSpeedAcquisition(cam)
high_speed_acq.start_acquisition()
time.sleep(10)  # Run for 10 seconds
high_speed_acq.stop_acquisition()

Error Handling and Best Practices

Exception Handling

import gxipy as gx

def safe_camera_operation():
    device_manager = None
    cam = None

    try:
        # Initialize device manager
        device_manager = gx.DeviceManager()

        # Update device list with error handling
        try:
            device_num, device_info_list = device_manager.update_device_list(timeout=5000)
            if device_num == 0:
                raise RuntimeError("No cameras found")
        except Exception as e:
            print(f"Device enumeration failed: {e}")
            return False

        # Connect to camera
        try:
            cam = device_manager.open_device_by_index(1)
            print("Camera connected successfully")
        except gx.InvalidAccess:
            print("Camera is already in use by another application")
            return False
        except gx.NotFoundDevice:
            print("Camera not found or disconnected")
            return False
        except Exception as e:
            print(f"Camera connection failed: {e}")
            return False

        # Configure camera
        try:
            cam.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)
            cam.ExposureTime.set(10000)
            print("Camera configured successfully")
        except gx.InvalidParameter as e:
            print(f"Invalid parameter: {e}")
            return False
        except Exception as e:
            print(f"Configuration failed: {e}")
            return False

        # Start acquisition
        try:
            cam.stream_on()
            print("Acquisition started")

            # Capture images
            for i in range(5):
                raw_image = cam.data_stream[0].get_image(timeout=2000)
                if raw_image is None:
                    print(f"Timeout on image {i}")
                    continue

                numpy_image = raw_image.get_numpy_array()
                print(f"Image {i}: {numpy_image.shape}")

        except Exception as e:
            print(f"Acquisition failed: {e}")
            return False

        return True

    except Exception as e:
        print(f"Unexpected error: {e}")
        return False

    finally:
        # Cleanup - always executed
        if cam is not None:
            try:
                cam.stream_off()
                cam.close_device()
                print("Camera closed successfully")
            except Exception as e:
                print(f"Camera cleanup failed: {e}")

# Run safe operation
success = safe_camera_operation()
print(f"Operation {'succeeded' if success else 'failed'}")

Best Practices

1. Resource Management

# Use context managers when possible
class CameraContext:
    def __init__(self, device_manager, camera_index):
        self.device_manager = device_manager
        self.camera_index = camera_index
        self.camera = None

    def __enter__(self):
        self.camera = self.device_manager.open_device_by_index(self.camera_index)
        return self.camera

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.camera:
            try:
                self.camera.stream_off()
            except:
                pass
            self.camera.close_device()

# Usage
device_manager = gx.DeviceManager()
device_manager.update_device_list()

with CameraContext(device_manager, 1) as cam:
    # Camera operations here
    cam.stream_on()
    image = cam.data_stream[0].get_image()
    # Automatic cleanup when exiting block

2. Parameter Validation

def set_safe_exposure(camera, exposure_us):
    """Safely set exposure time with validation"""
    try:
        if not camera.ExposureTime.is_implemented():
            print("Exposure time not supported")
            return False

        min_exp = camera.ExposureTime.get_min()
        max_exp = camera.ExposureTime.get_max()

        if exposure_us < min_exp:
            print(f"Exposure too low, setting to minimum: {min_exp}")
            exposure_us = min_exp
        elif exposure_us > max_exp:
            print(f"Exposure too high, setting to maximum: {max_exp}")
            exposure_us = max_exp

        camera.ExposureTime.set(exposure_us)
        actual_exp = camera.ExposureTime.get()
        print(f"Exposure set to: {actual_exp} μs")
        return True

    except Exception as e:
        print(f"Failed to set exposure: {e}")
        return False

3. Performance Monitoring

class PerformanceMonitor:
    def __init__(self):
        self.reset()

    def reset(self):
        self.frame_count = 0
        self.start_time = time.time()
        self.last_fps_time = self.start_time
        self.fps_window = []

    def update(self):
        self.frame_count += 1
        current_time = time.time()

        # Calculate FPS every second
        if current_time - self.last_fps_time >= 1.0:
            fps = self.frame_count / (current_time - self.start_time)
            self.fps_window.append(fps)

            # Keep only last 10 FPS measurements
            if len(self.fps_window) > 10:
                self.fps_window.pop(0)

            avg_fps = sum(self.fps_window) / len(self.fps_window)
            print(f"FPS: {fps:.1f}, Average: {avg_fps:.1f}")

            self.last_fps_time = current_time

# Usage in acquisition loop
monitor = PerformanceMonitor()
cam.stream_on()

for i in range(100):
    raw_image = cam.data_stream[0].get_image()
    if raw_image:
        monitor.update()
        # Process image...

Complete Working Example

#!/usr/bin/env python3
"""
Complete GxiPy Camera Application Example
Demonstrates professional camera control with error handling and monitoring
"""

import gxipy as gx
import numpy as np
from PIL import Image
import time
import sys
import os
from datetime import datetime

class DahengCamera:
    def __init__(self):
        self.device_manager = None
        self.camera = None
        self.is_acquiring = False
        self.image_count = 0

    def initialize(self):
        """Initialize the camera system"""
        try:
            print("Initializing Daheng camera system...")
            self.device_manager = gx.DeviceManager()
            return True
        except Exception as e:
            print(f"Initialization failed: {e}")
            return False

    def discover_cameras(self):
        """Discover available cameras"""
        try:
            device_num, device_info_list = self.device_manager.update_device_list(timeout=3000)

            if device_num == 0:
                print("No cameras found!")
                return []

            print(f"\nFound {device_num} camera(s):")
            for i, device_info in enumerate(device_info_list):
                print(f"  [{i+1}] {device_info.get('model_name', 'Unknown')}")
                print(f"      Serial: {device_info.get('sn', 'Unknown')}")
                print(f"      IP: {device_info.get('ip', 'N/A')}")
                print(f"      Status: {device_info.get('access_status', 'Unknown')}")

            return device_info_list

        except Exception as e:
            print(f"Camera discovery failed: {e}")
            return []

    def connect_camera(self, index=1):
        """Connect to camera by index"""
        try:
            print(f"Connecting to camera {index}...")
            self.camera = self.device_manager.open_device_by_index(index)

            # Get camera information
            model = self.camera.DeviceModelName.get()
            serial = self.camera.DeviceSerialNumber.get()
            print(f"Connected to: {model} (S/N: {serial})")

            return True

        except Exception as e:
            print(f"Camera connection failed: {e}")
            return False

    def configure_camera(self, exposure_us=10000, gain_db=0.0):
        """Configure camera settings"""
        try:
            print("Configuring camera...")

            # Set pixel format
            self.camera.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)

            # Configure exposure
            self.camera.ExposureAuto.set(gx.GxAutoEntry.OFF)
            exp_min = self.camera.ExposureTime.get_min()
            exp_max = self.camera.ExposureTime.get_max()
            exposure_us = max(exp_min, min(exp_max, exposure_us))
            self.camera.ExposureTime.set(exposure_us)

            # Configure gain if available
            if self.camera.Gain.is_implemented():
                self.camera.GainAuto.set(gx.GxAutoEntry.OFF)
                gain_min = self.camera.Gain.get_min()
                gain_max = self.camera.Gain.get_max()
                gain_db = max(gain_min, min(gain_max, gain_db))
                self.camera.Gain.set(gain_db)
                print(f"Gain set to: {self.camera.Gain.get():.1f} dB")

            # Set acquisition mode
            self.camera.TriggerMode.set(gx.GxTriggerModeEntry.OFF)
            self.camera.AcquisitionMode.set(gx.GxAcquisitionModeEntry.CONTINUOUS)

            # Get final settings
            actual_exp = self.camera.ExposureTime.get()
            width = self.camera.Width.get()
            height = self.camera.Height.get()

            print(f"Configuration complete:")
            print(f"  Resolution: {width}x{height}")
            print(f"  Exposure: {actual_exp:.0f} μs")
            print(f"  Pixel Format: MONO8")

            return True

        except Exception as e:
            print(f"Camera configuration failed: {e}")
            return False

    def start_acquisition(self):
        """Start image acquisition"""
        try:
            print("Starting image acquisition...")
            self.camera.stream_on()
            self.is_acquiring = True
            self.image_count = 0
            print("Acquisition started successfully")
            return True

        except Exception as e:
            print(f"Failed to start acquisition: {e}")
            return False

    def capture_image(self, timeout_ms=2000):
        """Capture a single image"""
        if not self.is_acquiring:
            print("Acquisition not started!")
            return None

        try:
            raw_image = self.camera.data_stream[0].get_image(timeout=timeout_ms)
            if raw_image is None:
                print("Image capture timeout")
                return None

            # Get image data and metadata
            numpy_image = raw_image.get_numpy_array()
            frame_id = raw_image.get_frame_id()
            timestamp = raw_image.get_timestamp()

            self.image_count += 1

            return {
                'image': numpy_image,
                'frame_id': frame_id,
                'timestamp': timestamp,
                'capture_id': self.image_count
            }

        except Exception as e:
            print(f"Image capture failed: {e}")
            return None

    def save_image(self, image_data, filename=None):
        """Save image to file"""
        try:
            if filename is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"image_{timestamp}_{image_data['capture_id']:04d}.jpg"

            # Convert numpy array to PIL Image and save
            pil_image = Image.fromarray(image_data['image'], mode='L')
            pil_image.save(filename)

            print(f"Image saved: {filename}")
            return filename

        except Exception as e:
            print(f"Failed to save image: {e}")
            return None

    def continuous_capture(self, duration_seconds=10, save_interval=1):
        """Capture images continuously for specified duration"""
        print(f"Starting continuous capture for {duration_seconds} seconds...")

        start_time = time.time()
        last_save_time = start_time

        while time.time() - start_time < duration_seconds:
            image_data = self.capture_image()
            if image_data:
                current_time = time.time()

                # Print status
                elapsed = current_time - start_time
                fps = self.image_count / elapsed if elapsed > 0 else 0
                print(f"Frame {image_data['capture_id']}: "
                      f"{image_data['image'].shape} "
                      f"(FPS: {fps:.1f})")

                # Save image at specified interval
                if current_time - last_save_time >= save_interval:
                    self.save_image(image_data)
                    last_save_time = current_time

        print(f"Continuous capture completed. Total frames: {self.image_count}")

    def stop_acquisition(self):
        """Stop image acquisition"""
        try:
            if self.is_acquiring:
                print("Stopping acquisition...")
                self.camera.stream_off()
                self.is_acquiring = False
                print("Acquisition stopped")

        except Exception as e:
            print(f"Failed to stop acquisition: {e}")

    def cleanup(self):
        """Cleanup camera resources"""
        try:
            self.stop_acquisition()

            if self.camera:
                self.camera.close_device()
                print("Camera disconnected")

        except Exception as e:
            print(f"Cleanup error: {e}")

def main():
    """Main application function"""
    print("=== Daheng Camera Application ===\n")

    # Initialize camera
    camera = DahengCamera()

    if not camera.initialize():
        sys.exit(1)

    # Discover cameras
    devices = camera.discover_cameras()
    if not devices:
        sys.exit(1)

    # Connect to first camera
    if not camera.connect_camera(1):
        sys.exit(1)

    # Configure camera
    if not camera.configure_camera(exposure_us=20000, gain_db=5.0):
        camera.cleanup()
        sys.exit(1)

    # Start acquisition
    if not camera.start_acquisition():
        camera.cleanup()
        sys.exit(1)

    try:
        # Capture some individual images
        print("\nCapturing 5 individual images...")
        for i in range(5):
            image_data = camera.capture_image()
            if image_data:
                print(f"Captured image {i+1}: {image_data['image'].shape}")
                if i == 0:  # Save first image
                    camera.save_image(image_data, "sample_image.jpg")
            time.sleep(0.5)

        # Continuous capture
        print("\nStarting continuous capture...")
        camera.continuous_capture(duration_seconds=5, save_interval=2)

    except KeyboardInterrupt:
        print("\nInterrupted by user")

    finally:
        # Always cleanup
        camera.cleanup()
        print("Application finished")

if __name__ == "__main__":
    main()

Performance Optimization

Buffer Management

# Configure acquisition buffer count for high-speed applications
def configure_high_speed_buffers(camera, buffer_count=10):
    """Configure buffers for high-speed acquisition"""
    try:
        # Stop acquisition if running
        camera.stream_off()

        # Set buffer count (if supported)
        stream = camera.data_stream[0]
        if hasattr(stream, 'set_acquisition_buffer_number'):
            stream.set_acquisition_buffer_number(buffer_count)
            print(f"Buffer count set to: {buffer_count}")

        # Configure payload size
        payload_size = camera.PayloadSize.get()
        print(f"Payload size: {payload_size} bytes")

        return True

    except Exception as e:
        print(f"Buffer configuration failed: {e}")
        return False

Memory Management

# Efficient memory handling for continuous acquisition
def efficient_acquisition_loop(camera, duration=10):
    """Memory-efficient acquisition loop"""
    start_time = time.time()
    frame_count = 0

    # Pre-allocate memory for statistics
    stats_buffer = []

    while time.time() - start_time < duration:
        raw_image = camera.data_stream[0].get_image(timeout=1000)

        if raw_image is not None:
            # Process without copying when possible
            image_stats = {
                'frame_id': raw_image.get_frame_id(),
                'timestamp': raw_image.get_timestamp(),
                'mean_intensity': raw_image.get_numpy_array().mean(),
            }

            stats_buffer.append(image_stats)
            frame_count += 1

            # Limit stats buffer size
            if len(stats_buffer) > 1000:
                stats_buffer = stats_buffer[-500:]  # Keep last 500

    print(f"Processed {frame_count} frames efficiently")
    return stats_buffer

Troubleshooting

Common Issues and Solutions

Camera Not Found

def diagnose_camera_connection():
    """Diagnose camera connection issues"""
    device_manager = gx.DeviceManager()

    print("Diagnosing camera connection...")

    # Check SDK installation
    try:
        device_num, _ = device_manager.update_device_list(timeout=1000)
        print(f"✓ SDK initialized successfully")
    except Exception as e:
        print(f"✗ SDK initialization failed: {e}")
        return

    if device_num == 0:
        print("✗ No cameras detected")
        print("Troubleshooting steps:")
        print("  1. Check camera power and connections")
        print("  2. Verify network settings (for GigE cameras)")
        print("  3. Check USB cable and ports (for USB cameras)")
        print("  4. Ensure camera drivers are installed")
        return

    print(f"✓ Found {device_num} camera(s)")

Performance Issues

def diagnose_performance():
    """Check system performance for camera applications"""
    import psutil

    print("System performance check:")

    # CPU usage
    cpu_percent = psutil.cpu_percent(interval=1)
    print(f"CPU Usage: {cpu_percent}%")

    # Memory usage
    memory = psutil.virtual_memory()
    print(f"Memory Usage: {memory.percent}% ({memory.available // 1024**2} MB available)")

    # Disk usage for image storage
    disk = psutil.disk_usage('.')
    print(f"Disk Space: {disk.percent}% used ({disk.free // 1024**3} GB free)")

    # Network interfaces (for GigE cameras)
    net_interfaces = psutil.net_if_stats()
    print("Network Interfaces:")
    for interface, stats in net_interfaces.items():
        if stats.isup:
            print(f"  {interface}: {'UP' if stats.isup else 'DOWN'}")

Error Code Reference

  • InvalidAccess: Camera already in use or insufficient permissions
  • NotFoundDevice: Camera disconnected or not available
  • Timeout: Operation timed out, check network/USB connection
  • InvalidParameter: Parameter value out of range or not supported
  • NotImplemented: Feature not available on this camera model

API Reference Summary

DeviceManager Methods

  • update_device_list(timeout) – Discover cameras
  • open_device_by_index(index) – Connect by index
  • open_device_by_sn(serial) – Connect by serial number
  • open_device_by_ip(ip) – Connect by IP address
  • create_image_process() – Create image processor
  • create_image_format_convert() – Create format converter

Device Properties

  • Image Format: Width, Height, OffsetX, OffsetY, PixelFormat
  • Exposure: ExposureTime, ExposureAuto, ExposureMode
  • Gain: Gain, GainAuto
  • Triggering: TriggerMode, TriggerSource, TriggerActivation
  • Acquisition: AcquisitionMode, AcquisitionFrameRate

DataStream Methods

  • get_image(timeout) – Get single image
  • register_capture_callback(callback) – Register callback
  • unregister_capture_callback() – Remove callback

Image Object Methods

  • get_numpy_array() – Convert to NumPy array
  • get_width(), get_height() – Image dimensions
  • get_pixel_format() – Pixel format
  • get_frame_id() – Frame counter
  • get_timestamp() – Image timestamp

This guide provides comprehensive coverage of the gxipy library for professional camera application development. For the latest API documentation and SDK updates, please refer to the official Daheng Imaging documentation included with your Galaxy SDK installation.

Comments

No comments yet. Why don’t you start the discussion?

Leave a Reply