You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm not entirely sure where to post this script, but I'm happy with it and figured I would share it with you. It's a PySide6 Python GUI that can start up multiple cameras and record from them. It uses FFmpeg, multiprocessing, and threading for speed and safety.
import sys
import os
import threading
import time
import json
import numpy as np
import gxipy as gx
import cv2
import av
from av import VideoFrame
from fractions import Fraction
from pathlib import Path
import queue
from PySide6.QtWidgets import (
QApplication, QMainWindow, QWidget, QLabel, QPushButton, QVBoxLayout,
QHBoxLayout, QGridLayout, QSpinBox, QDoubleSpinBox, QLineEdit, QFileDialog,
QMessageBox, QCheckBox, QComboBox, QScrollArea, QSizePolicy
)
from PySide6.QtCore import Qt, QTimer
from PySide6.QtGui import QImage, QPixmap, QAction
from datetime import datetime
# --- Helper Functions ---
def cv_image_to_qt(cv_image):
    """Build a QPixmap from an OpenCV image stored in BGR channel order."""
    rows, cols, _channels = cv_image.shape
    # Bytes per image row handed to QImage (three bytes per RGB888 pixel).
    stride = cols * 3
    # QImage.Format_RGB888 expects RGB ordering, so swap the channels first.
    rgb = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)
    return QPixmap.fromImage(
        QImage(rgb.data, cols, rows, stride, QImage.Format_RGB888)
    )
def get_timestamp():
    """Return the current local time formatted as ``HH:MM:SS MM-DD-YYYY``."""
    now = datetime.now()
    return f"{now:%H:%M:%S %m-%d-%Y}"
def create_storage_directory(storage_directory_root, storage_directory_name):
    """Make sure ``<root>/<name>`` exists and return its normalized path."""
    target = os.path.normpath(
        os.path.join(storage_directory_root, storage_directory_name)
    )
    # Nothing to do (and nothing to report) when the path is already there.
    if os.path.exists(target):
        return target
    os.makedirs(target, exist_ok=True)
    print(f"Created storage directory: {target}")
    return target
# --- Camera Manager Class ---
class CameraManager:
    """Tracks the camera devices that have been opened, keyed by serial number."""

    def __init__(self):
        self.device_manager = gx.DeviceManager()
        self.cameras = {}

    def discover_cameras(self):
        """Refresh the device list and return the device-info entries."""
        _device_count, device_infos = self.device_manager.update_device_list()
        return device_infos

    def open_camera(self, serial_number):
        """Return the camera for ``serial_number``, opening it on first use.

        A camera that is already open is returned from the cache. The result
        of a fresh open is cached only when it is not ``None``.
        """
        try:
            return self.cameras[serial_number]
        except KeyError:
            pass
        handle = self.device_manager.open_device_by_sn(serial_number)
        if handle is None:
            return handle
        self.cameras[serial_number] = handle
        return handle

    def close_camera(self, serial_number):
        """Forget the camera with ``serial_number`` and close it if it was open."""
        handle = self.cameras.pop(serial_number, None)
        if not handle:
            return
        handle.close_device()

    def close_all_cameras(self):
        """Close every tracked camera, then empty the registry."""
        for handle in self.cameras.values():
            handle.close_device()
        self.cameras.clear()
