Table of Contents
- Overview
- System Requirements
- Installation and Setup
- Basic Concepts
- Quick Start Guide
- Camera Discovery and Connection
- Image Acquisition Fundamentals
- Camera Configuration
- Advanced Features
- Error Handling and Best Practices
- Complete Working Example
- Performance Optimization
- Troubleshooting
- API Reference Summary
Overview
The gxipy library is the Python interface to Daheng Imaging’s Galaxy SDK. It lets Python applications discover and open Daheng Imaging cameras, configure camera features, acquire images, and use the SDK’s built-in image processing.
Key Features
- Multi-interface Support: GigE Vision, USB3 Vision, USB2.0, CXP
- Real-time Image Acquisition: High-speed streaming with callback mechanisms
- Comprehensive Camera Control: Exposure, gain, triggering, ROI, pixel format
- Image Processing: Built-in color correction, format conversion, and enhancement
- Multi-camera Support: Simultaneous control of multiple cameras
- Cross-platform: Windows and Linux support
System Requirements
Hardware Requirements
- Memory: Minimum 4GB RAM (8GB+ recommended for multi-camera applications)
- USB: USB 3.0 ports for USB3 Vision cameras
- Network: Gigabit Ethernet for GigE Vision cameras
- CPU: Intel Core i5 or equivalent (multi-core recommended)
Software Requirements
- Operating System: Windows 7/8/10/11 (32/64-bit) or Linux Ubuntu 16.04+
- Python: 3.6 or higher
- Galaxy SDK: Must be installed before using gxipy
- Dependencies: NumPy and Pillow (the maintained PIL fork, used by the image processing examples); psutil is needed only for the performance diagnostic in Troubleshooting
Installation and Setup
Step 1: Install Galaxy SDK
- Download the Galaxy SDK from Daheng Imaging’s official website
- Run the installer with administrator privileges
- Ensure all camera drivers are properly installed
Step 2: Install Python Dependencies
pip install numpy pillow
Step 3: Verify gxipy Installation
The gxipy library is typically located in the SDK installation directory:
- Windows:
C:\Program Files\Daheng Imaging\GalaxySDK\Development\Samples\Python\gxipy
- Linux:
Galaxy_Linux_Python_X.X.X.X/api/gxipy
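Step 4: Set Up the Python Path
Add the directory that contains the gxipy folder to your Python path (for example through the PYTHONPATH environment variable), or copy the gxipy folder into your project directory.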
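As a rough sketch of the path setup, the helper below puts the parent directory of a package folder on sys.path, because `import gxipy` is resolved against the directory that contains the gxipy folder. The name add_package_parent_to_path is hypothetical and not part of the SDK, and the check assumes the folder holds an __init__.py file.

```python
import sys
from pathlib import Path


def add_package_parent_to_path(package_dir):
    """Put the parent of `package_dir` on sys.path so the package can be imported."""
    package_dir = Path(package_dir).resolve()
    # Assumption: the package folder is a regular package with an __init__.py.
    if not (package_dir / "__init__.py").is_file():
        raise FileNotFoundError(f"{package_dir} does not look like a Python package")
    parent = str(package_dir.parent)
    # Avoid adding the same directory twice on repeated calls.
    if parent not in sys.path:
        sys.path.insert(0, parent)
    return parent
```

Called with the gxipy location from Step 3 before the first `import gxipy`, this has the same effect as setting PYTHONPATH, but only for the running process.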
Basic Concepts
DeviceManager
The DeviceManager is the central class for camera discovery and management:
- Handles SDK initialization and cleanup
- Enumerates available cameras
- Creates device connections
- Manages multiple cameras
Device Classes
Different camera interfaces have specialized device classes:
- GEVDevice: GigE Vision cameras
- U3VDevice: USB3 Vision cameras
- U2Device: USB2 cameras
- Device: Base class for all devices
DataStream
Handles image acquisition and buffering:
- Manages acquisition buffers
- Provides callback mechanisms
- Controls streaming parameters
Image Processing
Built-in processing capabilities:
- ImageFormatConvert: Pixel format conversion
- ImageProcess: Color correction and enhancement
- ImageProcessConfig: Processing parameter management
Quick Start Guide
Here’s a minimal example to capture an image:
import gxipy as gx

# Initialize device manager
device_manager = gx.DeviceManager()
# Update device list
device_num, device_info_list = device_manager.update_device_list()
if device_num == 0:
    raise SystemExit("No camera found")
# Open first camera (indices are 1-based)
cam = device_manager.open_device_by_index(1)
# Start acquisition
cam.stream_on()
# Get an image
raw_image = cam.data_stream[0].get_image()
if raw_image is None:
    print("Failed to get image")
else:
    # Convert to numpy array
    numpy_image = raw_image.get_numpy_array()
    print(f"Image captured: {numpy_image.shape}")
# Stop acquisition and cleanup
cam.stream_off()
cam.close_device()
Camera Discovery and Connection
Discovering Cameras
import gxipy as gx

# Initialize device manager
device_manager = gx.DeviceManager()
# Update device list (with timeout)
device_num, device_info_list = device_manager.update_device_list(timeout=2000)
print(f"Found {device_num} cameras:")
for i, device_info in enumerate(device_info_list):
    print(f"Camera {i+1}:")
    print(f"  Model: {device_info.get('model_name', 'Unknown')}")
    print(f"  Serial: {device_info.get('sn', 'Unknown')}")
    print(f"  IP: {device_info.get('ip', 'N/A')}")
    print(f"  Status: {device_info.get('access_status', 'Unknown')}")
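Because each entry of the device list is a dictionary, picking a particular camera before opening it can be done with ordinary Python. The sketch below is one way to do that; find_device is a hypothetical helper, and the sample entries (including the 'index' key and the model names) are invented stand-ins for real enumeration results.

```python
def find_device(device_info_list, serial=None, model_substring=None):
    """Return the first entry matching the serial and/or model substring, else None."""
    for info in device_info_list:
        if serial is not None and info.get("sn") != serial:
            continue
        if model_substring is not None and model_substring not in info.get("model_name", ""):
            continue
        return info
    return None


# Hypothetical sample data shaped like the entries printed above.
sample_devices = [
    {"index": 1, "model_name": "MODEL-EXAMPLE-A", "sn": "SN0001"},
    {"index": 2, "model_name": "MODEL-EXAMPLE-B", "sn": "SN0002"},
]
match = find_device(sample_devices, serial="SN0002")
print(match["model_name"] if match else "not found")
```

The serial number of the returned entry can then be passed to open_device_by_sn, which avoids depending on enumeration order.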
Connection Methods
Connect by Index
# Connect to first available camera
cam = device_manager.open_device_by_index(1) # 1-based indexing
Connect by Serial Number
# Connect by serial number (most reliable)
cam = device_manager.open_device_by_sn("SERIAL_NUMBER_HERE")
Connect by IP Address (GigE only)
# Connect by IP address
cam = device_manager.open_device_by_ip("192.168.1.100")
Connect by User-defined Name
# Connect by user ID
cam = device_manager.open_device_by_user_id("Camera1")
Connection Parameters
# Open with specific access mode
cam = device_manager.open_device_by_index(
    1, access_mode=gx.GxAccessMode.EXCLUSIVE)
Image Acquisition Fundamentals
Basic Acquisition Setup
# Configure basic acquisition parameters
cam.TriggerMode.set(gx.GxTriggerModeEntry.OFF) # Free-running mode
cam.AcquisitionMode.set(gx.GxAcquisitionModeEntry.CONTINUOUS)
# Set exposure time (microseconds)
cam.ExposureTime.set(10000) # 10ms
# Set pixel format
cam.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)
# Start acquisition
cam.stream_on()
Image Acquisition Methods
Method 1: Synchronous Acquisition
# Get single image (blocking)
raw_image = cam.data_stream[0].get_image(timeout=1000) # 1 second timeout
if raw_image is None:
    print("Timeout: No image received")
else:
    numpy_array = raw_image.get_numpy_array()
    print(f"Image size: {numpy_array.shape}")
Method 2: Asynchronous Acquisition with Callbacks
import time

def image_callback(raw_image):
    """Callback function for image processing"""
    numpy_array = raw_image.get_numpy_array()
    print(f"Callback: Image {numpy_array.shape} received")
    # Process image here

# Register callback
cam.data_stream[0].register_capture_callback(image_callback)
# Start acquisition
cam.stream_on()
# Let it run for a while
time.sleep(5)
# Stop and cleanup
cam.data_stream[0].unregister_capture_callback()
cam.stream_off()
Image Data Handling
from PIL import Image

# Get image and metadata
raw_image = cam.data_stream[0].get_image()
if raw_image is not None:
    # Get image properties
    width = raw_image.get_width()
    height = raw_image.get_height()
    pixel_format = raw_image.get_pixel_format()
    frame_id = raw_image.get_frame_id()
    timestamp = raw_image.get_timestamp()
    print(f"Image: {width}x{height}, Format: {pixel_format}")
    print(f"Frame ID: {frame_id}, Timestamp: {timestamp}")
    # Convert to numpy array
    numpy_image = raw_image.get_numpy_array()
    # Save as image file
    if pixel_format == gx.GxPixelFormatEntry.MONO8:
        # A 2-D uint8 array is saved as an 8-bit grayscale image
        img = Image.fromarray(numpy_image)
    elif pixel_format == gx.GxPixelFormatEntry.BGR8:
        # Pillow expects RGB channel order, so reverse the BGR channels first
        img = Image.fromarray(numpy_image[:, :, ::-1])
    else:
        print(f"Unsupported pixel format for direct save: {pixel_format}")
        img = None
    if img is not None:
        img.save("captured_image.jpg")
Camera Configuration
Essential Camera Settings
Exposure Control
# Manual exposure
cam.ExposureAuto.set(gx.GxAutoEntry.OFF)
cam.ExposureTime.set(50000) # 50ms in microseconds
# Auto exposure
cam.ExposureAuto.set(gx.GxAutoEntry.CONTINUOUS)
# Get exposure range
exp_min = cam.ExposureTime.get_min()
exp_max = cam.ExposureTime.get_max()
print(f"Exposure range: {exp_min} - {exp_max} microseconds")
Gain Control
# Check if gain is available
if cam.Gain.is_implemented():
    # Set manual gain
    cam.GainAuto.set(gx.GxAutoEntry.OFF)
    cam.Gain.set(10.0) # dB
    # Auto gain
    cam.GainAuto.set(gx.GxAutoEntry.CONTINUOUS)
Region of Interest (ROI)
# Get sensor dimensions
sensor_width = cam.SensorWidth.get()
sensor_height = cam.SensorHeight.get()
max_width = cam.WidthMax.get()
max_height = cam.HeightMax.get()
print(f"Sensor: {sensor_width}x{sensor_height}")
print(f"Max ROI: {max_width}x{max_height}")
# Set ROI (ensure values are within bounds and follow increments)
roi_x = 100
roi_y = 100
roi_width = 800
roi_height = 600
# Check and align to increments
width_inc = cam.Width.get_inc()
height_inc = cam.Height.get_inc()
x_inc = cam.OffsetX.get_inc()
y_inc = cam.OffsetY.get_inc()
# Align values
roi_x = (roi_x // x_inc) * x_inc
roi_y = (roi_y // y_inc) * y_inc
roi_width = (roi_width // width_inc) * width_inc
roi_height = (roi_height // height_inc) * height_inc
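# Set ROI: reset the offsets, set the size, then move the window.
# The allowed offset range depends on the current Width/Height, so setting
# OffsetX/OffsetY first can be rejected while the ROI is still full-size.
cam.OffsetX.set(0)
cam.OffsetY.set(0)
cam.Width.set(roi_width)
cam.Height.set(roi_height)
cam.OffsetX.set(roi_x)
cam.OffsetY.set(roi_y)
print(f"ROI set to: {roi_x},{roi_y} {roi_width}x{roi_height}")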
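The alignment and bounds checks above can be collected into one pure function that runs before any value is written to the camera. This is a minimal sketch under simplifying assumptions: fit_roi and align_down are hypothetical names, offsets start at 0, and the camera's minimum width and height are ignored.

```python
def align_down(value, increment):
    """Round `value` down to the nearest multiple of `increment`."""
    if increment <= 0:
        raise ValueError("increment must be positive")
    return (value // increment) * increment


def fit_roi(x, y, width, height, max_width, max_height,
            x_inc=1, y_inc=1, width_inc=1, height_inc=1):
    """Clamp an ROI to the sensor area and align every value to its increment."""
    # Size first: it determines how far the window may be shifted.
    width = align_down(min(width, max_width), width_inc)
    height = align_down(min(height, max_height), height_inc)
    # Offsets are limited so that offset + size never exceeds the maximum.
    x = align_down(max(0, min(x, max_width - width)), x_inc)
    y = align_down(max(0, min(y, max_height - height)), y_inc)
    return x, y, width, height


# Example with made-up limits: a request that overshoots the right edge.
print(fit_roi(1500, 0, 805, 600, 1920, 1200, x_inc=4, width_inc=8))
```

The values it returns would then be written with the Width, Height, OffsetX, and OffsetY features, with the limits and increments read from the camera rather than hard-coded.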
Pixel Format Configuration
# Check available pixel formats
if cam.PixelFormat.is_implemented():
    format_list = cam.PixelFormat.get_enum_description()
    print("Available pixel formats:")
    for fmt in format_list:
        print(f"  {fmt}")
    # Set specific format
    cam.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)
    current_format = cam.PixelFormat.get()
    print(f"Current format: {current_format}")
Frame Rate Control
# Enable frame rate control
if cam.AcquisitionFrameRateMode.is_implemented():
    cam.AcquisitionFrameRateMode.set(gx.GxAcquisitionFrameRateModeEntry.ON)
    # Set frame rate
    max_fps = cam.AcquisitionFrameRate.get_max()
    cam.AcquisitionFrameRate.set(min(30.0, max_fps))
    actual_fps = cam.CurrentAcquisitionFrameRate.get()
    print(f"Frame rate set to {actual_fps} fps")
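The frame rate a camera actually reaches also depends on exposure time: a frame cannot finish faster than its exposure. The sketch below estimates that bound, assuming exposure and readout do not overlap and ignoring readout time, so the camera-reported rate remains the authoritative figure. Both function names are hypothetical.

```python
def exposure_limited_fps(exposure_us):
    """Upper bound on frame rate when every frame needs `exposure_us` of exposure."""
    if exposure_us <= 0:
        raise ValueError("exposure_us must be positive")
    return 1_000_000.0 / exposure_us


def clamp_target_fps(target_fps, camera_max_fps, exposure_us):
    """Pick the highest rate allowed by the target, the camera limit, and the exposure."""
    return min(target_fps, camera_max_fps, exposure_limited_fps(exposure_us))


# With a 50 ms exposure, a 30 fps target cannot be met.
print(clamp_target_fps(30.0, 60.0, 50_000))
```

A result below the requested rate is a hint to shorten the exposure before raising AcquisitionFrameRate.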
Triggering Configuration
Hardware Triggering
# Configure hardware trigger
cam.TriggerMode.set(gx.GxTriggerModeEntry.ON)
cam.TriggerSource.set(gx.GxTriggerSourceEntry.LINE2) # External trigger input
cam.TriggerActivation.set(gx.GxTriggerActivationEntry.RISINGEDGE)
# Optional: Set trigger delay
if cam.TriggerDelay.is_implemented():
    cam.TriggerDelay.set(100.0) # 100 microseconds delay
print("Hardware trigger configured")
Software Triggering
# Configure software trigger
cam.TriggerMode.set(gx.GxTriggerModeEntry.ON)
cam.TriggerSource.set(gx.GxTriggerSourceEntry.SOFTWARE)
# Start acquisition
cam.stream_on()
# Send software trigger
cam.TriggerSoftware.send_command()
# Get triggered image
raw_image = cam.data_stream[0].get_image(timeout=5000)
if raw_image is not None:
    print("Software trigger successful")
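A software trigger can occasionally yield no frame within the timeout, so a small retry loop is a common companion to the pattern above. The sketch below takes the trigger and fetch steps as plain callables so that it stays independent of the camera object; trigger_with_retry is a hypothetical helper, not an SDK function.

```python
def trigger_with_retry(send_trigger, get_image, attempts=3):
    """Fire a trigger and fetch a frame, retrying when no frame arrives.

    Returns (image, attempts_used); image is None if every attempt failed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        send_trigger()
        image = get_image()
        if image is not None:
            return image, attempt
    return None, attempts
```

With the objects from the snippet above, the two callables would be cam.TriggerSoftware.send_command and a lambda wrapping cam.data_stream[0].get_image(timeout=5000).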
Advanced Features
Multi-Camera Setup
class MultiCameraManager:
    def __init__(self):
        self.device_manager = gx.DeviceManager()
        self.cameras = []

    def discover_cameras(self):
        device_num, device_info_list = self.device_manager.update_device_list()
        print(f"Found {device_num} cameras")
        return device_info_list

    def connect_cameras(self, serial_numbers):
        """Connect to cameras by serial number"""
        for sn in serial_numbers:
            try:
                cam = self.device_manager.open_device_by_sn(sn)
                self.cameras.append({
                    'camera': cam,
                    'serial': sn,
                    'active': False
                })
                print(f"Connected to camera: {sn}")
            except Exception as e:
                print(f"Failed to connect to camera {sn}: {e}")

    def configure_all_cameras(self):
        """Configure all cameras with same settings"""
        for cam_info in self.cameras:
            cam = cam_info['camera']
            try:
                # Basic configuration
                cam.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)
                cam.ExposureTime.set(10000)
                cam.TriggerMode.set(gx.GxTriggerModeEntry.OFF)
                cam.AcquisitionMode.set(gx.GxAcquisitionModeEntry.CONTINUOUS)
                print(f"Configured camera: {cam_info['serial']}")
            except Exception as e:
                print(f"Configuration failed for {cam_info['serial']}: {e}")

    def start_all_acquisition(self):
        """Start acquisition on all cameras"""
        for cam_info in self.cameras:
            try:
                cam_info['camera'].stream_on()
                cam_info['active'] = True
                print(f"Started acquisition: {cam_info['serial']}")
            except Exception as e:
                print(f"Start failed for {cam_info['serial']}: {e}")

    def capture_from_all(self, timeout=1000):
        """Capture images from all cameras"""
        images = {}
        for cam_info in self.cameras:
            if cam_info['active']:
                try:
                    raw_image = cam_info['camera'].data_stream[0].get_image(timeout)
                    if raw_image is not None:
                        images[cam_info['serial']] = raw_image.get_numpy_array()
                except Exception as e:
                    print(f"Capture failed for {cam_info['serial']}: {e}")
        return images

    def cleanup(self):
        """Stop all cameras and cleanup"""
        for cam_info in self.cameras:
            try:
                if cam_info['active']:
                    cam_info['camera'].stream_off()
                cam_info['camera'].close_device()
            except Exception as e:
                print(f"Cleanup failed for {cam_info['serial']}: {e}")

# Usage example
multi_cam = MultiCameraManager()
devices = multi_cam.discover_cameras()
serial_numbers = [dev['sn'] for dev in devices[:2]] # Use first 2 cameras
multi_cam.connect_cameras(serial_numbers)
multi_cam.configure_all_cameras()
multi_cam.start_all_acquisition()
# Capture one frame from each camera in turn. The cameras are read one after
# another, so the frames are not hardware-synchronized.
images = multi_cam.capture_from_all()
for serial, image in images.items():
    print(f"Camera {serial}: Image shape {image.shape}")
multi_cam.cleanup()
Image Processing Integration
# Create image processing objects
image_process = device_manager.create_image_process()
color_converter = device_manager.create_image_format_convert()

def process_raw_image(raw_image):
    """Process raw image with built-in functions"""
    try:
        # Get basic info
        width = raw_image.get_width()
        height = raw_image.get_height()
        pixel_format = raw_image.get_pixel_format()
        # Convert pixel format if needed
        if pixel_format == gx.GxPixelFormatEntry.BAYER_RG8:
            # Convert Bayer to RGB
            output_image = color_converter.convert(raw_image,
                                                   gx.GxPixelFormatEntry.RGB8)
            return output_image.get_numpy_array()
        else:
            return raw_image.get_numpy_array()
    except Exception as e:
        print(f"Image processing error: {e}")
        return None

# Apply processing during acquisition
cam.stream_on()
for i in range(10):
    raw_image = cam.data_stream[0].get_image()
    if raw_image is not None:
        processed_image = process_raw_image(raw_image)
        if processed_image is not None:
            print(f"Processed image {i}: {processed_image.shape}")
Callback-based High-Speed Acquisition
import threading
import queue
import time

class HighSpeedAcquisition:
    def __init__(self, camera):
        self.camera = camera
        self.image_queue = queue.Queue(maxsize=100)
        self.acquisition_active = False
        self.stats = {
            'frames_received': 0,
            'frames_processed': 0,
            'frames_dropped': 0,
            'start_time': None
        }

    def image_callback(self, raw_image):
        """High-speed image callback"""
        self.stats['frames_received'] += 1
        try:
            # Non-blocking queue put
            self.image_queue.put_nowait({
                'image': raw_image.get_numpy_array(),
                'timestamp': raw_image.get_timestamp(),
                'frame_id': raw_image.get_frame_id()
            })
        except queue.Full:
            self.stats['frames_dropped'] += 1

    def processing_thread(self):
        """Separate thread for image processing"""
        while self.acquisition_active:
            try:
                # Get image with timeout
                image_data = self.image_queue.get(timeout=1.0)
                # Process image here
                self.process_image(image_data)
                self.stats['frames_processed'] += 1
            except queue.Empty:
                continue

    def process_image(self, image_data):
        """Override this method for custom processing"""
        # Example: Calculate mean intensity
        mean_intensity = image_data['image'].mean()
        print(f"Frame {image_data['frame_id']}: Mean={mean_intensity:.1f}")

    def start_acquisition(self):
        """Start high-speed acquisition"""
        self.stats['start_time'] = time.time()
        self.acquisition_active = True
        # Start processing thread
        self.process_thread = threading.Thread(target=self.processing_thread)
        self.process_thread.start()
        # Register callback and start camera
        self.camera.data_stream[0].register_capture_callback(self.image_callback)
        self.camera.stream_on()
        print("High-speed acquisition started")

    def stop_acquisition(self):
        """Stop acquisition and cleanup"""
        self.camera.stream_off()
        self.camera.data_stream[0].unregister_capture_callback()
        self.acquisition_active = False
        self.process_thread.join()
        # Print statistics
        duration = time.time() - self.stats['start_time']
        fps = self.stats['frames_received'] / duration
        print("Acquisition stopped:")
        print(f"  Duration: {duration:.1f}s")
        print(f"  Frames received: {self.stats['frames_received']}")
        print(f"  Frames processed: {self.stats['frames_processed']}")
        print(f"  Frames dropped: {self.stats['frames_dropped']}")
        print(f"  Average FPS: {fps:.1f}")

# Usage
high_speed_acq = HighSpeedAcquisition(cam)
high_speed_acq.start_acquisition()
time.sleep(10) # Run for 10 seconds
high_speed_acq.stop_acquisition()
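The class above drops the newest frame when its queue is full. For live display or monitoring it is often preferable to drop the oldest frame instead, so the consumer always sees recent data. The sketch below shows that alternative policy with a lock-protected deque; LatestFramesBuffer is a hypothetical name and is not part of gxipy.

```python
import threading
from collections import deque


class LatestFramesBuffer:
    """Bounded buffer that discards the oldest frame when it is full."""

    def __init__(self, maxlen):
        self._frames = deque(maxlen=maxlen)
        self._lock = threading.Lock()
        self.dropped = 0

    def put(self, frame):
        """Store a frame; never blocks, so it is safe to call from a callback."""
        with self._lock:
            if len(self._frames) == self._frames.maxlen:
                self.dropped += 1  # the append below evicts the oldest frame
            self._frames.append(frame)

    def get(self):
        """Return the oldest buffered frame, or None if the buffer is empty."""
        with self._lock:
            return self._frames.popleft() if self._frames else None
```

Which policy fits depends on the application: dropping the newest frame preserves an unbroken prefix of the sequence, while dropping the oldest keeps latency low.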
Error Handling and Best Practices
Exception Handling
import gxipy as gx

def safe_camera_operation():
    device_manager = None
    cam = None
    try:
        # Initialize device manager
        device_manager = gx.DeviceManager()
        # Update device list with error handling
        try:
            device_num, device_info_list = device_manager.update_device_list(timeout=5000)
            if device_num == 0:
                raise RuntimeError("No cameras found")
        except Exception as e:
            print(f"Device enumeration failed: {e}")
            return False
        # Connect to camera
        try:
            cam = device_manager.open_device_by_index(1)
            print("Camera connected successfully")
        except gx.InvalidAccess:
            print("Camera is already in use by another application")
            return False
        except gx.NotFoundDevice:
            print("Camera not found or disconnected")
            return False
        except Exception as e:
            print(f"Camera connection failed: {e}")
            return False
        # Configure camera
        try:
            cam.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)
            cam.ExposureTime.set(10000)
            print("Camera configured successfully")
        except gx.InvalidParameter as e:
            print(f"Invalid parameter: {e}")
            return False
        except Exception as e:
            print(f"Configuration failed: {e}")
            return False
        # Start acquisition
        try:
            cam.stream_on()
            print("Acquisition started")
            # Capture images
            for i in range(5):
                raw_image = cam.data_stream[0].get_image(timeout=2000)
                if raw_image is None:
                    print(f"Timeout on image {i}")
                    continue
                numpy_image = raw_image.get_numpy_array()
                print(f"Image {i}: {numpy_image.shape}")
        except Exception as e:
            print(f"Acquisition failed: {e}")
            return False
        return True
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False
    finally:
        # Cleanup - always executed
        if cam is not None:
            # Stop and close separately so a failed stop does not skip the close
            try:
                cam.stream_off()
            except Exception as e:
                print(f"Stopping the stream failed: {e}")
            try:
                cam.close_device()
                print("Camera closed successfully")
            except Exception as e:
                print(f"Camera cleanup failed: {e}")

# Run safe operation
success = safe_camera_operation()
print(f"Operation {'succeeded' if success else 'failed'}")
Best Practices
1. Resource Management
# Use context managers when possible
class CameraContext:
    def __init__(self, device_manager, camera_index):
        self.device_manager = device_manager
        self.camera_index = camera_index
        self.camera = None

    def __enter__(self):
        self.camera = self.device_manager.open_device_by_index(self.camera_index)
        return self.camera

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.camera:
            try:
                self.camera.stream_off()
            except Exception:
                # The stream may not have been started; closing still proceeds
                pass
            self.camera.close_device()

# Usage
device_manager = gx.DeviceManager()
device_manager.update_device_list()
with CameraContext(device_manager, 1) as cam:
    # Camera operations here
    cam.stream_on()
    image = cam.data_stream[0].get_image()
# Automatic cleanup when exiting block
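The same idea can be applied to streaming alone. The sketch below uses contextlib.contextmanager to pair stream_on with stream_off around a block of acquisition code; streaming is a hypothetical helper name, and it relies only on the stream_on, stream_off, and data_stream members used throughout this guide.

```python
from contextlib import contextmanager


@contextmanager
def streaming(camera):
    """Start streaming on entry and always stop it on exit, even after an error."""
    camera.stream_on()
    try:
        # Hand the first data stream to the with-block for image retrieval.
        yield camera.data_stream[0]
    finally:
        camera.stream_off()
```

Inside `with streaming(cam) as stream:` the body can call stream.get_image(timeout=1000) repeatedly, and the stream is stopped when the block exits for any reason.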
2. Parameter Validation
def set_safe_exposure(camera, exposure_us):
    """Safely set exposure time with validation"""
    try:
        if not camera.ExposureTime.is_implemented():
            print("Exposure time not supported")
            return False
        min_exp = camera.ExposureTime.get_min()
        max_exp = camera.ExposureTime.get_max()
        if exposure_us < min_exp:
            print(f"Exposure too low, setting to minimum: {min_exp}")
            exposure_us = min_exp
        elif exposure_us > max_exp:
            print(f"Exposure too high, setting to maximum: {max_exp}")
            exposure_us = max_exp
        camera.ExposureTime.set(exposure_us)
        actual_exp = camera.ExposureTime.get()
        print(f"Exposure set to: {actual_exp} μs")
        return True
    except Exception as e:
        print(f"Failed to set exposure: {e}")
        return False
3. Performance Monitoring
import time

class PerformanceMonitor:
    def __init__(self):
        self.reset()

    def reset(self):
        self.frame_count = 0
        self.start_time = time.time()
        self.last_fps_time = self.start_time
        self.frames_at_last_report = 0
        self.fps_window = []

    def update(self):
        self.frame_count += 1
        current_time = time.time()
        elapsed = current_time - self.last_fps_time
        # Report FPS about once per second
        if elapsed >= 1.0:
            # Count only the frames received since the previous report
            fps = (self.frame_count - self.frames_at_last_report) / elapsed
            self.fps_window.append(fps)
            # Keep only last 10 FPS measurements
            if len(self.fps_window) > 10:
                self.fps_window.pop(0)
            avg_fps = sum(self.fps_window) / len(self.fps_window)
            print(f"FPS: {fps:.1f}, Average: {avg_fps:.1f}")
            self.last_fps_time = current_time
            self.frames_at_last_report = self.frame_count

# Usage in acquisition loop
monitor = PerformanceMonitor()
cam.stream_on()
for i in range(100):
    raw_image = cam.data_stream[0].get_image()
    if raw_image is not None:
        monitor.update()
        # Process image...
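An alternative to reporting once per second is a sliding window over frame timestamps, which yields an estimate after every frame. The sketch below takes timestamps as arguments so that it can be driven by time.monotonic() or by recorded values; SlidingFps is a hypothetical name.

```python
from collections import deque


class SlidingFps:
    """Frame rate over the most recent `window_s` seconds of frame timestamps."""

    def __init__(self, window_s=1.0):
        self.window_s = window_s
        self._stamps = deque()

    def add(self, timestamp_s):
        """Record one frame time in seconds and return the current FPS estimate."""
        self._stamps.append(timestamp_s)
        # Discard frames that have fallen out of the window.
        while timestamp_s - self._stamps[0] > self.window_s:
            self._stamps.popleft()
        if len(self._stamps) < 2:
            return 0.0
        span = self._stamps[-1] - self._stamps[0]
        # N timestamps span N - 1 frame intervals.
        return (len(self._stamps) - 1) / span if span > 0 else 0.0
```

In an acquisition loop this would be called as fps.add(time.monotonic()) after each successful get_image.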
Complete Working Example
#!/usr/bin/env python3
"""
Complete GxiPy Camera Application Example
Demonstrates professional camera control with error handling and monitoring
"""
import gxipy as gx
from PIL import Image
import time
import sys
from datetime import datetime

class DahengCamera:
    def __init__(self):
        self.device_manager = None
        self.camera = None
        self.is_acquiring = False
        self.image_count = 0

    def initialize(self):
        """Initialize the camera system"""
        try:
            print("Initializing Daheng camera system...")
            self.device_manager = gx.DeviceManager()
            return True
        except Exception as e:
            print(f"Initialization failed: {e}")
            return False

    def discover_cameras(self):
        """Discover available cameras"""
        try:
            device_num, device_info_list = self.device_manager.update_device_list(timeout=3000)
            if device_num == 0:
                print("No cameras found!")
                return []
            print(f"\nFound {device_num} camera(s):")
            for i, device_info in enumerate(device_info_list):
                print(f"  [{i+1}] {device_info.get('model_name', 'Unknown')}")
                print(f"      Serial: {device_info.get('sn', 'Unknown')}")
                print(f"      IP: {device_info.get('ip', 'N/A')}")
                print(f"      Status: {device_info.get('access_status', 'Unknown')}")
            return device_info_list
        except Exception as e:
            print(f"Camera discovery failed: {e}")
            return []

    def connect_camera(self, index=1):
        """Connect to camera by index"""
        try:
            print(f"Connecting to camera {index}...")
            self.camera = self.device_manager.open_device_by_index(index)
            # Get camera information
            model = self.camera.DeviceModelName.get()
            serial = self.camera.DeviceSerialNumber.get()
            print(f"Connected to: {model} (S/N: {serial})")
            return True
        except Exception as e:
            print(f"Camera connection failed: {e}")
            return False

    def configure_camera(self, exposure_us=10000, gain_db=0.0):
        """Configure camera settings"""
        try:
            print("Configuring camera...")
            # Set pixel format
            self.camera.PixelFormat.set(gx.GxPixelFormatEntry.MONO8)
            # Configure exposure
            self.camera.ExposureAuto.set(gx.GxAutoEntry.OFF)
            exp_min = self.camera.ExposureTime.get_min()
            exp_max = self.camera.ExposureTime.get_max()
            exposure_us = max(exp_min, min(exp_max, exposure_us))
            self.camera.ExposureTime.set(exposure_us)
            # Configure gain if available
            if self.camera.Gain.is_implemented():
                self.camera.GainAuto.set(gx.GxAutoEntry.OFF)
                gain_min = self.camera.Gain.get_min()
                gain_max = self.camera.Gain.get_max()
                gain_db = max(gain_min, min(gain_max, gain_db))
                self.camera.Gain.set(gain_db)
                print(f"Gain set to: {self.camera.Gain.get():.1f} dB")
            # Set acquisition mode
            self.camera.TriggerMode.set(gx.GxTriggerModeEntry.OFF)
            self.camera.AcquisitionMode.set(gx.GxAcquisitionModeEntry.CONTINUOUS)
            # Get final settings
            actual_exp = self.camera.ExposureTime.get()
            width = self.camera.Width.get()
            height = self.camera.Height.get()
            print("Configuration complete:")
            print(f"  Resolution: {width}x{height}")
            print(f"  Exposure: {actual_exp:.0f} μs")
            print("  Pixel Format: MONO8")
            return True
        except Exception as e:
            print(f"Camera configuration failed: {e}")
            return False

    def start_acquisition(self):
        """Start image acquisition"""
        try:
            print("Starting image acquisition...")
            self.camera.stream_on()
            self.is_acquiring = True
            self.image_count = 0
            print("Acquisition started successfully")
            return True
        except Exception as e:
            print(f"Failed to start acquisition: {e}")
            return False

    def capture_image(self, timeout_ms=2000):
        """Capture a single image"""
        if not self.is_acquiring:
            print("Acquisition not started!")
            return None
        try:
            raw_image = self.camera.data_stream[0].get_image(timeout=timeout_ms)
            if raw_image is None:
                print("Image capture timeout")
                return None
            # Get image data and metadata
            numpy_image = raw_image.get_numpy_array()
            frame_id = raw_image.get_frame_id()
            timestamp = raw_image.get_timestamp()
            self.image_count += 1
            return {
                'image': numpy_image,
                'frame_id': frame_id,
                'timestamp': timestamp,
                'capture_id': self.image_count
            }
        except Exception as e:
            print(f"Image capture failed: {e}")
            return None

    def save_image(self, image_data, filename=None):
        """Save image to file"""
        try:
            if filename is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                filename = f"image_{timestamp}_{image_data['capture_id']:04d}.jpg"
            # Convert numpy array to PIL Image and save
            pil_image = Image.fromarray(image_data['image'], mode='L')
            pil_image.save(filename)
            print(f"Image saved: {filename}")
            return filename
        except Exception as e:
            print(f"Failed to save image: {e}")
            return None

    def continuous_capture(self, duration_seconds=10, save_interval=1):
        """Capture images continuously for specified duration"""
        print(f"Starting continuous capture for {duration_seconds} seconds...")
        start_time = time.time()
        last_save_time = start_time
        while time.time() - start_time < duration_seconds:
            image_data = self.capture_image()
            if image_data:
                current_time = time.time()
                # Print status
                elapsed = current_time - start_time
                fps = self.image_count / elapsed if elapsed > 0 else 0
                print(f"Frame {image_data['capture_id']}: "
                      f"{image_data['image'].shape} "
                      f"(FPS: {fps:.1f})")
                # Save image at specified interval
                if current_time - last_save_time >= save_interval:
                    self.save_image(image_data)
                    last_save_time = current_time
        print(f"Continuous capture completed. Total frames: {self.image_count}")

    def stop_acquisition(self):
        """Stop image acquisition"""
        try:
            if self.is_acquiring:
                print("Stopping acquisition...")
                self.camera.stream_off()
                self.is_acquiring = False
                print("Acquisition stopped")
        except Exception as e:
            print(f"Failed to stop acquisition: {e}")

    def cleanup(self):
        """Cleanup camera resources"""
        try:
            self.stop_acquisition()
            if self.camera:
                self.camera.close_device()
                print("Camera disconnected")
        except Exception as e:
            print(f"Cleanup error: {e}")

def main():
    """Main application function"""
    print("=== Daheng Camera Application ===\n")
    # Initialize camera
    camera = DahengCamera()
    if not camera.initialize():
        sys.exit(1)
    # Discover cameras
    devices = camera.discover_cameras()
    if not devices:
        sys.exit(1)
    # Connect to first camera
    if not camera.connect_camera(1):
        sys.exit(1)
    # Configure camera
    if not camera.configure_camera(exposure_us=20000, gain_db=5.0):
        camera.cleanup()
        sys.exit(1)
    # Start acquisition
    if not camera.start_acquisition():
        camera.cleanup()
        sys.exit(1)
    try:
        # Capture some individual images
        print("\nCapturing 5 individual images...")
        for i in range(5):
            image_data = camera.capture_image()
            if image_data:
                print(f"Captured image {i+1}: {image_data['image'].shape}")
                if i == 0: # Save first image
                    camera.save_image(image_data, "sample_image.jpg")
            time.sleep(0.5)
        # Continuous capture
        print("\nStarting continuous capture...")
        camera.continuous_capture(duration_seconds=5, save_interval=2)
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        # Always cleanup
        camera.cleanup()
        print("Application finished")

if __name__ == "__main__":
    main()
Performance Optimization
Buffer Management
# Configure acquisition buffer count for high-speed applications
def configure_high_speed_buffers(camera, buffer_count=10):
    """Configure buffers for high-speed acquisition"""
    try:
        # Stop acquisition if running
        camera.stream_off()
        # Set buffer count (if supported)
        stream = camera.data_stream[0]
        if hasattr(stream, 'set_acquisition_buffer_number'):
            stream.set_acquisition_buffer_number(buffer_count)
            print(f"Buffer count set to: {buffer_count}")
        # Report payload size
        payload_size = camera.PayloadSize.get()
        print(f"Payload size: {payload_size} bytes")
        return True
    except Exception as e:
        print(f"Buffer configuration failed: {e}")
        return False
Memory Management
# Efficient memory handling for continuous acquisition
import time
from collections import deque

def efficient_acquisition_loop(camera, duration=10):
    """Memory-efficient acquisition loop"""
    start_time = time.time()
    frame_count = 0
    # Bounded buffer: only the most recent 1000 statistics entries are kept
    stats_buffer = deque(maxlen=1000)
    while time.time() - start_time < duration:
        raw_image = camera.data_stream[0].get_image(timeout=1000)
        if raw_image is not None:
            # Store small per-frame statistics instead of the full image
            image_stats = {
                'frame_id': raw_image.get_frame_id(),
                'timestamp': raw_image.get_timestamp(),
                'mean_intensity': raw_image.get_numpy_array().mean(),
            }
            stats_buffer.append(image_stats)
            frame_count += 1
    print(f"Processed {frame_count} frames")
    return list(stats_buffer)
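When sizing buffers or checking whether a Gigabit Ethernet link can carry a stream, a quick estimate of frame size and data rate is useful. The sketch below computes both for uncompressed frames; the function names are hypothetical, rows are assumed to be unpadded, and protocol overhead is ignored, so real throughput requirements are somewhat higher.

```python
def frame_bytes(width, height, bits_per_pixel=8):
    """Size of one uncompressed frame in bytes (rows assumed unpadded)."""
    return width * height * bits_per_pixel // 8


def stream_rate_mbps(width, height, fps, bits_per_pixel=8):
    """Approximate payload data rate in megabits per second."""
    return frame_bytes(width, height, bits_per_pixel) * 8 * fps / 1_000_000


# Example: a 1920x1200 MONO8 stream at 30 fps.
print(frame_bytes(1920, 1200), "bytes per frame")
print(round(stream_rate_mbps(1920, 1200, 30), 2), "Mbit/s")
```

Multiplying frame_bytes by the acquisition buffer count gives a rough figure for the memory held by the stream's buffers.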
Troubleshooting
Common Issues and Solutions
Camera Not Found
def diagnose_camera_connection():
    """Diagnose camera connection issues"""
    print("Diagnosing camera connection...")
    # Check SDK installation: creating the DeviceManager initializes the SDK,
    # so it belongs inside the try block as well
    try:
        device_manager = gx.DeviceManager()
        device_num, _ = device_manager.update_device_list(timeout=1000)
        print("✓ SDK initialized successfully")
    except Exception as e:
        print(f"✗ SDK initialization failed: {e}")
        return
    if device_num == 0:
        print("✗ No cameras detected")
        print("Troubleshooting steps:")
        print("  1. Check camera power and connections")
        print("  2. Verify network settings (for GigE cameras)")
        print("  3. Check USB cable and ports (for USB cameras)")
        print("  4. Ensure camera drivers are installed")
        return
    print(f"✓ Found {device_num} camera(s)")
Performance Issues
def diagnose_performance():
    """Check system performance for camera applications"""
    import psutil # third-party package: pip install psutil
    print("System performance check:")
    # CPU usage
    cpu_percent = psutil.cpu_percent(interval=1)
    print(f"CPU Usage: {cpu_percent}%")
    # Memory usage
    memory = psutil.virtual_memory()
    print(f"Memory Usage: {memory.percent}% ({memory.available // 1024**2} MB available)")
    # Disk usage for image storage
    disk = psutil.disk_usage('.')
    print(f"Disk Space: {disk.percent}% used ({disk.free // 1024**3} GB free)")
    # Network interfaces (for GigE cameras): list every interface with its
    # state and reported link speed
    net_interfaces = psutil.net_if_stats()
    print("Network Interfaces:")
    for interface, stats in net_interfaces.items():
        state = 'UP' if stats.isup else 'DOWN'
        print(f"  {interface}: {state}, {stats.speed} Mbps")
Error Code Reference
- InvalidAccess: Camera already in use or insufficient permissions
- NotFoundDevice: Camera disconnected or not available
- Timeout: Operation timed out, check network/USB connection
- InvalidParameter: Parameter value out of range or not supported
- NotImplemented: Feature not available on this camera model
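In application code, the reference above can be turned into a small lookup so that log messages carry a hint alongside the exception name. The sketch below matches on the exception's class name only, so it does not need to import gxipy; ERROR_HINTS and describe_error are hypothetical names, and the hints are copied from the list above.

```python
ERROR_HINTS = {
    "InvalidAccess": "Camera already in use or insufficient permissions",
    "NotFoundDevice": "Camera disconnected or not available",
    "Timeout": "Operation timed out, check network/USB connection",
    "InvalidParameter": "Parameter value out of range or not supported",
    "NotImplemented": "Feature not available on this camera model",
}


def describe_error(exc):
    """Return '<ExceptionName>: <hint>' for an exception instance."""
    name = type(exc).__name__
    hint = ERROR_HINTS.get(name, "No specific hint; see the SDK documentation")
    return f"{name}: {hint}"
```

A typical use is print(describe_error(e)) inside an `except Exception as e:` handler around camera calls.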
API Reference Summary
DeviceManager Methods
- update_device_list(timeout) – Discover cameras
- open_device_by_index(index) – Connect by index
- open_device_by_sn(serial) – Connect by serial number
- open_device_by_ip(ip) – Connect by IP address
- create_image_process() – Create image processor
- create_image_format_convert() – Create format converter
Device Properties
- Image Format: Width, Height, OffsetX, OffsetY, PixelFormat
- Exposure: ExposureTime, ExposureAuto, ExposureMode
- Gain: Gain, GainAuto
- Triggering: TriggerMode, TriggerSource, TriggerActivation
- Acquisition: AcquisitionMode, AcquisitionFrameRate
DataStream Methods
- get_image(timeout) – Get single image
- register_capture_callback(callback) – Register callback
- unregister_capture_callback() – Remove callback
Image Object Methods
- get_numpy_array() – Convert to NumPy array
- get_width(), get_height() – Image dimensions
- get_pixel_format() – Pixel format
- get_frame_id() – Frame counter
- get_timestamp() – Image timestamp
For the latest API documentation and SDK updates, refer to the official Daheng Imaging documentation included with your Galaxy SDK installation.