# --- Camera Recorder Class ---
class CameraRecorder:
    """Record frames from one camera into consecutive H.264 MP4 files.

    Two worker threads cooperate through a bounded queue: ``capture_frames``
    pulls images from the camera and ``encode_frames`` writes them to disk,
    starting a new file every ``settings['video_length_frames']`` frames.

    Args:
        cam: Opened camera object. ``data_stream[0].get_image()``, ``Width``
            and ``Height`` are read from it.
        settings: Dict providing ``'fps'``, ``'video_length_frames'`` and
            ``'compression_level'``.
        storage_directory: Directory the video files are written into.
        camera_lock: Lock held while a frame is fetched from the camera.
    """

    def __init__(self, cam, settings, storage_directory, camera_lock):
        self.cam = cam
        self.settings = settings
        self.storage_directory = storage_directory
        self.camera_lock = camera_lock
        self.frame_queue = queue.Queue(maxsize=100)  # Adjust maxsize as needed
        self.is_recording = False
        self.stop_event = threading.Event()
        self.capture_thread = threading.Thread(target=self.capture_frames)
        self.encoder_thread = threading.Thread(target=self.encode_frames)

    def start(self):
        """Mark the recorder as running and launch both worker threads."""
        self.is_recording = True
        self.capture_thread.start()
        self.encoder_thread.start()

    def stop(self):
        """Signal both workers to finish and wait for them to exit.

        Safe to call on a recorder that was never started: joining a thread
        that has not been started raises RuntimeError, so only live threads
        are joined.
        """
        self.is_recording = False
        self.stop_event.set()
        for worker in (self.capture_thread, self.encoder_thread):
            if worker.is_alive():
                worker.join()

    def capture_frames(self):
        """Continuously capture frames and put them into the queue."""
        frame_interval = 1.0 / self.settings['fps']
        while self.is_recording and not self.stop_event.is_set():
            with self.camera_lock:
                raw_image = self.cam.data_stream[0].get_image()
                if raw_image is None:
                    continue
                numpy_image = raw_image.get_numpy_array()
                if numpy_image is None:
                    continue
                bgr_image = cv2.cvtColor(numpy_image, cv2.COLOR_BAYER_RG2BGR)
            # Burn the wall-clock time into the frame.
            cv2.putText(bgr_image, get_timestamp(), (10, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 4, (255, 255, 255), 3)
            try:
                self.frame_queue.put(bgr_image, timeout=1)
            except queue.Full:
                print("Frame queue is full, dropping frame")
            # Pace the loop with Event.wait() rather than time.sleep() so a
            # stop request interrupts the pause immediately; at low frame
            # rates a plain sleep would make stop() hang for a full interval.
            self.stop_event.wait(frame_interval)

    def encode_frames(self):
        """Continuously encode frames from the queue into video files.

        A file is opened only once a frame is actually available, so waiting
        for the first frame no longer creates (and immediately replaces) an
        empty file on every queue timeout. The open container is always
        flushed and closed, including when encoding raises.
        """
        frames_written = 0
        output_container = None
        stream = None
        try:
            while self.is_recording and not self.stop_event.is_set():
                try:
                    bgr_image = self.frame_queue.get(timeout=1)
                except queue.Empty:
                    continue  # No frame available yet
                # Roll over to a new file on the first frame and whenever the
                # current file has reached its configured length.
                if (output_container is None
                        or frames_written >= self.settings['video_length_frames']):
                    if output_container is not None:
                        finished, output_container = output_container, None
                        self._close_container(finished, stream)
                        print("Closed previous video file")
                    output_container, stream = self._open_container()
                    frames_written = 0
                # The encoder is configured for yuv420p, so convert from BGR.
                frame = VideoFrame.from_ndarray(bgr_image, format='bgr24')
                frame = frame.reformat(format='yuv420p')
                for packet in stream.encode(frame):
                    output_container.mux(packet)
                frames_written += 1
        finally:
            if output_container is not None:
                self._close_container(output_container, stream)
                print("Closed final video file")

    def _open_container(self):
        """Create a timestamp-named MP4 file and return ``(container, stream)``."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        video_filename = Path(self.storage_directory) / f"video_{timestamp}.mp4"
        video_filename = str(video_filename).replace('\\', '/')
        print(f"Recording to file: {video_filename}")
        container = av.open(video_filename, mode='w', format='mp4')
        try:
            fps = self._fps_as_fraction(self.settings['fps'])
            stream = container.add_stream('libx264', rate=fps)
            stream.width = int(self.cam.Width.get())
            stream.height = int(self.cam.Height.get())
            stream.pix_fmt = 'yuv420p'
            stream.options = self.get_compression_settings(
                self.settings['compression_level'])
            # One tick per frame. Derived from the exact rational rate so
            # that rates below 1 fps do not collapse to a zero denominator.
            stream.time_base = 1 / fps
        except Exception:
            # Do not leak the file handle if the stream cannot be set up.
            container.close()
            raise
        return container, stream

    @staticmethod
    def _close_container(container, stream):
        """Flush the encoder's buffered packets, then close the container."""
        try:
            for packet in stream.encode():
                container.mux(packet)
        finally:
            container.close()

    @staticmethod
    def _fps_as_fraction(fps):
        """Convert a numeric frame rate into a small exact ``Fraction``.

        Raises:
            ValueError: If ``fps`` is not strictly positive.
        """
        # limit_denominator() turns binary float noise such as 0.1 into 1/10.
        rate = Fraction(fps).limit_denominator(1000)
        if rate <= 0:
            raise ValueError(f"Frame rate must be positive, got {fps!r}")
        return rate

    def get_compression_settings(self, compression_level):
        """Return the x264 ``crf``/``preset`` options for a compression level.

        Unknown level names fall back to the "Medium" settings.
        """
        compression_presets = {
            "Very Low": {'crf': '15', 'preset': 'slow'},
            "Low": {'crf': '18', 'preset': 'slow'},
            "Medium": {'crf': '23', 'preset': 'medium'},
            "High": {'crf': '25', 'preset': 'fast'},
            "Very High": {'crf': '33', 'preset': 'veryfast'},
        }
        return compression_presets.get(compression_level, compression_presets["Medium"])
# --- Camera Widget Class ---
class CameraWidget(QWidget):
    """Widget for displaying and controlling a single camera.

    Shows a live preview of the camera, exposes the capture and storage
    settings, and starts/stops a ``CameraRecorder`` for the camera.

    Args:
        cam: Opened camera object to preview and record from.
        serial_number: Serial number shown in the title and stored in the
            recording's configuration file.
    """

    def __init__(self, cam, serial_number):
        super().__init__()
        self.cam = cam
        self.serial_number = serial_number
        self.is_streaming = False
        # Shared with the recorder so only one thread reads the camera at a time.
        self.camera_lock = threading.Lock()
        self.camera_recorder = None
        self.init_ui()
        self.start_camera_stream()

    def start_camera_stream(self):
        """Turn the camera stream on if it is not already running."""
        if not self.is_streaming:
            self.cam.stream_on()
            self.is_streaming = True

    def stop_camera_stream(self):
        """Turn the camera stream off if it is currently running."""
        if self.is_streaming:
            self.cam.stream_off()
            self.is_streaming = False

    def init_ui(self):
        """Build the title, preview label, settings grid and preview timer."""
        # NOTE: this attribute shadows the inherited QWidget.layout() method;
        # the name is kept so existing attribute access keeps working.
        self.layout = QVBoxLayout()
        self.setLayout(self.layout)

        # Title
        self.title_label = QLabel(f"Camera SN: {self.serial_number}")
        self.title_label.setAlignment(Qt.AlignCenter)
        self.layout.addWidget(self.title_label)

        # Video display label, fixed size until "Resize Feed" is used
        self.video_label = QLabel("Video Stream")
        self.video_label.setAlignment(Qt.AlignCenter)
        self.video_label.setFixedSize(640, 480)
        self.video_label.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Fixed)
        self.layout.addWidget(self.video_label)

        # Controls layout
        self.controls_layout = QGridLayout()

        # Exposure time
        self.exposure_label = QLabel("Exposure Time (µs):")
        self.exposure_spinbox = QSpinBox()
        self.exposure_spinbox.setRange(1, 10000000)
        self.exposure_spinbox.setValue(100000)
        self.controls_layout.addWidget(self.exposure_label, 0, 0)
        self.controls_layout.addWidget(self.exposure_spinbox, 0, 1)

        # Gain
        self.gain_label = QLabel("Gain:")
        self.gain_spinbox = QDoubleSpinBox()
        self.gain_spinbox.setRange(0, 24)
        self.gain_spinbox.setDecimals(2)
        self.gain_spinbox.setValue(0)
        self.controls_layout.addWidget(self.gain_label, 1, 0)
        self.controls_layout.addWidget(self.gain_spinbox, 1, 1)

        # Frame rate
        self.fps_label = QLabel("Frame Rate:")
        self.fps_spinbox = QDoubleSpinBox()
        self.fps_spinbox.setRange(0.1, 120)
        self.fps_spinbox.setDecimals(1)
        self.fps_spinbox.setValue(1)
        self.controls_layout.addWidget(self.fps_label, 2, 0)
        self.controls_layout.addWidget(self.fps_spinbox, 2, 1)

        # Video length
        self.video_length_label = QLabel("Video Length (min):")
        self.video_length_spinbox = QSpinBox()
        self.video_length_spinbox.setRange(1, 1440)
        self.video_length_spinbox.setValue(2)
        self.controls_layout.addWidget(self.video_length_label, 3, 0)
        self.controls_layout.addWidget(self.video_length_spinbox, 3, 1)

        # Storage directory
        self.storage_label = QLabel("Root Storage Directory:")
        self.storage_lineedit = QLineEdit(os.path.expanduser("~"))
        self.browse_button = QPushButton("Browse")
        self.browse_button.clicked.connect(self.browse_storage_directory)
        self.controls_layout.addWidget(self.storage_label, 4, 0)
        self.controls_layout.addWidget(self.storage_lineedit, 4, 1)
        self.controls_layout.addWidget(self.browse_button, 4, 2)

        # Directory name
        self.dir_name_label = QLabel("Directory Name:")
        self.dir_name_lineedit = QLineEdit("video_data")
        self.controls_layout.addWidget(self.dir_name_label, 5, 0)
        self.controls_layout.addWidget(self.dir_name_lineedit, 5, 1)

        # Compression settings
        self.compression_label = QLabel("Compression Level:")
        self.compression_combo = QComboBox()
        self.compression_combo.addItems(["Very Low", "Low", "Medium", "High", "Very High"])
        self.compression_combo.setCurrentIndex(2)  # Set "Medium" as default
        self.controls_layout.addWidget(self.compression_label, 6, 0)
        self.controls_layout.addWidget(self.compression_combo, 6, 1)

        # Start/Stop buttons
        self.start_button = QPushButton("Start Recording")
        self.start_button.clicked.connect(self.start_recording)
        self.stop_button = QPushButton("Stop Recording")
        self.stop_button.clicked.connect(self.stop_recording)
        self.stop_button.setEnabled(False)
        self.controls_layout.addWidget(self.start_button, 7, 0)
        self.controls_layout.addWidget(self.stop_button, 7, 1)

        # Resize Feed button
        self.resize_feed_button = QPushButton("Resize Feed")
        self.resize_feed_button.clicked.connect(self.resize_feed)
        self.controls_layout.addWidget(self.resize_feed_button, 8, 0, 1, 2)

        # Add controls layout to main layout
        self.layout.addLayout(self.controls_layout)

        # Video update timer; parented to the widget so Qt destroys it
        # together with the widget.
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_frame)
        self.timer.start(1000 // 15)  # Update at 15 FPS

    def browse_storage_directory(self):
        """Let the user pick the root storage directory with a file dialog."""
        directory = QFileDialog.getExistingDirectory(self, "Select Root Storage Directory")
        if directory:
            self.storage_lineedit.setText(directory)

    def start_recording(self):
        """Validate the inputs, configure the camera and start a recorder.

        The storage fields are checked before the camera is touched, so a
        rejected request leaves the camera configuration unchanged. Every
        failure is reported in a message box and aborts the start.
        """
        if self.camera_recorder and self.camera_recorder.is_recording:
            QMessageBox.warning(self, "Warning", "Recording is already in progress.")
            return

        # Get storage directory details
        storage_directory_root = self.storage_lineedit.text()
        storage_directory_name = self.dir_name_lineedit.text()
        if not storage_directory_root or not storage_directory_name:
            QMessageBox.warning(self, "Warning", "Please specify both the root storage directory and directory name.")
            return

        # Prepare settings
        fps = self.fps_spinbox.value()
        settings = {
            'exposure_time_us': self.exposure_spinbox.value(),
            'gain': self.gain_spinbox.value(),
            'fps': fps,
            'video_length_frames': int(self.video_length_spinbox.value() * 60 * fps),
            'compression_level': self.compression_combo.currentText(),
        }

        # Set camera parameters in the main thread
        try:
            with self.camera_lock:
                self.cam.ExposureTime.set(settings['exposure_time_us'])
                self.cam.Gain.set(settings['gain'])
                self.cam.AcquisitionFrameRate.set(settings['fps'])
        except Exception as e:
            # Slot boundary: report the failure instead of letting it escape.
            QMessageBox.critical(self, "Error", f"Failed to apply camera settings: {e}")
            return

        # Create storage directory
        try:
            storage_directory = create_storage_directory(storage_directory_root, storage_directory_name)
        except OSError as e:
            QMessageBox.critical(self, "Error", f"Failed to create storage directory: {e}")
            return

        # Create configuration file
        config_data = {
            'camera_serial_number': self.serial_number,
            'exposure_time_us': settings['exposure_time_us'],
            'gain': settings['gain'],
            'fps': settings['fps'],
            'video_length_frames': settings['video_length_frames'],
            'compression_level': settings['compression_level'],
            'storage_directory': storage_directory,
            'directory_name': storage_directory_name,
            'recording_start_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        config_filename = Path(storage_directory) / 'config.json'
        config_filename = str(config_filename).replace('\\', '/')
        try:
            with open(config_filename, 'w') as config_file:
                json.dump(config_data, config_file, indent=4)
            print(f"Configuration file saved at {config_filename}")
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Failed to save configuration file: {e}")
            return

        # Start camera recorder
        self.camera_recorder = CameraRecorder(self.cam, settings, storage_directory, self.camera_lock)
        self.camera_recorder.start()

        # Update UI
        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(True)

    def stop_recording(self):
        """Stop the active recorder, if any, and restore the button states."""
        if self.camera_recorder and self.camera_recorder.is_recording:
            self.camera_recorder.stop()
            # Update UI
            self.start_button.setEnabled(True)
            self.stop_button.setEnabled(False)

    def update_frame(self):
        """Fetch one frame from the camera and show it in the preview label."""
        if not self.is_streaming:
            return
        # NOTE(review): while recording, this reads from the same data stream
        # as the recorder's capture thread - presumably each get_image() call
        # consumes a frame, so the preview may take frames away from the
        # recording; confirm against the camera SDK.
        with self.camera_lock:
            # Get raw image
            raw_image = self.cam.data_stream[0].get_image()
            if raw_image is None:
                return
            # Convert raw image to numpy array
            numpy_image = raw_image.get_numpy_array()
            if numpy_image is None:
                return
            # Convert to BGR format
            bgr_image = cv2.cvtColor(numpy_image, cv2.COLOR_BAYER_RG2BGR)
        # Convert to QPixmap and display
        pixmap = cv_image_to_qt(bgr_image)
        self.video_label.setPixmap(pixmap.scaled(self.video_label.size(), Qt.KeepAspectRatio))

    def resize_feed(self):
        """Resize the video feed to match the available space."""
        # Calculate available height by subtracting the height of other widgets
        total_height = self.height()
        controls_height = self.controls_layout.sizeHint().height()
        title_height = self.title_label.sizeHint().height()
        available_height = total_height - controls_height - title_height - 20  # Subtract some padding
        # Get the width of the widget
        available_width = self.width() - 20  # Subtract some padding
        # In a small window the subtraction goes negative; clamp so the label
        # is never given a non-positive size.
        available_width = max(1, available_width)
        available_height = max(1, available_height)
        # Set the fixed size of the video label
        self.video_label.setFixedSize(available_width, available_height)
        self.video_label.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Fixed)

    def stop_all(self):
        """Stop streaming and recording."""
        self.stop_recording()
        self.stop_camera_stream()

    def closeEvent(self, event):
        """Stop recording and streaming before the widget closes."""
        self.stop_all()
        event.accept()
# --- Main Application Class ---
class MainWindow(QMainWindow):
    """Main application window.

    Lists the discovered cameras, lets the user add one ``CameraWidget`` per
    camera, and closes individual streams or all of them on exit.
    """

    def __init__(self):
        super().__init__()
        self.setWindowTitle("Multi-Camera Recording")
        self.camera_manager = CameraManager()
        self.camera_widgets = {}  # serial number -> CameraWidget
        self.init_ui()

    def init_ui(self):
        """Build the selection rows, the scrollable camera area and the menu."""
        # Central widget and layout
        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        self.main_layout = QVBoxLayout()
        self.central_widget.setLayout(self.main_layout)

        # Camera selection layout
        self.camera_selection_layout = QHBoxLayout()
        self.camera_combo = QComboBox()
        self.refresh_button = QPushButton("Refresh Cameras")
        self.refresh_button.clicked.connect(self.refresh_cameras)
        self.add_camera_button = QPushButton("Add Camera")
        self.add_camera_button.clicked.connect(self.add_selected_camera)
        self.camera_selection_layout.addWidget(QLabel("Select Camera:"))
        self.camera_selection_layout.addWidget(self.camera_combo)
        self.camera_selection_layout.addWidget(self.refresh_button)
        self.camera_selection_layout.addWidget(self.add_camera_button)
        self.main_layout.addLayout(self.camera_selection_layout)

        # Close stream layout
        self.close_stream_layout = QHBoxLayout()
        self.active_streams_combo = QComboBox()
        self.close_stream_button = QPushButton("Close Stream")
        self.close_stream_button.clicked.connect(self.close_selected_stream)
        self.close_stream_layout.addWidget(QLabel("Active Streams:"))
        self.close_stream_layout.addWidget(self.active_streams_combo)
        self.close_stream_layout.addWidget(self.close_stream_button)
        self.main_layout.addLayout(self.close_stream_layout)

        # Scroll area for camera widgets
        self.scroll_area = QScrollArea()
        self.scroll_area_widget = QWidget()
        self.cameras_layout = QVBoxLayout()
        self.scroll_area_widget.setLayout(self.cameras_layout)
        self.scroll_area.setWidgetResizable(True)
        self.scroll_area.setWidget(self.scroll_area_widget)
        self.main_layout.addWidget(self.scroll_area)

        # Discover and list cameras
        self.refresh_cameras()

        # Menu bar for future extensions
        self.menu_bar = self.menuBar()
        self.file_menu = self.menu_bar.addMenu("File")
        self.exit_action = QAction("Exit", self)
        self.exit_action.triggered.connect(self.close)
        self.file_menu.addAction(self.exit_action)

    def refresh_cameras(self):
        """Repopulate the camera combo box with the discovered serial numbers."""
        dev_info_list = self.camera_manager.discover_cameras()
        self.camera_combo.clear()
        for dev_info in dev_info_list:
            serial_number = dev_info.get("sn")
            self.camera_combo.addItem(serial_number)

    def add_selected_camera(self):
        """Add the camera currently selected in the combo box."""
        serial_number = self.camera_combo.currentText()
        if not serial_number:
            QMessageBox.warning(self, "Warning", "No camera selected.")
            return
        if serial_number in self.camera_widgets:
            QMessageBox.warning(self, "Warning", f"Camera {serial_number} is already added.")
            return
        self.add_camera(serial_number)

    def add_camera(self, serial_number):
        """Open the camera and add a widget for it to the window.

        Any failure is reported in a message box. If the camera opens but its
        widget cannot be created, the camera is closed again so it is not
        left open without a widget.
        """
        try:
            cam = self.camera_manager.open_camera(serial_number)
        except Exception as e:
            # Slot boundary: report the failure instead of letting it escape.
            QMessageBox.critical(self, "Error", f"Failed to open camera with SN: {serial_number} ({e})")
            return
        if not cam:
            QMessageBox.critical(self, "Error", f"Failed to open camera with SN: {serial_number}")
            return
        try:
            camera_widget = CameraWidget(cam, serial_number)
        except Exception as e:
            self.camera_manager.close_camera(serial_number)
            QMessageBox.critical(self, "Error", f"Failed to start camera with SN: {serial_number} ({e})")
            return
        self.camera_widgets[serial_number] = camera_widget
        self.cameras_layout.addWidget(camera_widget)
        self.active_streams_combo.addItem(serial_number)

    def close_selected_stream(self):
        """Close the stream currently selected in the active-streams combo."""
        serial_number = self.active_streams_combo.currentText()
        if serial_number:
            self.remove_camera(serial_number)
        else:
            QMessageBox.warning(self, "Warning", "No active stream selected.")

    def remove_camera(self, serial_number, notify=True):
        """Stop, remove and close the camera with ``serial_number``.

        Args:
            serial_number: Serial number of the camera to remove.
            notify: When True (the default) the outcome is reported in a
                message box. Shutdown passes False so that closing the
                application does not raise one modal dialog per camera.
        """
        camera_widget = self.camera_widgets.pop(serial_number, None)
        if camera_widget is None:
            if notify:
                QMessageBox.warning(self, "Warning", f"No active stream for camera {serial_number}.")
            return
        camera_widget.stop_all()
        # Remove widget from layout
        self.cameras_layout.removeWidget(camera_widget)
        camera_widget.deleteLater()
        # Remove camera from manager
        self.camera_manager.close_camera(serial_number)
        # Remove from active streams combo box
        index = self.active_streams_combo.findText(serial_number)
        if index >= 0:
            self.active_streams_combo.removeItem(index)
        if notify:
            QMessageBox.information(self, "Info", f"Camera {serial_number} has been closed.")

    def closeEvent(self, event):
        """Shut down every camera silently before the window closes."""
        # Iterate over a snapshot because remove_camera mutates the dict.
        for serial_number in list(self.camera_widgets):
            self.remove_camera(serial_number, notify=False)
        self.camera_manager.close_all_cameras()
        event.accept()
# --- Main Entry Point ---
if __name__ == "__main__":
    # The QApplication has to exist before any widget is constructed.
    application = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    # Run the Qt event loop and use its return value as the process exit code.
    exit_code = application.exec()
    sys.exit(exit_code)
This script was largely written through a long, extended back-and-forth between myself and o1-preview :)
The text was updated successfully, but these errors were encountered:
I'm not entirely sure where to post this script, but I'm happy with it and figured I would share it with you. It's a PySide6 Python GUI that can start up multiple cameras and record from them. It uses FFmpeg, multiprocessing, and threading for speed and safety.
This script was largely written through a long, extended back-and-forth between myself and o1-preview :)
The text was updated successfully, but these errors were encountered: