From 24f309a0af6c8e634f29fa76ac2d4aa7b05fb382 Mon Sep 17 00:00:00 2001 From: Marcel Schaeben Date: Mon, 25 Nov 2024 17:26:22 +0100 Subject: [PATCH] wip --- .github/workflows/build-win.yml | 58 + build_win.sh | 35 + build_win_hook.py | 7 + camera_worker.py | 1104 +++++++++---------- helpers.py | 9 + main.py | 1837 ++++++++++++++++--------------- photo_browser.py | 482 ++++---- requirements.txt | 6 +- settings_dialog.py | 139 +-- 9 files changed, 1895 insertions(+), 1782 deletions(-) create mode 100644 .github/workflows/build-win.yml create mode 100644 build_win.sh create mode 100644 build_win_hook.py create mode 100644 helpers.py diff --git a/.github/workflows/build-win.yml b/.github/workflows/build-win.yml new file mode 100644 index 0000000..5e34070 --- /dev/null +++ b/.github/workflows/build-win.yml @@ -0,0 +1,58 @@ +name: Build for Windows + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + workflow_dispatch: + +jobs: + build: + runs-on: windows-latest + defaults: + run: + shell: msys2 {0} + + steps: + - uses: actions/checkout@v4 + + - name: Setup MSYS2 + uses: msys2/setup-msys2@v2 + with: + msystem: MINGW64 + update: false # true + install: >- + mingw-w64-x86_64-python + mingw-w64-x86_64-python-pip + mingw-w64-x86_64-libgphoto2 + mingw-w64-x86_64-qt6-base + mingw-w64-x86_64-pkg-config + mingw-w64-x86_64-python-pip + mingw-w64-x86_64-python-pyqt6 + mingw-w64-x86_64-python-pillow + mingw-w64-x86_64-pyinstaller + wget + p7zip + mingw-w64-x86_64-jq + + - name: Set up Python environment + run: | + python -m venv --system-site-packages .venv + source .venv/bin/activate + python -m pip install --upgrade pip + install gphoto2 --user --no-binary :all: + python -m pip install -r requirements.txt + echo "VIRTUAL_ENV=$(pwd)/.venv" >> $GITHUB_ENV + echo "$(pwd)/.venv/bin" >> $GITHUB_PATH + - name: Build + run: | + chmod +x build_win.sh + ./build_win.sh + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: build-artifacts + path: | + dist/main diff --git a/build_win.sh b/build_win.sh new file mode 100644 index 0000000..4e2d876 --- /dev/null +++ b/build_win.sh @@ -0,0 +1,35 @@ +#!/bin/bash +set -euo pipefail + +# Get latest release info from GitHub API +LIBUSB_RELEASE_INFO=$(wget -qO- https://api.github.com/repos/libusb/libusb/releases/latest) + +# Extract Windows release asset URL (only .7z file) +LIBUSB_WIN_URL=$(echo "$LIBUSB_RELEASE_INFO" | \ + jq -r '.assets[] | select(.name | endswith(".7z")) | .browser_download_url') +LIBUSB_FILENAME=$(basename "$LIBUSB_WIN_URL") + +# Create vendor directory if it doesn't exist +mkdir -p vendor/libusb + +# Download and extract +cd vendor +if [ ! -f "$LIBUSB_FILENAME" ]; then + wget -N "$LIBUSB_WIN_URL" +else + echo $LIBUSB_FILENAME already exists, skipping download. +fi +7z x "$LIBUSB_FILENAME" -aos -olibusb + +# Clean up downloaded archive +#rm libusb.7z + +cd .. + +pyinstaller --onedir \ + --add-binary /mingw64/lib/libgphoto2_port/0.12.2/:. \ + --add-binary /mingw64/lib/libgphoto2/2.5.31/:. \ + --add-binary ./vendor/libusb/MinGW64/dll/libusb-1.0.dll:. \ + --add-data ui:ui main.py \ + --runtime-hook ./build_win_hook.py \ + --noconfirm diff --git a/build_win_hook.py b/build_win_hook.py new file mode 100644 index 0000000..5e0d1f0 --- /dev/null +++ b/build_win_hook.py @@ -0,0 +1,7 @@ +import os +import sys +import logging + +logging.info(f"Set IOLIBS and CAMLIBS to: #{sys._MEIPASS}") +os.environ["IOLIBS"] = sys._MEIPASS +os.environ["CAMLIBS"] = sys._MEIPASS \ No newline at end of file diff --git a/camera_worker.py b/camera_worker.py index fb4c23e..16852ea 100644 --- a/camera_worker.py +++ b/camera_worker.py @@ -1,552 +1,552 @@ -import io -import logging -import os -import re -from contextlib import contextmanager -from enum import Enum -from string import Template -from time import sleep -from typing import NamedTuple, Literal, Generator, Union - -import gphoto2 as gp -from PIL import Image -from PyQt6.QtCore import pyqtSignal, QObject, QElapsedTimer, QTimer, Qt, pyqtSlot -from PyQt6.QtWidgets import QApplication -from gphoto2 import CameraWidget - -EVENT_DESCRIPTIONS = { - gp.GP_EVENT_UNKNOWN: "Unknown", - gp.GP_EVENT_CAPTURE_COMPLETE: "Capture Complete", - gp.GP_EVENT_FILE_ADDED: "File Added", - gp.GP_EVENT_FOLDER_ADDED: "Folder Added", - gp.GP_EVENT_TIMEOUT: "Timeout" -} - -class NikonPTPError(Enum): - OutOfFocus = "0xa002" - -class ConfigRequest(): - class Signal(QObject): - got_config = pyqtSignal(gp.CameraWidget) - - def __init__(self): - self.signal = ConfigRequest.Signal() - - -class CaptureImagesRequest(): - class Signal(QObject): - file_received = pyqtSignal(str) - - file_path_template: str - num_images: int - expect_files: int = 1 - max_burst: int = 1 - skip: int = 0 - manual_trigger: bool = False - image_quality: str | None = None - - def __init__(self, file_path_template, num_images, expect_files = 1, max_burst = 1, skip = 0, manual_trigger = False, image_quality = None): - self.file_path_template = file_path_template - self.num_images = num_images - self.expect_files = expect_files - self.max_burst = max_burst - self.skip = skip - self.manual_trigger = manual_trigger - self.image_quality = image_quality - - self.signal = CaptureImagesRequest.Signal() - - - -class LiveViewImage(NamedTuple): - image: Image.Image - -class CameraStates: - class Waiting: - pass - - class Found: - def __init__(self, camera_name: str): - super().__init__() - self.camera_name = camera_name - - class Disconnected: - def __init__(self, camera_name: str, auto_reconnect: bool = True): - super().__init__() - self.auto_reconnect = auto_reconnect - self.camera_name = camera_name - - class Connecting: - def __init__(self, camera_name: str): - self.camera_name = camera_name - - class Disconnecting: - pass - - class Ready: - def __init__(self, camera_name: str): - super().__init__() - self.camera_name = camera_name - - class LiveViewStarted(NamedTuple): - current_lightmeter_value: int - - class LiveViewActive: - pass - - class FocusStarted: - pass - - class FocusFinished(NamedTuple): - success: bool - - class LiveViewStopped: - pass - - class CaptureInProgress: - def __init__(self, capture_request: CaptureImagesRequest, num_captured: int): - super().__init__() - self.num_captured = num_captured - self.capture_request = capture_request - - class CaptureFinished: - def __init__(self, capture_request: CaptureImagesRequest, elapsed_time: int, num_captured: int): - super().__init__() - self.num_captured = num_captured - self.elapsed_time = elapsed_time - self.capture_request = capture_request - - class CaptureCancelling: - pass - - class CaptureCanceled: - def __init__(self, capture_request: CaptureImagesRequest, elapsed_time: int): - super().__init__() - self.elapsed_time = elapsed_time - self.capture_request = capture_request - - class CaptureError: - def __init__(self, capture_request: CaptureImagesRequest, error: str): - super().__init__() - self.capture_request = capture_request - self.error = error - - class IOError: - def __init__(self, error: str): - super().__init__() - self.error = error - - class ConnectionError: - def __init__(self, error: gp.GPhoto2Error): - super().__init__() - self.error = error - - StateType = Union[ - Waiting, Found, Disconnected, Connecting, Disconnecting, Ready, CaptureInProgress, - CaptureFinished, CaptureCanceled, CaptureCancelling, CaptureError, IOError, ConnectionError, LiveViewStarted, LiveViewStopped, - LiveViewActive, FocusStarted, FocusFinished - ] - - -class CameraCommands(QObject): - capture_images = pyqtSignal(CaptureImagesRequest) - find_camera = pyqtSignal() - connect_camera = pyqtSignal() - disconnect_camera = pyqtSignal() - reconnect_camera = pyqtSignal() - set_config = pyqtSignal(gp.CameraWidget) - set_single_config = pyqtSignal(str, str) - cancel = pyqtSignal() - live_view = pyqtSignal(bool) - trigger_autofocus = pyqtSignal() - get_config = pyqtSignal(ConfigRequest) - - -class PropertyChangeEvent(NamedTuple): - property: str - property_name: str - value: str | float - - -class CameraEvents(QObject): - config_updated = pyqtSignal(gp.CameraWidget) - - -class CameraWorker(QObject): - initialized = pyqtSignal() - state_changed = pyqtSignal(object) - property_changed = pyqtSignal(PropertyChangeEvent) - preview_image = pyqtSignal(LiveViewImage) - - def __init__(self, parent=None): - super(CameraWorker, self).__init__(parent) - - self.__logger = logging.getLogger(self.__class__.__name__) - - self.__logging_callback_extract_gp2_error = None - self.__state = None - self.commands = CameraCommands() - self.events = CameraEvents() - - self.camera: gp.Camera = None - self.camera_name: str = None - - self.filesCounter = 0 - self.captureComplete = False - - self.shouldCancel = False - self.liveView = False - self.timer: QTimer = None - - self.__last_ptp_error: NikonPTPError = None - - def initialize(self): - self.__logger.info("Init Camera Worker") - self.timer = QTimer() - - self.commands.capture_images.connect(self.captureImages) - self.commands.find_camera.connect(self.__find_camera) - self.commands.connect_camera.connect(self.__connect_camera) - self.commands.disconnect_camera.connect(lambda: self.__disconnect_camera(False)) - self.commands.reconnect_camera.connect(lambda: self.__disconnect_camera(True)) - self.commands.set_config.connect(self.__set_config) - self.commands.set_single_config.connect(self.__set_single_config) - self.commands.cancel.connect(self.__cancel) - self.commands.live_view.connect(lambda active: self.__start_live_view() if active else self.__stop_live_view()) - self.commands.trigger_autofocus.connect(self.__trigger_autofocus) - self.commands.get_config.connect(self.__get_config) - - self.__set_state(CameraStates.Waiting()) - - self.initialized.emit() - - # self.__logging_callback_python_logging = gp.check_result(gp.use_python_logging(mapping={ - # gp.GP_LOG_ERROR: logging.INFO, - # gp.GP_LOG_DEBUG: logging.DEBUG, - # gp.GP_LOG_VERBOSE: logging.DEBUG - 3, - # gp.GP_LOG_DATA: logging.DEBUG - 6})) - - self.__logging_callback_extract_gp2_error = gp.check_result( - gp.gp_log_add_func(gp.GP_LOG_ERROR, self.__extract_gp2_error_from_log)) - - def __extract_gp2_error_from_log(self, _level: int, domain: bytes, string: bytes, _data=None): - error_str = string - for ptp_error in NikonPTPError: - error_suffix = "(%s)" % ptp_error.value - if error_str.endswith(error_suffix): - self.__last_ptp_error = ptp_error - self.__logger.debug("PTP Error: {} {}".format(ptp_error, error_suffix)) - - def __set_state(self, state: CameraStates.StateType): - self.__state = state - self.__logger.debug("Set camera state: " + state.__class__.__name__) - self.state_changed.emit(state) - - @staticmethod - def __handle_camera_error(func): - def wrapper(self, *args, **kwargs): - try: - return func(self, *args, **kwargs) - except gp.GPhoto2Error as err: - self.__logger.exception("Camera Error {0}: {1}".format(err.code, err.string)) - self.__set_state(CameraStates.ConnectionError(error=err)) - self.__disconnect_camera() - - return wrapper - - def __find_camera(self): - self.__set_state(CameraStates.Waiting()) - camera_list = None - while not camera_list and not self.thread().isInterruptionRequested(): - self.__logger.info("Waiting for camera...") - camera_list = list(gp.Camera.autodetect()) - sleep(1) - - name, _ = camera_list[0] - self.__set_state(CameraStates.Found(camera_name=name)) - - @__handle_camera_error - def __connect_camera(self): - self.__set_state(CameraStates.Connecting(self.camera_name)) - self.camera = gp.Camera() - self.__last_ptp_error = None - - self.camera.init() - self.empty_event_queue(1000) - with self.__open_config("read") as cfg: - self.events.config_updated.emit(cfg) - self.camera_name = "%s %s" % ( - cfg.get_child_by_name("manufacturer").get_value(), - cfg.get_child_by_name("cameramodel").get_value() - ) - self.__set_state(CameraStates.Ready(self.camera_name)) - - while self.camera: - if self.thread().isInterruptionRequested(): - self.__disconnect_camera() - self.thread().exit() - return - - try: - if isinstance(self.__state, CameraStates.LiveViewActive): - self.__live_view_capture_preview() - # self.empty_event_queue(1) - self.thread().msleep(50) - else: - self.empty_event_queue(1) - finally: - QApplication.processEvents() - - def __disconnect_camera(self, auto_reconnect=True): - self.__set_state(CameraStates.Disconnecting()) - - # Ignore errors while disconnecting - try: - self.camera.exit() - except gp.GPhoto2Error: - pass - except AttributeError: # Camera already gone - pass - - self.__set_state(CameraStates.Disconnected(camera_name=self.camera_name, auto_reconnect=auto_reconnect)) - self.camera = None - self.camera_name = None - - @__handle_camera_error - def __set_single_config(self, name, value): - self.__logger.info("Set config %s to %s" % (name, value)) - with self.__open_config("write") as cfg: - cfg_widget = cfg.get_child_by_name(name) - cfg_widget.set_value(value) - - self.empty_event_queue() - self.__emit_current_config() - - @__handle_camera_error - def __set_config(self, cfg: gp.CameraWidget): - self.camera.set_config(cfg) - - @__handle_camera_error - def __get_config(self, req: ConfigRequest): - with self.__open_config("read") as cfg: - req.signal.got_config.emit(cfg) - - def __emit_current_config(self): - with self.__open_config("read") as cfg: - self.events.config_updated.emit(cfg) - - def __cancel(self): - self.shouldCancel = True - self.__set_state(CameraStates.CaptureCancelling()) - - def empty_event_queue(self, timeout=100): - event_type, data = self.camera.wait_for_event(timeout) - - while event_type != gp.GP_EVENT_TIMEOUT: - self.__logger.debug("Event: %s, data: %s" % (EVENT_DESCRIPTIONS.get(event_type, "Unknown"), data)) - QApplication.processEvents() - - if event_type == gp.GP_EVENT_FILE_ADDED: - cam_file_path = os.path.join(data.folder, data.name) - self.__logger.info("New file: %s" % cam_file_path) - basename, extension = os.path.splitext(data.name) - - if isinstance(self.__state, CameraStates.CaptureInProgress) and not self.shouldCancel and not self.thread().isInterruptionRequested(): - tpl = Template(self.__state.capture_request.file_path_template) - file_target_path = tpl.substitute( - basename=basename, - extension=extension, - num=str(self.filesCounter + 1).zfill(3) - ) - cam_file = self.camera.file_get( - data.folder, data.name, gp.GP_FILE_TYPE_NORMAL) - self.__logger.info("Saving to %s" % file_target_path) - cam_file.save(file_target_path) - - current_capture_req = self.__state.capture_request - current_capture_req.signal.file_received.emit(file_target_path) - if self.filesCounter % current_capture_req.expect_files == 0: - num_captured = int(self.filesCounter / current_capture_req.expect_files) - self.__set_state(CameraStates.CaptureInProgress(self.__state.capture_request, num_captured)) - - self.filesCounter += 1 - else: - self.__logger.warning( - "Received file but capture not in progress, ignoring. State: " + self.__state.__class__.__name__) - - self.camera.file_delete(data.folder, data.name) - - elif event_type == gp.GP_EVENT_CAPTURE_COMPLETE: - self.captureComplete = True - - elif event_type == gp.GP_EVENT_UNKNOWN: - match = re.search(r'PTP Property (\w+) changed, "(\w+)" to "(-?\d+[,\.]\d+)"', data) - if match: - property = match.group(1) - property_name = match.group(2) - value_str = match.group(3) - try: - value = float(value_str.replace(',', '.')) - except ValueError: - value = value_str - self.property_changed.emit(PropertyChangeEvent(property=property, property_name=property_name, value=value)) - # print(f"Property '{property_name}' changed to {value}") - - # try to grab another event - event_type, data = self.camera.wait_for_event(1) - - @__handle_camera_error - def __start_live_view(self): - lightmeter: int - with self.__open_config("write") as cfg: - self.__try_set_config(cfg, "capturetarget", "Internal RAM") - self.__try_set_config(cfg, "recordingmedia", "SDRAM") - self.__try_set_config(cfg, "viewfinder", 1) - self.__try_set_config(cfg, "liveviewsize", "VGA") - lightmeter = cfg.get_child_by_name("lightmeter").get_value() - self.__set_state(CameraStates.LiveViewStarted(current_lightmeter_value=lightmeter)) - self.__set_state(CameraStates.LiveViewActive()) - - @__handle_camera_error - def __live_view_capture_preview(self): - lightmeter = None - try: - camera_file = self.camera.capture_preview() - file_data = camera_file.get_data_and_size() - image = Image.open(io.BytesIO(file_data)) - - self.empty_event_queue(1) - self.preview_image.emit(LiveViewImage(image=image)) - - # - # self.preview_image.emit(LiveViewImage(image=image, lightmeter_value=lightmeter)) - except gp.GPhoto2Error: - self.__stop_live_view() - - @__handle_camera_error - def __stop_live_view(self): - if isinstance(self.__state, CameraStates.LiveViewActive): - with self.__open_config("write") as cfg: - self.__try_set_config(cfg, "viewfinder", 0) - self.__try_set_config(cfg, "autofocusdrive", 0) - self.__set_state(CameraStates.LiveViewStopped()) - self.__set_state(CameraStates.Ready(self.camera_name)) - - @__handle_camera_error - def __trigger_autofocus(self): - lightmeter = None - self.__set_state(CameraStates.FocusStarted()) - try: - with self.__open_config("write") as cfg: - lightmeter = cfg.get_child_by_name("lightmeter").get_value() - self.__try_set_config(cfg, "autofocusdrive", 1) - self.__set_state(CameraStates.FocusFinished(success=True)) - except gp.GPhoto2Error: - if self.__last_ptp_error == NikonPTPError.OutOfFocus: - self.__logger.warning("Could not get focus (light: %s)." % lightmeter) - self.__set_state(CameraStates.FocusFinished(success=False)) - else: - raise - finally: - with self.__open_config("write") as cfg: - self.__try_set_config(cfg, "autofocusdrive", 0) - self.__set_state(CameraStates.LiveViewActive()) - # TODO handle general camera error - - def captureImages(self, capture_req: CaptureImagesRequest): - if isinstance(self.__state, CameraStates.LiveViewActive): - self.__stop_live_view() - - self.__logger.info("Start capture (%s)", str(capture_req)) - - timer = QElapsedTimer() - try: - timer.start() - self.empty_event_queue() - self.filesCounter = 0 - self.captureComplete = False - - self.__set_state(CameraStates.CaptureInProgress(capture_request=capture_req, num_captured=0)) - - with self.__open_config("write") as cfg: - self.__try_set_config(cfg, "capturetarget", "Internal RAM") - self.__try_set_config(cfg, "recordingmedia", "SDRAM") - self.__try_set_config(cfg, "viewfinder", 1) - self.__try_set_config(cfg, "autofocusdrive", 0) - self.__try_set_config(cfg, "focusmode", "Manual") - self.__try_set_config(cfg, "focusmode2", "MF (fixed)") - if capture_req.image_quality: - self.__try_set_config(cfg, "imagequality", capture_req.image_quality) - - self.thread().sleep(1) - - remaining = capture_req.num_images * capture_req.expect_files - while remaining > 0 and not self.shouldCancel and not self.thread().isInterruptionRequested(): - burst = min(capture_req.max_burst, int(remaining / capture_req.expect_files)) - if not capture_req.manual_trigger: - with self.__open_config("write") as cfg: - self.__try_set_config(cfg, "burstnumber", burst) - - self.captureComplete = False - if not capture_req.manual_trigger: - self.camera.trigger_capture() - while not self.captureComplete: - self.empty_event_queue(timeout=100) - QApplication.processEvents() - - remaining = capture_req.num_images * capture_req.expect_files - self.filesCounter - self.__logger.info("Curr. files: {0} (remaining: {1}).".format(self.filesCounter, remaining)) - - if not capture_req.manual_trigger: - with self.__open_config("write") as cfg: - self.__try_set_config(cfg, "burstnumber", burst) - - self.__logger.info("No. Files captured: {0} (took {1}).".format(self.filesCounter, timer.elapsed())) - - if not self.shouldCancel: - num_captured = int(self.filesCounter / capture_req.expect_files) - self.__set_state( - CameraStates.CaptureFinished(capture_req, elapsed_time=timer.elapsed(), num_captured=num_captured)) - else: - self.__logger.info("Capture cancelled") - self.__set_state(CameraStates.CaptureCanceled(capture_req, elapsed_time=timer.elapsed())) - - except gp.GPhoto2Error as err: - self.__set_state(CameraStates.CaptureError(capture_req, err.string)) - finally: - self.shouldCancel = False - # If camera is still there, try to reset Camera to a default state - if self.camera: - try: - with self.__open_config("write") as cfg: - # TODO: enable again when trigger works - self.__try_set_config(cfg, "viewfinder", 0) - if not capture_req.manual_trigger: - self.__try_set_config(cfg, "burstnumber", 1) - self.empty_event_queue() - self.__set_state(CameraStates.Ready(self.camera_name)) - except gp.GPhoto2Error as err: - self.__set_state(CameraStates.ConnectionError(err.string)) - - @contextmanager - def __open_config(self, mode: Literal["read", "write"]) -> Generator[CameraWidget, None, None]: - cfg: CameraWidget = None - try: - cfg = self.camera.get_config() - yield cfg - finally: - if mode == "write": - self.camera.set_config(cfg) - elif not mode == "read": - raise Exception("Invalid cfg open mode: %s" % mode) - - def __try_set_config(self, config: CameraWidget, name: str, value) -> None: - try: - config_widget = config.get_child_by_name(name) - config_widget.set_value(value) - self.__logger.info("Set config '%s' to %s." % (name, str(value))) - except gp.GPhoto2Error: - self.__logger.error("Config '%s' not supported by camera." % name) - - def __del__(self): - self.__disconnect_camera() +import io +import logging +import os +import re +from contextlib import contextmanager +from enum import Enum +from string import Template +from time import sleep +from typing import NamedTuple, Literal, Generator, Union + +import gphoto2 as gp +from PIL import Image +from PyQt6.QtCore import pyqtSignal, QObject, QElapsedTimer, QTimer, Qt, pyqtSlot +from PyQt6.QtWidgets import QApplication +from gphoto2 import CameraWidget + +EVENT_DESCRIPTIONS = { + gp.GP_EVENT_UNKNOWN: "Unknown", + gp.GP_EVENT_CAPTURE_COMPLETE: "Capture Complete", + gp.GP_EVENT_FILE_ADDED: "File Added", + gp.GP_EVENT_FOLDER_ADDED: "Folder Added", + gp.GP_EVENT_TIMEOUT: "Timeout" +} + +class NikonPTPError(Enum): + OutOfFocus = "0xa002" + +class ConfigRequest(): + class Signal(QObject): + got_config = pyqtSignal(gp.CameraWidget) + + def __init__(self): + self.signal = ConfigRequest.Signal() + + +class CaptureImagesRequest(): + class Signal(QObject): + file_received = pyqtSignal(str) + + file_path_template: str + num_images: int + expect_files: int = 1 + max_burst: int = 1 + skip: int = 0 + manual_trigger: bool = False + image_quality: str | None = None + + def __init__(self, file_path_template, num_images, expect_files = 1, max_burst = 1, skip = 0, manual_trigger = False, image_quality = None): + self.file_path_template = file_path_template + self.num_images = num_images + self.expect_files = expect_files + self.max_burst = max_burst + self.skip = skip + self.manual_trigger = manual_trigger + self.image_quality = image_quality + + self.signal = CaptureImagesRequest.Signal() + + + +class LiveViewImage(NamedTuple): + image: Image.Image + +class CameraStates: + class Waiting: + pass + + class Found: + def __init__(self, camera_name: str): + super().__init__() + self.camera_name = camera_name + + class Disconnected: + def __init__(self, camera_name: str, auto_reconnect: bool = True): + super().__init__() + self.auto_reconnect = auto_reconnect + self.camera_name = camera_name + + class Connecting: + def __init__(self, camera_name: str): + self.camera_name = camera_name + + class Disconnecting: + pass + + class Ready: + def __init__(self, camera_name: str): + super().__init__() + self.camera_name = camera_name + + class LiveViewStarted(NamedTuple): + current_lightmeter_value: int + + class LiveViewActive: + pass + + class FocusStarted: + pass + + class FocusFinished(NamedTuple): + success: bool + + class LiveViewStopped: + pass + + class CaptureInProgress: + def __init__(self, capture_request: CaptureImagesRequest, num_captured: int): + super().__init__() + self.num_captured = num_captured + self.capture_request = capture_request + + class CaptureFinished: + def __init__(self, capture_request: CaptureImagesRequest, elapsed_time: int, num_captured: int): + super().__init__() + self.num_captured = num_captured + self.elapsed_time = elapsed_time + self.capture_request = capture_request + + class CaptureCancelling: + pass + + class CaptureCanceled: + def __init__(self, capture_request: CaptureImagesRequest, elapsed_time: int): + super().__init__() + self.elapsed_time = elapsed_time + self.capture_request = capture_request + + class CaptureError: + def __init__(self, capture_request: CaptureImagesRequest, error: str): + super().__init__() + self.capture_request = capture_request + self.error = error + + class IOError: + def __init__(self, error: str): + super().__init__() + self.error = error + + class ConnectionError: + def __init__(self, error: gp.GPhoto2Error): + super().__init__() + self.error = error + + StateType = Union[ + Waiting, Found, Disconnected, Connecting, Disconnecting, Ready, CaptureInProgress, + CaptureFinished, CaptureCanceled, CaptureCancelling, CaptureError, IOError, ConnectionError, LiveViewStarted, LiveViewStopped, + LiveViewActive, FocusStarted, FocusFinished + ] + + +class CameraCommands(QObject): + capture_images = pyqtSignal(CaptureImagesRequest) + find_camera = pyqtSignal() + connect_camera = pyqtSignal() + disconnect_camera = pyqtSignal() + reconnect_camera = pyqtSignal() + set_config = pyqtSignal(gp.CameraWidget) + set_single_config = pyqtSignal(str, str) + cancel = pyqtSignal() + live_view = pyqtSignal(bool) + trigger_autofocus = pyqtSignal() + get_config = pyqtSignal(ConfigRequest) + + +class PropertyChangeEvent(NamedTuple): + property: str + property_name: str + value: str | float + + +class CameraEvents(QObject): + config_updated = pyqtSignal(gp.CameraWidget) + + +class CameraWorker(QObject): + initialized = pyqtSignal() + state_changed = pyqtSignal(object) + property_changed = pyqtSignal(PropertyChangeEvent) + preview_image = pyqtSignal(LiveViewImage) + + def __init__(self, parent=None): + super(CameraWorker, self).__init__(parent) + + self.__logger = logging.getLogger(self.__class__.__name__) + + self.__logging_callback_extract_gp2_error = None + self.__state = None + self.commands = CameraCommands() + self.events = CameraEvents() + + self.camera: gp.Camera = None + self.camera_name: str = None + + self.filesCounter = 0 + self.captureComplete = False + + self.shouldCancel = False + self.liveView = False + self.timer: QTimer = None + + self.__last_ptp_error: NikonPTPError = None + + def initialize(self): + self.__logger.info("Init Camera Worker") + self.timer = QTimer() + + self.commands.capture_images.connect(self.captureImages) + self.commands.find_camera.connect(self.__find_camera) + self.commands.connect_camera.connect(self.__connect_camera) + self.commands.disconnect_camera.connect(lambda: self.__disconnect_camera(False)) + self.commands.reconnect_camera.connect(lambda: self.__disconnect_camera(True)) + self.commands.set_config.connect(self.__set_config) + self.commands.set_single_config.connect(self.__set_single_config) + self.commands.cancel.connect(self.__cancel) + self.commands.live_view.connect(lambda active: self.__start_live_view() if active else self.__stop_live_view()) + self.commands.trigger_autofocus.connect(self.__trigger_autofocus) + self.commands.get_config.connect(self.__get_config) + + self.__set_state(CameraStates.Waiting()) + + self.initialized.emit() + + # self.__logging_callback_python_logging = gp.check_result(gp.use_python_logging(mapping={ + # gp.GP_LOG_ERROR: logging.INFO, + # gp.GP_LOG_DEBUG: logging.DEBUG, + # gp.GP_LOG_VERBOSE: logging.DEBUG - 3, + # gp.GP_LOG_DATA: logging.DEBUG - 6})) + + self.__logging_callback_extract_gp2_error = gp.check_result( + gp.gp_log_add_func(gp.GP_LOG_ERROR, self.__extract_gp2_error_from_log)) + + def __extract_gp2_error_from_log(self, _level: int, domain: bytes, string: bytes, _data=None): + error_str = string + for ptp_error in NikonPTPError: + error_suffix = "(%s)" % ptp_error.value + if error_str.endswith(error_suffix): + self.__last_ptp_error = ptp_error + self.__logger.debug("PTP Error: {} {}".format(ptp_error, error_suffix)) + + def __set_state(self, state: CameraStates.StateType): + self.__state = state + self.__logger.debug("Set camera state: " + state.__class__.__name__) + self.state_changed.emit(state) + + @staticmethod + def __handle_camera_error(func): + def wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except gp.GPhoto2Error as err: + self.__logger.exception("Camera Error {0}: {1}".format(err.code, err.string)) + self.__set_state(CameraStates.ConnectionError(error=err)) + self.__disconnect_camera() + + return wrapper + + def __find_camera(self): + self.__set_state(CameraStates.Waiting()) + camera_list = None + while not camera_list and not self.thread().isInterruptionRequested(): + self.__logger.info("Waiting for camera...") + camera_list = list(gp.Camera.autodetect()) + sleep(1) + + name, _ = camera_list[0] + self.__set_state(CameraStates.Found(camera_name=name)) + + @__handle_camera_error + def __connect_camera(self): + self.__set_state(CameraStates.Connecting(self.camera_name)) + self.camera = gp.Camera() + self.__last_ptp_error = None + + self.camera.init() + self.empty_event_queue(1000) + with self.__open_config("read") as cfg: + self.events.config_updated.emit(cfg) + self.camera_name = "%s %s" % ( + cfg.get_child_by_name("manufacturer").get_value(), + cfg.get_child_by_name("cameramodel").get_value() + ) + self.__set_state(CameraStates.Ready(self.camera_name)) + + while self.camera: + if self.thread().isInterruptionRequested(): + self.__disconnect_camera() + self.thread().exit() + return + + try: + if isinstance(self.__state, CameraStates.LiveViewActive): + self.__live_view_capture_preview() + # self.empty_event_queue(1) + self.thread().msleep(50) + else: + self.empty_event_queue(1) + finally: + QApplication.processEvents() + + def __disconnect_camera(self, auto_reconnect=True): + self.__set_state(CameraStates.Disconnecting()) + + # Ignore errors while disconnecting + try: + self.camera.exit() + except gp.GPhoto2Error: + pass + except AttributeError: # Camera already gone + pass + + self.__set_state(CameraStates.Disconnected(camera_name=self.camera_name, auto_reconnect=auto_reconnect)) + self.camera = None + self.camera_name = None + + @__handle_camera_error + def __set_single_config(self, name, value): + self.__logger.info("Set config %s to %s" % (name, value)) + with self.__open_config("write") as cfg: + cfg_widget = cfg.get_child_by_name(name) + cfg_widget.set_value(value) + + self.empty_event_queue() + self.__emit_current_config() + + @__handle_camera_error + def __set_config(self, cfg: gp.CameraWidget): + self.camera.set_config(cfg) + + @__handle_camera_error + def __get_config(self, req: ConfigRequest): + with self.__open_config("read") as cfg: + req.signal.got_config.emit(cfg) + + def __emit_current_config(self): + with self.__open_config("read") as cfg: + self.events.config_updated.emit(cfg) + + def __cancel(self): + self.shouldCancel = True + self.__set_state(CameraStates.CaptureCancelling()) + + def empty_event_queue(self, timeout=100): + event_type, data = self.camera.wait_for_event(timeout) + + while event_type != gp.GP_EVENT_TIMEOUT: + self.__logger.debug("Event: %s, data: %s" % (EVENT_DESCRIPTIONS.get(event_type, "Unknown"), data)) + QApplication.processEvents() + + if event_type == gp.GP_EVENT_FILE_ADDED: + cam_file_path = os.path.join(data.folder, data.name) + self.__logger.info("New file: %s" % cam_file_path) + basename, extension = os.path.splitext(data.name) + + if isinstance(self.__state, CameraStates.CaptureInProgress) and not self.shouldCancel and not self.thread().isInterruptionRequested(): + tpl = Template(self.__state.capture_request.file_path_template) + file_target_path = tpl.substitute( + basename=basename, + extension=extension, + num=str(self.filesCounter + 1).zfill(3) + ) + cam_file = self.camera.file_get( + data.folder, data.name, gp.GP_FILE_TYPE_NORMAL) + self.__logger.info("Saving to %s" % file_target_path) + cam_file.save(file_target_path) + + current_capture_req = self.__state.capture_request + current_capture_req.signal.file_received.emit(file_target_path) + if self.filesCounter % current_capture_req.expect_files == 0: + num_captured = int(self.filesCounter / current_capture_req.expect_files) + self.__set_state(CameraStates.CaptureInProgress(self.__state.capture_request, num_captured)) + + self.filesCounter += 1 + else: + self.__logger.warning( + "Received file but capture not in progress, ignoring. State: " + self.__state.__class__.__name__) + + self.camera.file_delete(data.folder, data.name) + + elif event_type == gp.GP_EVENT_CAPTURE_COMPLETE: + self.captureComplete = True + + elif event_type == gp.GP_EVENT_UNKNOWN: + match = re.search(r'PTP Property (\w+) changed, "(\w+)" to "(-?\d+[,\.]\d+)"', data) + if match: + property = match.group(1) + property_name = match.group(2) + value_str = match.group(3) + try: + value = float(value_str.replace(',', '.')) + except ValueError: + value = value_str + self.property_changed.emit(PropertyChangeEvent(property=property, property_name=property_name, value=value)) + # print(f"Property '{property_name}' changed to {value}") + + # try to grab another event + event_type, data = self.camera.wait_for_event(1) + + @__handle_camera_error + def __start_live_view(self): + lightmeter: int + with self.__open_config("write") as cfg: + self.__try_set_config(cfg, "capturetarget", "Internal RAM") + self.__try_set_config(cfg, "recordingmedia", "SDRAM") + self.__try_set_config(cfg, "viewfinder", 1) + self.__try_set_config(cfg, "liveviewsize", "VGA") + lightmeter = cfg.get_child_by_name("lightmeter").get_value() + self.__set_state(CameraStates.LiveViewStarted(current_lightmeter_value=lightmeter)) + self.__set_state(CameraStates.LiveViewActive()) + + @__handle_camera_error + def __live_view_capture_preview(self): + lightmeter = None + try: + camera_file = self.camera.capture_preview() + file_data = camera_file.get_data_and_size() + image = Image.open(io.BytesIO(file_data)) + + self.empty_event_queue(1) + self.preview_image.emit(LiveViewImage(image=image)) + + # + # self.preview_image.emit(LiveViewImage(image=image, lightmeter_value=lightmeter)) + except gp.GPhoto2Error: + self.__stop_live_view() + + @__handle_camera_error + def __stop_live_view(self): + if isinstance(self.__state, CameraStates.LiveViewActive): + with self.__open_config("write") as cfg: + self.__try_set_config(cfg, "viewfinder", 0) + self.__try_set_config(cfg, "autofocusdrive", 0) + self.__set_state(CameraStates.LiveViewStopped()) + self.__set_state(CameraStates.Ready(self.camera_name)) + + @__handle_camera_error + def __trigger_autofocus(self): + lightmeter = None + self.__set_state(CameraStates.FocusStarted()) + try: + with self.__open_config("write") as cfg: + lightmeter = cfg.get_child_by_name("lightmeter").get_value() + self.__try_set_config(cfg, "autofocusdrive", 1) + self.__set_state(CameraStates.FocusFinished(success=True)) + except gp.GPhoto2Error: + if self.__last_ptp_error == NikonPTPError.OutOfFocus: + self.__logger.warning("Could not get focus (light: %s)." % lightmeter) + self.__set_state(CameraStates.FocusFinished(success=False)) + else: + raise + finally: + with self.__open_config("write") as cfg: + self.__try_set_config(cfg, "autofocusdrive", 0) + self.__set_state(CameraStates.LiveViewActive()) + # TODO handle general camera error + + def captureImages(self, capture_req: CaptureImagesRequest): + if isinstance(self.__state, CameraStates.LiveViewActive): + self.__stop_live_view() + + self.__logger.info("Start capture (%s)", str(capture_req)) + + timer = QElapsedTimer() + try: + timer.start() + self.empty_event_queue() + self.filesCounter = 0 + self.captureComplete = False + + self.__set_state(CameraStates.CaptureInProgress(capture_request=capture_req, num_captured=0)) + + with self.__open_config("write") as cfg: + self.__try_set_config(cfg, "capturetarget", "Internal RAM") + self.__try_set_config(cfg, "recordingmedia", "SDRAM") + self.__try_set_config(cfg, "viewfinder", 1) + self.__try_set_config(cfg, "autofocusdrive", 0) + self.__try_set_config(cfg, "focusmode", "Manual") + self.__try_set_config(cfg, "focusmode2", "MF (fixed)") + if capture_req.image_quality: + self.__try_set_config(cfg, "imagequality", capture_req.image_quality) + + self.thread().sleep(1) + + remaining = capture_req.num_images * capture_req.expect_files + while remaining > 0 and not self.shouldCancel and not self.thread().isInterruptionRequested(): + burst = min(capture_req.max_burst, int(remaining / capture_req.expect_files)) + if not capture_req.manual_trigger: + with self.__open_config("write") as cfg: + self.__try_set_config(cfg, "burstnumber", burst) + + self.captureComplete = False + if not capture_req.manual_trigger: + self.camera.trigger_capture() + while not self.captureComplete: + self.empty_event_queue(timeout=100) + QApplication.processEvents() + + remaining = capture_req.num_images * capture_req.expect_files - self.filesCounter + self.__logger.info("Curr. files: {0} (remaining: {1}).".format(self.filesCounter, remaining)) + + if not capture_req.manual_trigger: + with self.__open_config("write") as cfg: + self.__try_set_config(cfg, "burstnumber", burst) + + self.__logger.info("No. Files captured: {0} (took {1}).".format(self.filesCounter, timer.elapsed())) + + if not self.shouldCancel: + num_captured = int(self.filesCounter / capture_req.expect_files) + self.__set_state( + CameraStates.CaptureFinished(capture_req, elapsed_time=timer.elapsed(), num_captured=num_captured)) + else: + self.__logger.info("Capture cancelled") + self.__set_state(CameraStates.CaptureCanceled(capture_req, elapsed_time=timer.elapsed())) + + except gp.GPhoto2Error as err: + self.__set_state(CameraStates.CaptureError(capture_req, err.string)) + finally: + self.shouldCancel = False + # If camera is still there, try to reset Camera to a default state + if self.camera: + try: + with self.__open_config("write") as cfg: + # TODO: enable again when trigger works + self.__try_set_config(cfg, "viewfinder", 0) + if not capture_req.manual_trigger: + self.__try_set_config(cfg, "burstnumber", 1) + self.empty_event_queue() + self.__set_state(CameraStates.Ready(self.camera_name)) + except gp.GPhoto2Error as err: + self.__set_state(CameraStates.ConnectionError(err.string)) + + @contextmanager + def __open_config(self, mode: Literal["read", "write"]) -> Generator[CameraWidget, None, None]: + cfg: CameraWidget = None + try: + cfg = self.camera.get_config() + yield cfg + finally: + if mode == "write": + self.camera.set_config(cfg) + elif not mode == "read": + raise Exception("Invalid cfg open mode: %s" % mode) + + def __try_set_config(self, config: CameraWidget, name: str, value) -> None: + try: + config_widget = config.get_child_by_name(name) + config_widget.set_value(value) + self.__logger.info("Set config '%s' to %s." % (name, str(value))) + except gp.GPhoto2Error: + self.__logger.error("Config '%s' not supported by camera." % name) + + def __del__(self): + self.__disconnect_camera() diff --git a/helpers.py b/helpers.py new file mode 100644 index 0000000..b407e79 --- /dev/null +++ b/helpers.py @@ -0,0 +1,9 @@ +from os import path +import sys + +def get_ui_path(file: str): + if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'): + bundle_dir = sys._MEIPASS + else: + bundle_dir = path.abspath(path.dirname("__FILE__")) + return path.join(bundle_dir, file) \ No newline at end of file diff --git a/main.py b/main.py index 7f2f782..05400fb 100755 --- a/main.py +++ b/main.py @@ -1,918 +1,919 @@ -import asyncio.exceptions -import json -import logging -import os -import sys -import threading -from enum import Enum -from pathlib import Path - -import gphoto2 as gp -import qasync -from qasync import QEventLoop, QThreadExecutor -from PIL.ImageQt import ImageQt -from PyQt6.QtCore import QThread, QSettings, QStandardPaths, pyqtSignal, Qt -from PyQt6.QtGui import QPixmap, QAction, QPixmapCache, QIcon, QColor, QCloseEvent, QBrush, QPainter, QCursor -from PyQt6.QtWidgets import ( - QApplication, QMainWindow, QPushButton, QWidget, QFrame, QLineEdit, - QComboBox, QLabel, QToolBox, QProgressBar, QMenu, QAbstractButton, QInputDialog, QMessageBox, QStyle, QDialog, - QLCDNumber, QGraphicsView, QSizePolicy, QVBoxLayout -) -from PyQt6.uic import loadUi -from send2trash import send2trash - - -try: - from bt_controller_controller import BtControllerController, BtControllerCommand, BtControllerRequest, BtControllerState - BT_AVAILABLE = True -except: - BT_AVAILABLE = False - -from camera_worker import CameraWorker, CaptureImagesRequest, CameraStates, PropertyChangeEvent, ConfigRequest -from open_session_dialog import OpenSessionDialog -from photo_browser import PhotoBrowser -from settings_dialog import SettingsDialog -from spinner import Spinner -from camera_config_dialog import CameraConfigDialog - - -class Session: - def __init__(self, name, working_dir): - self.images_dir_loaded = False - self.preview_dir_loaded = False - - self.name = name - self.session_dir = os.path.join(working_dir, self.name) - self.preview_dir = os.path.join(self.session_dir, "test") - self.images_dir = os.path.join(self.session_dir, "images") - self.preview_count = 0 - - -# Corresponds to itemIndex of the captureView QToolBox -class CaptureMode(Enum): - Preview = 0 - RTI = 1 - - -class RTICaptureMainWindow(QMainWindow): - find_camera = pyqtSignal() - - def __init__(self, parent=None): - super().__init__(parent) - - self.logger = logging.getLogger(self.__class__.__name__) - - self.camera_worker = CameraWorker() - self.__session: Session = None - self.camera_state: CameraStates.StateType = None - self.cam_config_dialog: CameraConfigDialog = None - - # Set up UI and find controls - loadUi('ui/main_window.ui', self) - self.disconnect_camera_button: QPushButton = self.findChild(QPushButton, "disconnectCameraButton") - self.connect_camera_button: QPushButton = self.findChild(QPushButton, "connectCameraButton") - self.camera_busy_spinner: Spinner = self.findChild(QWidget, "cameraBusySpinner") - self.camera_state_label: QLabel = self.findChild(QLabel, "cameraStateLabel") - self.camera_state_icon: QLabel = self.findChild(QLabel, "cameraStateIcon") - - self.bluetooth_frame: QFrame = self.findChild(QFrame, "bluetoothFrame") - self.bluetooth_state_icon: QLabel = self.findChild(QLabel, "bluetoothStateLabel") - self.bluetooth_connecting_spinner: Spinner = self.findChild(QWidget, "bluetoothConnectingSpinner") - - self.session_controls: QWidget = self.findChild(QWidget, "sessionControls") - self.session_name_edit: QLineEdit = self.findChild(QLineEdit, "sessionNameEdit") - self.start_session_button: QPushButton = self.findChild(QPushButton, "startSessionButton") - self.close_session_button: QPushButton = self.findChild(QPushButton, "closeSessionButton") - self.session_loading_spinner: Spinner = self.findChild(QWidget, "sessionLoadingSpinner") - self.session_menu_button: QAbstractButton = self.findChild(QWidget, "sessionMenuButton") - - self.live_view_controls: QWidget = self.findChild(QWidget, "liveViewControls") - self.toggle_live_view_button: QPushButton = self.findChild(QPushButton, "toggleLiveViewButton") - self.autofocus_button: QPushButton = self.findChild(QPushButton, "autofocusButton") - self.light_lcd_number: QLCDNumber = self.findChild(QLCDNumber, "lightLCDNumber") - self.light_lcd_frame: QFrame = self.findChild(QFrame, "lightLCDFrame") - self.live_view_error_label: QLabel = self.findChild(QLabel, "liveviewErrorLabel") - - self.preview_led_select: QComboBox = self.findChild(QComboBox, "previewLedSelect") - self.preview_led_frame: QFrame = self.findChild(QFrame, "previewLedFrame") - - self.capture_view: QToolBox = self.findChild(QToolBox, "captureView") - self.rtiPage: QWidget = self.findChild(QWidget, "rtiPage") - self.previewPage: QWidget = self.findChild(QWidget, "previewPage") - self.previewImageBrowser: PhotoBrowser = self.findChild(QWidget, "previewImageBrowser") - self.rtiImageBrowser: PhotoBrowser = self.findChild(QWidget, "rtiImageBrowser") - self.capture_button: QPushButton = self.findChild(QPushButton, "captureButton") - self.cancel_capture_button: QPushButton = self.findChild(QPushButton, "cancelCaptureButton") - - self.rti_progress_view: QWidget = self.findChild(QWidget, "rtiProgressView") - self.capture_progress_bar: QProgressBar = self.findChild(QProgressBar, "captureProgressBar") - self.capture_status_label: QLabel = self.findChild(QLabel, "captureStatusLabel") - - self.camera_controls: QFrame = self.findChild(QFrame, "cameraControls") - self.camera_config_controls: QWidget = self.findChild(QWidget, "cameraConfigControls") - self.f_number_select: QComboBox = self.findChild(QComboBox, "fNumberSelect") - self.shutter_speed_select: QComboBox = self.findChild(QComboBox, "shutterSpeedSelect") - self.crop_select: QComboBox = self.findChild(QComboBox, "cropSelect") - self.iso_select: QComboBox = self.findChild(QComboBox, "isoSelect") - - self.settings_button: QPushButton = self.findChild(QPushButton, "settingsButton") - - self.session_menu = QMenu(self) - self.open_session_action = QAction('Vorherige Sitzung öffnen...', self) - self.open_session_action.triggered.connect(self.open_existing_session_directory) - self.open_session_action.setIcon(QIcon("ui/open.svg")) - self.rename_session_action = QAction('Sitzung umbenennen...', self) - self.rename_session_action.triggered.connect(self.rename_current_session) - self.rename_session_action.setIcon(QIcon("ui/rename.svg")) - self.session_menu.addActions([self.open_session_action, self.rename_session_action]) - - self.settings_menu = QMenu(self) - self.open_program_settings_action = QAction('Allgemeine Einstellungen') - self.open_program_settings_action.triggered.connect(self.open_settings) - self.open_program_settings_action.setIcon(QIcon("ui/general_settings.svg")) - self.open_advanced_cam_config_action = QAction('Erweiterte Kamerakonfiguration') - self.open_advanced_cam_config_action.triggered.connect(self.open_advanced_capture_settings) - self.open_advanced_cam_config_action.setIcon(QIcon("ui/cam_settings.svg")) - self.settings_menu.addActions([self.open_program_settings_action, self.open_advanced_cam_config_action]) - - self.mirror_graphics_view: QGraphicsView | None = None - self.second_screen_window: QDialog | None = None - - self.session_name_edit.textChanged.connect( - lambda text: self.start_session_button.setEnabled( - True if len(text) > 0 else False - )) - - self.cancel_capture_button.setVisible(False) - - for i in range(60): - self.preview_led_select.addItem(str(i + 1), i) - - self.set_camera_connection_busy(True) - self.capture_mode = CaptureMode.Preview - self.set_session(None) - - self.camera_thread = QThread() - self.camera_worker.moveToThread(self.camera_thread) - self.camera_worker.state_changed.connect(self.set_camera_state) - self.camera_worker.events.config_updated.connect(self.on_config_update) - self.camera_worker.property_changed.connect(self.on_property_change) - self.camera_worker.preview_image.connect(lambda image: self.previewImageBrowser.show_preview(ImageQt(image.image))) - self.camera_worker.initialized.connect(lambda: self.camera_worker.commands.find_camera.emit()) - self.camera_thread.started.connect(self.camera_worker.initialize) - self.camera_thread.start() - - self.bt_controller: BtControllerController | None = None - - self.update_ui_bluetooth() - - self.init_mirror_view() - QApplication.instance().screenAdded.connect(self.reset_mirror_view) - QApplication.instance().screenRemoved.connect(self.reset_mirror_view) - QApplication.instance().primaryScreenChanged.connect(self.reset_mirror_view) - - - def init_mirror_view(self): - screens = QApplication.screens() - mirror_view_enabled = QSettings().value("enableSecondScreenMirror", type=bool) - if mirror_view_enabled and len(screens) > 1: - second_screen = screens[1] - self.second_screen_window = QDialog() - self.second_screen_window.setWindowFlags(Qt.WindowType.WindowStaysOnTopHint | Qt.WindowType.FramelessWindowHint) - self.second_screen_window.setWindowTitle("Secondary View") - self.second_screen_window.setGeometry(second_screen.availableGeometry()) - self.second_screen_window.showFullScreen() - - self.mirror_graphics_view = QGraphicsView(self.second_screen_window) - self.mirror_graphics_view.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) - self.mirror_graphics_view.setBackgroundBrush(QBrush(QColor(30, 30, 30))) - self.mirror_graphics_view.setRenderHint(QPainter.RenderHint.Antialiasing) - layout = QVBoxLayout(self.second_screen_window) - layout.setContentsMargins(0, 0, 0, 0) - layout.addWidget(self.mirror_graphics_view) - self.second_screen_window.setLayout(layout) - - self.update_mirror_view() - - def disable_mirror_view(self): - if self.second_screen_window: - self.second_screen_window.close() - self.second_screen_window = None - self.mirror_graphics_view = None - - def reset_mirror_view(self): - self.disable_mirror_view() - self.init_mirror_view() - - async def init_bluetooth(self): - if not self.bt_controller: - self.bt_controller = BtControllerController() - self.bt_controller.state_changed.connect(self.update_ui_bluetooth) - await self.bt_controller.connect() - - @property - def capture_mode(self) -> CaptureMode: - return CaptureMode(self.capture_view.currentIndex()) - - @capture_mode.setter - def capture_mode(self, mode: CaptureMode): - self.capture_view.setCurrentIndex(mode.value) - self.update_ui() - - def get_camera_state(self): - return self.camera_state - - def set_camera_state(self, state: CameraStates.StateType): - self.logger.debug("Handle camera state:" + state.__class__.__name__) - self.camera_state = state - self.update_ui() - - match state: - case CameraStates.Waiting(): - pass - - case CameraStates.Found(): - self.camera_worker.commands.connect_camera.emit() - - case CameraStates.Disconnected(): - if state.auto_reconnect: - self.camera_worker.commands.find_camera.emit() - - case CameraStates.Connecting(): - pass - - case CameraStates.Disconnecting(): - if self.cam_config_dialog: - self.cam_config_dialog.reject() - - case CameraStates.ConnectionError(): - self.logger.error(state.error) - - case CameraStates.Ready(): - pass - - case CameraStates.LiveViewStarted(): - if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED: - request = BtControllerRequest(BtControllerCommand.PILOT_LIGHT_ON) - request.signals.success.connect(lambda: print("BT Success!")) - request.signals.error.connect(lambda e: logging.exception(e)) - self.bt_controller.send_command(request) - - case CameraStates.LiveViewStopped(): - if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED: - request = BtControllerRequest(BtControllerCommand.LED_OFF) - request.signals.success.connect(lambda: print("BT Success!")) - request.signals.error.connect(lambda e: logging.exception(e)) - self.bt_controller.send_command(request) - - case CameraStates.Disconnecting(): - pass - - case CameraStates.CaptureInProgress(): - pass - - case CameraStates.CaptureFinished(): - if self.capture_mode == CaptureMode.Preview: - self.session.preview_count += 1 - else: - self.write_lp() - self.dump_camera_config() - - case CameraStates.CaptureCanceled(): - pass - - def update_ui(self): - # variables on which the UI state depends - camera_state = self.camera_state - - has_session = self.session is not None - session_loaded = has_session \ - and self.session.preview_dir_loaded \ - and self.session.images_dir_loaded - capture_mode = self.capture_mode - - # configure UI according to the capture mode - for item_index in range(self.capture_view.count()): - if item_index == self.capture_mode.value: - self.capture_view.setItemIcon(item_index, QIcon("ui/chevron_down.svg")) - else: - self.capture_view.setItemIcon(item_index, QIcon("ui/chevron_right.svg")) - - - - - # configure UI according to the state of the current session - self.session_name_edit.setEnabled(not has_session) - self.start_session_button.setVisible(not has_session) - self.open_session_action.setEnabled(not has_session) - self.rename_session_action.setEnabled(session_loaded) - - self.close_session_button.setVisible(has_session) - self.close_session_button.setText("Sitzung beenden" if session_loaded else "Laden abbrechen...") - - self.session_loading_spinner.isAnimated = has_session and not session_loaded - self.capture_view.setEnabled(has_session) - - self.capture_progress_bar.setMaximum(60) - self.capture_progress_bar.setValue(self.rtiImageBrowser.num_files() if session_loaded else 0) - - if has_session: - self.session_name_edit.setText(self.session.name) - - # configure UI according to the camera state - match camera_state: - case CameraStates.Waiting(): - self.camera_state_label.setText("Suche Kamera...") - self.camera_state_icon.setPixmap(QPixmap("ui/camera_waiting.png")) - self.open_advanced_cam_config_action.setEnabled(False) - - self.connect_camera_button.setEnabled(False) - self.disconnect_camera_button.setVisible(False) - self.camera_busy_spinner.isAnimated = True - self.capture_status_label.setText(None) - - self.live_view_controls.setEnabled(False) - self.light_lcd_frame.setEnabled(False) - self.light_lcd_number.display(None) - self.live_view_error_label.setText(None) - - self.camera_controls.setEnabled(False) - self.camera_config_controls.setEnabled(False) - self.capture_button.setText("Nicht verbunden") - self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) - self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) - - case CameraStates.Found(): - pass - - case CameraStates.Disconnected(): - self.camera_state_label.setText("Kamera getrennt
%s" % camera_state.camera_name) - self.camera_state_icon.setPixmap(QPixmap("ui/camera_not_ok.png")) - - self.connect_camera_button.setEnabled(True) - self.connect_camera_button.setVisible(True) - self.disconnect_camera_button.setVisible(False) - self.camera_busy_spinner.isAnimated = False - - self.toggle_live_view_button.setChecked(False) - self.autofocus_button.setEnabled(False) - - - self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) - self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) - - case CameraStates.Connecting(): - self.camera_state_label.setText("Verbinde...
%s" % camera_state.camera_name) - self.connect_camera_button.setEnabled(False) - self.camera_busy_spinner.isAnimated = True - - case CameraStates.ConnectionError(): - pass - - case CameraStates.Ready(): - self.camera_state_label.setText("Kamera verbunden
%s" % camera_state.camera_name) - self.camera_state_icon.setPixmap(QPixmap("ui/camera_ok.png")) - - self.open_advanced_cam_config_action.setEnabled(True) - - self.disconnect_camera_button.setEnabled(True) - self.disconnect_camera_button.setVisible(True) - self.connect_camera_button.setVisible(False) - self.camera_busy_spinner.isAnimated = False - - self.live_view_controls.setEnabled(True) - self.toggle_live_view_button.setChecked(False) - self.autofocus_button.setEnabled(False) - - self.camera_controls.setEnabled(True if session_loaded else False) - self.camera_config_controls.setEnabled(True) - if self.capture_mode == CaptureMode.Preview: - self.capture_button.setText("Vorschaubild aufnehmen") - else: - self.capture_button.setText("RTI-Aufnahme starten") - self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) - self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) - - case CameraStates.Disconnecting(): - self.camera_state_label.setText("Trenne Kamera...") - self.disconnect_camera_button.setEnabled(False) - self.disconnect_camera_button.setVisible(True) - self.open_advanced_cam_config_action.setEnabled(False) - - self.live_view_controls.setEnabled(False) - - self.camera_controls.setEnabled(False) - self.camera_config_controls.setEnabled(False) - self.capture_button.setText("Nicht verbunden") - - case CameraStates.LiveViewStarted(): - self.camera_config_controls.setEnabled(False) - self.autofocus_button.setEnabled(True) - self.light_lcd_frame.setEnabled(True) - self.update_lightmeter(camera_state.current_lightmeter_value) - - case CameraStates.LiveViewActive(): - pass - - case CameraStates.FocusStarted(): - self.autofocus_button.setEnabled(False) - - case CameraStates.FocusFinished(): - self.autofocus_button.setEnabled(True) - if not camera_state.success: - self.live_view_error_label.setText("Konnte nicht fokussieren. Zu dunkel?") - else: - self.live_view_error_label.setText(None) - - case CameraStates.LiveViewStopped(): - self.previewImageBrowser.show_preview(None) - self.light_lcd_number.display(None) - self.light_lcd_frame.setEnabled(False) - self.live_view_error_label.setText(None) - - - case CameraStates.CaptureInProgress(): - # if prev - # disable combo boxes - self.session_controls.setEnabled(False) - self.disconnect_camera_button.setEnabled(False) - - self.live_view_controls.setEnabled(False) - self.toggle_live_view_button.setChecked(False) - - - self.capture_button.setVisible(False) - self.cancel_capture_button.setVisible(True) - self.cancel_capture_button.setEnabled(True) - self.capture_status_label.setStyleSheet(None) - self.capture_status_label.setText(None) - - self.camera_config_controls.setEnabled(False) - - if self.capture_mode == CaptureMode.Preview: - self.capture_view.setItemEnabled(CaptureMode.RTI.value, False) - else: - self.capture_view.setItemEnabled(CaptureMode.Preview.value, False) - - self.capture_progress_bar.setMaximum(camera_state.capture_request.num_images) - self.capture_progress_bar.setValue(camera_state.num_captured) - - case CameraStates.CaptureCancelling(): - self.cancel_capture_button.setEnabled(False) - - case CameraStates.CaptureCanceled(): - self.capture_status_label.setText("Aufnahme abgebrochen!") - self.capture_status_label.setStyleSheet("color: red;") - - self.session_controls.setEnabled(True) - self.cancel_capture_button.setVisible(False) - self.capture_button.setVisible(True) - self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) - self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) - - case CameraStates.CaptureError(): - self.capture_status_label.setText("Fehler: %s" % str(camera_state.error)) - self.capture_status_label.setStyleSheet("color: red;") - - self.session_controls.setEnabled(True) - self.cancel_capture_button.setVisible(False) - self.capture_button.setVisible(True) - self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) - self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) - - case CameraStates.CaptureFinished(): - self.capture_status_label.setText("Fertig in %ss!" % str(camera_state.elapsed_time / 1000)) - self.capture_progress_bar.setValue(camera_state.num_captured) - self.session_controls.setEnabled(True) - self.cancel_capture_button.setVisible(False) - self.capture_button.setVisible(True) - self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) - self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) - - def update_ui_bluetooth(self): - if self.bt_controller is not None: - self.bluetooth_frame.setVisible(True) - self.preview_led_frame.setVisible(True) - - match self.bt_controller.state: - case BtControllerState.DISCONNECTED: - self.bluetooth_state_icon.setPixmap(QPixmap("ui/bluetooth_disconnected.svg")) - self.preview_led_select.setEnabled(False) - self.bluetooth_connecting_spinner.isAnimated = False - self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller getrennt") - case BtControllerState.CONNECTING: - self.bluetooth_state_icon.setPixmap(QPixmap("ui/bluetooth_connecting.svg")) - self.bluetooth_connecting_spinner.isAnimated = True - self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller wird aufgebaut...") - case BtControllerState.CONNECTED: - self.bluetooth_state_icon.setPixmap(QPixmap("ui/bluetooth_connected.svg")) - self.preview_led_select.setEnabled(True) - self.bluetooth_connecting_spinner.isAnimated = False - self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller aktiv") - case BtControllerState.DISCONNECTING: - self.bluetooth_state_icon.setPixmap(QPixmap("ui/bluetooth_connecting.svg")) - self.bluetooth_connecting_spinner.isAnimated = True - self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller wird getrennt...") - - else: - self.bluetooth_frame.setVisible(False) - self.preview_led_frame.setVisible(False) - - @property - def session(self) -> Session: - return self.__session - - def set_session(self, _session): - self.__session = _session - self.update_ui() - - if _session is None: - self.session_name_edit.clear() - self.session_name_edit.setFocus() - return - - os.makedirs(_session.session_dir, exist_ok=True) - os.makedirs(_session.preview_dir, exist_ok=True) - os.makedirs(_session.images_dir, exist_ok=True) - - # both browser will emit the directory_loaded signal connected to the - # session_directory_loaded slot below (in Qt Designer/Creator, main_window.ui) - self.previewImageBrowser.open_directory(self.session.preview_dir) - self.rtiImageBrowser.open_directory(self.session.images_dir) - - def on_capture_mode_changed(self): - self.update_mirror_view() - if self.capture_mode == CaptureMode.RTI and isinstance(self.camera_state, CameraStates.LiveViewActive): - self.camera_worker.commands.live_view.emit(False) - self.update_ui() - - def update_mirror_view(self): - if self.capture_mode == CaptureMode.Preview: - self.previewImageBrowser.set_mirror_graphics_view(self.mirror_graphics_view) - else: - self.rtiImageBrowser.set_mirror_graphics_view(self.mirror_graphics_view) - - def open_settings(self): - q_settings = QSettings() - dialog = SettingsDialog(q_settings, self) - dialog.setModal(True) - if dialog.exec(): - for name, value in dialog.settings.items(): - q_settings.setValue(name, value) - if name == "maxPixmapCache": - QPixmapCache.setCacheLimit(value * 1024) - elif name == "enableBluetooth": - event_loop = asyncio.get_running_loop() - if value is True: - event_loop.create_task(self.init_bluetooth()) - elif self.bt_controller: - self.bt_controller.bt_disconnect() - self.update_ui_bluetooth() - elif name == "enableSecondScreenMirror": - self.reset_mirror_view() - - def open_advanced_capture_settings(self): - def open_dialog(cfg: gp.CameraWidget): - self.cam_config_dialog = CameraConfigDialog(cfg, self) - if self.cam_config_dialog.exec(): - self.camera_worker.commands.set_config.emit(cfg) - print(cfg.__dict__) - self.cam_config_dialog = None - - req = ConfigRequest() - req.signal.got_config.connect(open_dialog) - self.camera_worker.commands.get_config.emit(req) - - def set_camera_connection_busy(self, busy: bool = True): - self.connect_camera_button.setEnabled(not busy) - self.disconnect_camera_button.setEnabled(not busy) - self.camera_busy_spinner.isAnimated = busy - - def connect_camera(self): - self.camera_worker.commands.connect_camera.emit() - - def disconnect_camera(self): - self.camera_worker.commands.disconnect_camera.emit() - - def create_session(self): - name = self.session_name_edit.text() - - print("Create" + name) - session = Session(name, QSettings().value("workingDirectory")) - if Path(session.session_dir).exists(): - result = QMessageBox.warning(self, "Fehler", - "Sitzung %s existiert bereits. Soll sie erneut geöffnet werden?" % name, - QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No) - if result == QMessageBox.StandardButton.No: - return - - self.set_session(session) - - def session_directory_loaded(self, path): - if not self.session: - return - - if os.path.normpath(path) == os.path.normpath(self.session.preview_dir): - self.session.preview_dir_loaded = True - self.session.preview_count = self.previewImageBrowser.last_index() - - elif os.path.normpath(path) == os.path.normpath(self.session.images_dir): - self.session.images_dir_loaded = True - - self.update_ui() - - def close_session(self): - self.previewImageBrowser.close_directory() - self.rtiImageBrowser.close_directory() - self.set_session(None) - - def show_session_menu(self): - self.session_menu.exec(self.session_menu_button.mapToGlobal(self.session_menu_button.rect().bottomLeft())) - - def show_settings_menu(self): - self.settings_menu.exec(self.settings_button.mapToGlobal(self.session_menu_button.rect().bottomLeft())) - - def open_existing_session_directory(self): - working_dir = QSettings().value("workingDirectory") - dialog = OpenSessionDialog(working_dir, self) - path = dialog.get_session_path() - if path: - session_name = Path(path).name - self.set_session(Session(session_name, working_dir)) - - def rename_current_session(self): - new_name, ok = QInputDialog.getText(self, "Aktuelle Sitzung umbenennen", "Neuer Name", text=self.session.name) - if ok: - session_dir = self.session.session_dir - session_dir_parent = Path(session_dir).parent - new_session_dir = os.path.join(session_dir_parent, os.path.join(session_dir_parent, new_name)) - - if Path(new_session_dir).exists(): - QMessageBox.critical(self, "Fehler", "Sitzung %s existiert bereits." % new_name) - return - - image_files = [os.path.join(self.session.images_dir, f) for f in os.listdir(self.session.images_dir)] - preview_files = [os.path.join(self.session.preview_dir, f) for f in os.listdir(self.session.preview_dir)] - - for file in image_files + preview_files: - path = Path(file) - parent_path = path.parent - file_name = path.name - basename, ext = os.path.splitext(file_name) - if basename.startswith(self.session.name): - new_filename = basename.replace(self.session.name, new_name, 1) + ext - new_path = os.path.join(parent_path, new_filename) - os.rename(path, new_path) - - os.rename(session_dir, new_session_dir) - self.close_session() - self.set_session(Session(new_name, session_dir_parent)) - - def write_lp(self): - file_names = [os.path.basename(file_path) for file_path in self.rtiImageBrowser.files()] - num_files = len(file_names) - if num_files != 60: - logging.warning("Wrong number of files, not writing LP file.") - return - - lp_template_path = "cceh-dome-template.lp" - lp_output_path = os.path.join(self.session.images_dir, self.session.name + ".lp") - with open(lp_template_path, 'r') as lp_template_file, open(lp_output_path, 'w') as lp_output_file: - logging.info("Writing LP file: " + lp_output_path) - lp_output_file.write(str(num_files) + "\n") - for i, input_line in enumerate(lp_template_file): - output_line = file_names[i] + " " + input_line - lp_output_file.write(output_line) - - def dump_camera_config(self): - output_path = os.path.join(self.session.images_dir, "camera_config.json") - self.logger.info(f"Writing camera configuration dump: {output_path}") - - if not self.session: - return - def on_got_config(cfg: gp.CameraWidget): - cfg_dict = {} - - def traverse_widget(widget, widget_dict): - widget_type = widget.get_type() - - if widget_type == gp.GP_WIDGET_SECTION or widget_type == gp.GP_WIDGET_WINDOW: - # If the widget is a section, traverse its children - child_count = widget.count_children() - for i in range(child_count): - child = widget.get_child(i) - child_dict = {} - traverse_widget(child, child_dict) - widget_dict[child.get_name()] = child_dict - else: - try: - widget_dict['value'] = widget.get_value() - except gp.GPhoto2Error as err: - if err.code == -2: - self.logger.warning(f"Could not get config value for {cfg.get_label()} ({cfg.get_name()}).") - - widget_dict['label'] = widget.get_label() - - return widget_dict - - traverse_widget(cfg, cfg_dict) - - try: - with open(output_path, "w") as output_file: - json.dump(cfg_dict, output_file, indent=4) - except Exception as e: - self.logger.error(f"Could not write camera config dump to {output_path}:") - self.logger.exception(e) - - - req = ConfigRequest() - req.signal.got_config.connect(on_got_config) - self.camera_worker.commands.get_config.emit(req) - - - def config_hookup_select(self, config: gp.CameraWidget, config_name, combo_box: QComboBox, value_map: dict = None): - try: - combo_box.currentIndexChanged.disconnect() - except: - pass - cfg = config.get_child_by_name(config_name) - combo_box.clear() - for idx, choice in enumerate(cfg.get_choices()): - choice_label = value_map[choice] if value_map and choice in value_map else choice - combo_box.addItem(choice_label, choice) - if choice == cfg.get_value(): - combo_box.setCurrentIndex(idx) - - combo_box.currentIndexChanged.connect(lambda: self.camera_worker.commands.set_single_config.emit( - config_name, combo_box.currentData() - )) - - def on_config_update(self, config: gp.CameraWidget): - self.config_hookup_select(config, "iso", self.iso_select) - self.config_hookup_select(config, "f-number", self.f_number_select) - self.config_hookup_select(config, "shutterspeed2", self.shutter_speed_select) - self.config_hookup_select(config, "d030", self.crop_select, { - "0": "Voll", - "1": "Klein", - "2": "Mittel", - "3": "Mittel 2" - }) - - def on_property_change(self, event: PropertyChangeEvent): - match event.property_name: - case "lightmeter": - if isinstance(self.camera_state, CameraStates.LiveViewActive): - self.update_lightmeter(event.value) - - def update_lightmeter(self, value): - if isinstance(value, float): - self.light_lcd_number.display(int(value)) - else: - self.light_lcd_number.display(None) - - def enable_live_view(self, enable: bool): - self.camera_worker.commands.live_view.emit(enable) - - def trigger_autofocus(self): - self.camera_worker.commands.trigger_autofocus.emit() - - def capture_image(self): - capture_req: CaptureImagesRequest - - # Capture Previews - if self.capture_mode == CaptureMode.Preview: - filename_template = self.session.name.replace(" ", "_") + "_test_" + str( - self.session.preview_count + 1) + "${extension}" - file_path_template = os.path.join(self.session.preview_dir, filename_template) - capture_req = CaptureImagesRequest(file_path_template, num_images=1, image_quality="JPEG Fine") - - # Capture RTI Series - else: - if self.rtiImageBrowser.num_files() > 0: - message_box = QMessageBox(QMessageBox.Icon.Warning, "RTI-Serie aufnehmen", - "Vorhandene Aufnahmen werden gelöscht.") - message_box.addButton( - QPushButton(self.style().standardIcon(QStyle.StandardPixmap.SP_DialogCancelButton), "Abbrechen"), - QMessageBox.ButtonRole.NoRole) - message_box.addButton( - QPushButton(self.style().standardIcon(QStyle.StandardPixmap.SP_DialogOkButton), "Fortfahren"), - QMessageBox.ButtonRole.YesRole) - if not message_box.exec(): - return - - existing_files = [os.path.join(self.session.images_dir, f) for f in os.listdir(self.session.images_dir)] - send2trash(existing_files) - - filename_template = self.session.name.replace(" ", "_") + "_${num}${extension}" - file_path_template = os.path.join(self.session.images_dir, filename_template) - capture_req = CaptureImagesRequest(file_path_template, num_images=60, expect_files=2, - max_burst=int(QSettings().value("maxBurstNumber")), skip=0, image_quality="NEF+Fine") - self.capture_progress_bar.setMaximum(119) - self.capture_progress_bar.setValue(0) - - def on_file_received(path: str): - print("Rec: " + path) - capture_req.signal.file_received.connect(on_file_received) - - def start_capture(show_button_message: bool): - if self.capture_mode == CaptureMode.RTI and show_button_message: - press_buttons_dialog = QDialog() - loadUi("ui/press-buttons-dialog.ui", press_buttons_dialog) - if not press_buttons_dialog.exec(): - return - - self.camera_worker.commands.capture_images.emit(capture_req) - - if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED: - initial_led = 0 if self.capture_mode == CaptureMode.RTI else self.preview_led_select.currentData() - request = BtControllerRequest(BtControllerCommand.SET_LED, initial_led) - request.signals.success.connect(lambda: start_capture(False)) - request.signals.error.connect(lambda: start_capture(True)) - self.bt_controller.send_command(request) - else: - start_capture(True) - - - def on_capture_cancelled(self): - logging.info("Capture cancelled") - - def cancel_capture(self): - self.cancel_capture_button.setEnabled(False) - self.camera_worker.commands.cancel.emit() - - def closeEvent(self, event: QCloseEvent): - QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor) - self.camera_thread.requestInterruption() - self.camera_thread.exit() - if self.bt_controller and self.bt_controller.state != BtControllerState.DISCONNECTED: - self.bt_controller.bt_disconnect() - self.camera_thread.wait() - if self.second_screen_window: - self.second_screen_window.close() - - super().closeEvent(event) - -if __name__ == "__main__": - win: RTICaptureMainWindow - - logging.basicConfig( - format='%(levelname)s: %(name)s: %(message)s', level=logging.INFO) - - app = QApplication(sys.argv) - app.setOrganizationName("CCeH") - app.setOrganizationDomain("cceh.uni-koeln.de") - app.setApplicationName("Byzanz RTI") - - settings = QSettings() - if "workingDirectory" not in settings.allKeys(): - settings.setValue("workingDirectory", - QStandardPaths.writableLocation(QStandardPaths.StandardLocation.PicturesLocation)) - - if "maxPixmapCache" not in settings.allKeys(): - settings.setValue("maxPixmapCache", 1024) - - if "maxBurstNumber" not in settings.allKeys(): - settings.setValue("maxBurstNumber", 60) - - if "enableBluetooth" not in settings.allKeys(): - settings.setValue("enableBluetooth", True) - - if "enableSecondScreenMirror" not in settings.allKeys(): - settings.setValue("enableSecondScreenMirror", True) - - QPixmapCache.setCacheLimit(int(settings.value("maxPixmapCache")) * 1024) - - loop: qasync.QEventLoop = qasync.QEventLoop(app) - asyncio.set_event_loop(loop) - - app_close_event = asyncio.Event() - app.aboutToQuit.connect(app_close_event.set) - - orig_exceptionhook = sys.__excepthook__ - - win = RTICaptureMainWindow() - win.show() - - def excepthook(exc_type, exc_value, exc_traceback): - logging.exception(msg="Exception", exc_info=(exc_type, exc_value, exc_traceback)) - if win.bt_controller and win.bt_controller.state != BtControllerState.DISCONNECTED: - win.bt_controller.bt_disconnect() - - loop.call_soon(lambda _loop: _loop.stop(), loop) - - - # sys.excepthook = excepthook - # threading.excepthook = excepthook - - - with loop: - if BT_AVAILABLE and QSettings().value("enableBluetooth", type=bool): - loop.create_task(win.init_bluetooth()) - else: - logging.info("Bluetooth not available. Is bleak installed?") - - loop.run_until_complete(app_close_event.wait()) - - # asyncio.get_running_loop().run_forever() - - +import asyncio.exceptions +import json +import logging +import os +import sys +import threading +from enum import Enum +from pathlib import Path + +import gphoto2 as gp +import qasync +from qasync import QEventLoop, QThreadExecutor +from PIL.ImageQt import ImageQt +from PyQt6.QtCore import QThread, QSettings, QStandardPaths, pyqtSignal, Qt +from PyQt6.QtGui import QPixmap, QAction, QPixmapCache, QIcon, QColor, QCloseEvent, QBrush, QPainter, QCursor +from PyQt6.QtWidgets import ( + QApplication, QMainWindow, QPushButton, QWidget, QFrame, QLineEdit, + QComboBox, QLabel, QToolBox, QProgressBar, QMenu, QAbstractButton, QInputDialog, QMessageBox, QStyle, QDialog, + QLCDNumber, QGraphicsView, QSizePolicy, QVBoxLayout +) +from PyQt6.uic import loadUi +from send2trash import send2trash + +from helpers import get_ui_path + +try: + from bt_controller_controller import BtControllerController, BtControllerCommand, BtControllerRequest, BtControllerState + BT_AVAILABLE = True +except: + BT_AVAILABLE = False + +from camera_worker import CameraWorker, CaptureImagesRequest, CameraStates, PropertyChangeEvent, ConfigRequest +from open_session_dialog import OpenSessionDialog +from photo_browser import PhotoBrowser +from settings_dialog import SettingsDialog +from spinner import Spinner +from camera_config_dialog import CameraConfigDialog + + +class Session: + def __init__(self, name, working_dir): + self.images_dir_loaded = False + self.preview_dir_loaded = False + + self.name = name + self.session_dir = os.path.join(working_dir, self.name) + self.preview_dir = os.path.join(self.session_dir, "test") + self.images_dir = os.path.join(self.session_dir, "images") + self.preview_count = 0 + + +# Corresponds to itemIndex of the captureView QToolBox +class CaptureMode(Enum): + Preview = 0 + RTI = 1 + + +class RTICaptureMainWindow(QMainWindow): + find_camera = pyqtSignal() + + def __init__(self, parent=None): + super().__init__(parent) + + self.logger = logging.getLogger(self.__class__.__name__) + + self.camera_worker = CameraWorker() + self.__session: Session = None + self.camera_state: CameraStates.StateType = None + self.cam_config_dialog: CameraConfigDialog = None + + # Set up UI and find controls + loadUi(get_ui_path('ui/main_window.ui'), self) + self.disconnect_camera_button: QPushButton = self.findChild(QPushButton, "disconnectCameraButton") + self.connect_camera_button: QPushButton = self.findChild(QPushButton, "connectCameraButton") + self.camera_busy_spinner: Spinner = self.findChild(QWidget, "cameraBusySpinner") + self.camera_state_label: QLabel = self.findChild(QLabel, "cameraStateLabel") + self.camera_state_icon: QLabel = self.findChild(QLabel, "cameraStateIcon") + + self.bluetooth_frame: QFrame = self.findChild(QFrame, "bluetoothFrame") + self.bluetooth_state_icon: QLabel = self.findChild(QLabel, "bluetoothStateLabel") + self.bluetooth_connecting_spinner: Spinner = self.findChild(QWidget, "bluetoothConnectingSpinner") + + self.session_controls: QWidget = self.findChild(QWidget, "sessionControls") + self.session_name_edit: QLineEdit = self.findChild(QLineEdit, "sessionNameEdit") + self.start_session_button: QPushButton = self.findChild(QPushButton, "startSessionButton") + self.close_session_button: QPushButton = self.findChild(QPushButton, "closeSessionButton") + self.session_loading_spinner: Spinner = self.findChild(QWidget, "sessionLoadingSpinner") + self.session_menu_button: QAbstractButton = self.findChild(QWidget, "sessionMenuButton") + + self.live_view_controls: QWidget = self.findChild(QWidget, "liveViewControls") + self.toggle_live_view_button: QPushButton = self.findChild(QPushButton, "toggleLiveViewButton") + self.autofocus_button: QPushButton = self.findChild(QPushButton, "autofocusButton") + self.light_lcd_number: QLCDNumber = self.findChild(QLCDNumber, "lightLCDNumber") + self.light_lcd_frame: QFrame = self.findChild(QFrame, "lightLCDFrame") + self.live_view_error_label: QLabel = self.findChild(QLabel, "liveviewErrorLabel") + + self.preview_led_select: QComboBox = self.findChild(QComboBox, "previewLedSelect") + self.preview_led_frame: QFrame = self.findChild(QFrame, "previewLedFrame") + + self.capture_view: QToolBox = self.findChild(QToolBox, "captureView") + self.rtiPage: QWidget = self.findChild(QWidget, "rtiPage") + self.previewPage: QWidget = self.findChild(QWidget, "previewPage") + self.previewImageBrowser: PhotoBrowser = self.findChild(QWidget, "previewImageBrowser") + self.rtiImageBrowser: PhotoBrowser = self.findChild(QWidget, "rtiImageBrowser") + self.capture_button: QPushButton = self.findChild(QPushButton, "captureButton") + self.cancel_capture_button: QPushButton = self.findChild(QPushButton, "cancelCaptureButton") + + self.rti_progress_view: QWidget = self.findChild(QWidget, "rtiProgressView") + self.capture_progress_bar: QProgressBar = self.findChild(QProgressBar, "captureProgressBar") + self.capture_status_label: QLabel = self.findChild(QLabel, "captureStatusLabel") + + self.camera_controls: QFrame = self.findChild(QFrame, "cameraControls") + self.camera_config_controls: QWidget = self.findChild(QWidget, "cameraConfigControls") + self.f_number_select: QComboBox = self.findChild(QComboBox, "fNumberSelect") + self.shutter_speed_select: QComboBox = self.findChild(QComboBox, "shutterSpeedSelect") + self.crop_select: QComboBox = self.findChild(QComboBox, "cropSelect") + self.iso_select: QComboBox = self.findChild(QComboBox, "isoSelect") + + self.settings_button: QPushButton = self.findChild(QPushButton, "settingsButton") + + self.session_menu = QMenu(self) + self.open_session_action = QAction('Vorherige Sitzung öffnen...', self) + self.open_session_action.triggered.connect(self.open_existing_session_directory) + self.open_session_action.setIcon(QIcon(get_ui_path("ui/open.svg"))) + self.rename_session_action = QAction('Sitzung umbenennen...', self) + self.rename_session_action.triggered.connect(self.rename_current_session) + self.rename_session_action.setIcon(QIcon(get_ui_path("ui/rename.svg"))) + self.session_menu.addActions([self.open_session_action, self.rename_session_action]) + + self.settings_menu = QMenu(self) + self.open_program_settings_action = QAction('Allgemeine Einstellungen') + self.open_program_settings_action.triggered.connect(self.open_settings) + self.open_program_settings_action.setIcon(QIcon(get_ui_path("ui/general_settings.svg"))) + self.open_advanced_cam_config_action = QAction('Erweiterte Kamerakonfiguration') + self.open_advanced_cam_config_action.triggered.connect(self.open_advanced_capture_settings) + self.open_advanced_cam_config_action.setIcon(QIcon(get_ui_path("ui/cam_settings.svg"))) + self.settings_menu.addActions([self.open_program_settings_action, self.open_advanced_cam_config_action]) + + self.mirror_graphics_view: QGraphicsView | None = None + self.second_screen_window: QDialog | None = None + + self.session_name_edit.textChanged.connect( + lambda text: self.start_session_button.setEnabled( + True if len(text) > 0 else False + )) + + self.cancel_capture_button.setVisible(False) + + for i in range(60): + self.preview_led_select.addItem(str(i + 1), i) + + self.set_camera_connection_busy(True) + self.capture_mode = CaptureMode.Preview + self.set_session(None) + + self.camera_thread = QThread() + self.camera_worker.moveToThread(self.camera_thread) + self.camera_worker.state_changed.connect(self.set_camera_state) + self.camera_worker.events.config_updated.connect(self.on_config_update) + self.camera_worker.property_changed.connect(self.on_property_change) + self.camera_worker.preview_image.connect(lambda image: self.previewImageBrowser.show_preview(ImageQt(image.image))) + self.camera_worker.initialized.connect(lambda: self.camera_worker.commands.find_camera.emit()) + self.camera_thread.started.connect(self.camera_worker.initialize) + self.camera_thread.start() + + self.bt_controller: BtControllerController | None = None + + self.update_ui_bluetooth() + + self.init_mirror_view() + QApplication.instance().screenAdded.connect(self.reset_mirror_view) + QApplication.instance().screenRemoved.connect(self.reset_mirror_view) + QApplication.instance().primaryScreenChanged.connect(self.reset_mirror_view) + + + def init_mirror_view(self): + screens = QApplication.screens() + mirror_view_enabled = QSettings().value("enableSecondScreenMirror", type=bool) + if mirror_view_enabled and len(screens) > 1: + second_screen = screens[1] + self.second_screen_window = QDialog() + self.second_screen_window.setWindowFlags(Qt.WindowType.WindowStaysOnTopHint | Qt.WindowType.FramelessWindowHint) + self.second_screen_window.setWindowTitle("Secondary View") + self.second_screen_window.setGeometry(second_screen.availableGeometry()) + self.second_screen_window.showFullScreen() + + self.mirror_graphics_view = QGraphicsView(self.second_screen_window) + self.mirror_graphics_view.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + self.mirror_graphics_view.setBackgroundBrush(QBrush(QColor(30, 30, 30))) + self.mirror_graphics_view.setRenderHint(QPainter.RenderHint.Antialiasing) + layout = QVBoxLayout(self.second_screen_window) + layout.setContentsMargins(0, 0, 0, 0) + layout.addWidget(self.mirror_graphics_view) + self.second_screen_window.setLayout(layout) + + self.update_mirror_view() + + def disable_mirror_view(self): + if self.second_screen_window: + self.second_screen_window.close() + self.second_screen_window = None + self.mirror_graphics_view = None + + def reset_mirror_view(self): + self.disable_mirror_view() + self.init_mirror_view() + + async def init_bluetooth(self): + if not self.bt_controller: + self.bt_controller = BtControllerController() + self.bt_controller.state_changed.connect(self.update_ui_bluetooth) + await self.bt_controller.connect() + + @property + def capture_mode(self) -> CaptureMode: + return CaptureMode(self.capture_view.currentIndex()) + + @capture_mode.setter + def capture_mode(self, mode: CaptureMode): + self.capture_view.setCurrentIndex(mode.value) + self.update_ui() + + def get_camera_state(self): + return self.camera_state + + def set_camera_state(self, state: CameraStates.StateType): + self.logger.debug("Handle camera state:" + state.__class__.__name__) + self.camera_state = state + self.update_ui() + + match state: + case CameraStates.Waiting(): + pass + + case CameraStates.Found(): + self.camera_worker.commands.connect_camera.emit() + + case CameraStates.Disconnected(): + if state.auto_reconnect: + self.camera_worker.commands.find_camera.emit() + + case CameraStates.Connecting(): + pass + + case CameraStates.Disconnecting(): + if self.cam_config_dialog: + self.cam_config_dialog.reject() + + case CameraStates.ConnectionError(): + self.logger.error(state.error) + + case CameraStates.Ready(): + pass + + case CameraStates.LiveViewStarted(): + if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED: + request = BtControllerRequest(BtControllerCommand.PILOT_LIGHT_ON) + request.signals.success.connect(lambda: print("BT Success!")) + request.signals.error.connect(lambda e: logging.exception(e)) + self.bt_controller.send_command(request) + + case CameraStates.LiveViewStopped(): + if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED: + request = BtControllerRequest(BtControllerCommand.LED_OFF) + request.signals.success.connect(lambda: print("BT Success!")) + request.signals.error.connect(lambda e: logging.exception(e)) + self.bt_controller.send_command(request) + + case CameraStates.Disconnecting(): + pass + + case CameraStates.CaptureInProgress(): + pass + + case CameraStates.CaptureFinished(): + if self.capture_mode == CaptureMode.Preview: + self.session.preview_count += 1 + else: + self.write_lp() + self.dump_camera_config() + + case CameraStates.CaptureCanceled(): + pass + + def update_ui(self): + # variables on which the UI state depends + camera_state = self.camera_state + + has_session = self.session is not None + session_loaded = has_session \ + and self.session.preview_dir_loaded \ + and self.session.images_dir_loaded + capture_mode = self.capture_mode + + # configure UI according to the capture mode + for item_index in range(self.capture_view.count()): + if item_index == self.capture_mode.value: + self.capture_view.setItemIcon(item_index, QIcon(get_ui_path("ui/chevron_down.svg"))) + else: + self.capture_view.setItemIcon(item_index, QIcon(get_ui_path("ui/chevron_right.svg"))) + + + + + # configure UI according to the state of the current session + self.session_name_edit.setEnabled(not has_session) + self.start_session_button.setVisible(not has_session) + self.open_session_action.setEnabled(not has_session) + self.rename_session_action.setEnabled(session_loaded) + + self.close_session_button.setVisible(has_session) + self.close_session_button.setText("Sitzung beenden" if session_loaded else "Laden abbrechen...") + + self.session_loading_spinner.isAnimated = has_session and not session_loaded + self.capture_view.setEnabled(has_session) + + self.capture_progress_bar.setMaximum(60) + self.capture_progress_bar.setValue(self.rtiImageBrowser.num_files() if session_loaded else 0) + + if has_session: + self.session_name_edit.setText(self.session.name) + + # configure UI according to the camera state + match camera_state: + case CameraStates.Waiting(): + self.camera_state_label.setText("Suche Kamera...") + self.camera_state_icon.setPixmap(QPixmap(get_ui_path("ui/camera_waiting.png"))) + self.open_advanced_cam_config_action.setEnabled(False) + + self.connect_camera_button.setEnabled(False) + self.disconnect_camera_button.setVisible(False) + self.camera_busy_spinner.isAnimated = True + self.capture_status_label.setText(None) + + self.live_view_controls.setEnabled(False) + self.light_lcd_frame.setEnabled(False) + self.light_lcd_number.display(None) + self.live_view_error_label.setText(None) + + self.camera_controls.setEnabled(False) + self.camera_config_controls.setEnabled(False) + self.capture_button.setText("Nicht verbunden") + self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) + self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) + + case CameraStates.Found(): + pass + + case CameraStates.Disconnected(): + self.camera_state_label.setText("Kamera getrennt
%s" % camera_state.camera_name) + self.camera_state_icon.setPixmap(QPixmap(get_ui_path("ui/camera_not_ok.png"))) + + self.connect_camera_button.setEnabled(True) + self.connect_camera_button.setVisible(True) + self.disconnect_camera_button.setVisible(False) + self.camera_busy_spinner.isAnimated = False + + self.toggle_live_view_button.setChecked(False) + self.autofocus_button.setEnabled(False) + + + self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) + self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) + + case CameraStates.Connecting(): + self.camera_state_label.setText("Verbinde...
%s" % camera_state.camera_name) + self.connect_camera_button.setEnabled(False) + self.camera_busy_spinner.isAnimated = True + + case CameraStates.ConnectionError(): + pass + + case CameraStates.Ready(): + self.camera_state_label.setText("Kamera verbunden
%s" % camera_state.camera_name) + self.camera_state_icon.setPixmap(QPixmap(get_ui_path("ui/camera_ok.png"))) + + self.open_advanced_cam_config_action.setEnabled(True) + + self.disconnect_camera_button.setEnabled(True) + self.disconnect_camera_button.setVisible(True) + self.connect_camera_button.setVisible(False) + self.camera_busy_spinner.isAnimated = False + + self.live_view_controls.setEnabled(True) + self.toggle_live_view_button.setChecked(False) + self.autofocus_button.setEnabled(False) + + self.camera_controls.setEnabled(True if session_loaded else False) + self.camera_config_controls.setEnabled(True) + if self.capture_mode == CaptureMode.Preview: + self.capture_button.setText("Vorschaubild aufnehmen") + else: + self.capture_button.setText("RTI-Aufnahme starten") + self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) + self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) + + case CameraStates.Disconnecting(): + self.camera_state_label.setText("Trenne Kamera...") + self.disconnect_camera_button.setEnabled(False) + self.disconnect_camera_button.setVisible(True) + self.open_advanced_cam_config_action.setEnabled(False) + + self.live_view_controls.setEnabled(False) + + self.camera_controls.setEnabled(False) + self.camera_config_controls.setEnabled(False) + self.capture_button.setText("Nicht verbunden") + + case CameraStates.LiveViewStarted(): + self.camera_config_controls.setEnabled(False) + self.autofocus_button.setEnabled(True) + self.light_lcd_frame.setEnabled(True) + self.update_lightmeter(camera_state.current_lightmeter_value) + + case CameraStates.LiveViewActive(): + pass + + case CameraStates.FocusStarted(): + self.autofocus_button.setEnabled(False) + + case CameraStates.FocusFinished(): + self.autofocus_button.setEnabled(True) + if not camera_state.success: + self.live_view_error_label.setText("Konnte nicht fokussieren. Zu dunkel?") + else: + self.live_view_error_label.setText(None) + + case CameraStates.LiveViewStopped(): + self.previewImageBrowser.show_preview(None) + self.light_lcd_number.display(None) + self.light_lcd_frame.setEnabled(False) + self.live_view_error_label.setText(None) + + + case CameraStates.CaptureInProgress(): + # if prev + # disable combo boxes + self.session_controls.setEnabled(False) + self.disconnect_camera_button.setEnabled(False) + + self.live_view_controls.setEnabled(False) + self.toggle_live_view_button.setChecked(False) + + + self.capture_button.setVisible(False) + self.cancel_capture_button.setVisible(True) + self.cancel_capture_button.setEnabled(True) + self.capture_status_label.setStyleSheet(None) + self.capture_status_label.setText(None) + + self.camera_config_controls.setEnabled(False) + + if self.capture_mode == CaptureMode.Preview: + self.capture_view.setItemEnabled(CaptureMode.RTI.value, False) + else: + self.capture_view.setItemEnabled(CaptureMode.Preview.value, False) + + self.capture_progress_bar.setMaximum(camera_state.capture_request.num_images) + self.capture_progress_bar.setValue(camera_state.num_captured) + + case CameraStates.CaptureCancelling(): + self.cancel_capture_button.setEnabled(False) + + case CameraStates.CaptureCanceled(): + self.capture_status_label.setText("Aufnahme abgebrochen!") + self.capture_status_label.setStyleSheet("color: red;") + + self.session_controls.setEnabled(True) + self.cancel_capture_button.setVisible(False) + self.capture_button.setVisible(True) + self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) + self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) + + case CameraStates.CaptureError(): + self.capture_status_label.setText("Fehler: %s" % str(camera_state.error)) + self.capture_status_label.setStyleSheet("color: red;") + + self.session_controls.setEnabled(True) + self.cancel_capture_button.setVisible(False) + self.capture_button.setVisible(True) + self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) + self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) + + case CameraStates.CaptureFinished(): + self.capture_status_label.setText("Fertig in %ss!" % str(camera_state.elapsed_time / 1000)) + self.capture_progress_bar.setValue(camera_state.num_captured) + self.session_controls.setEnabled(True) + self.cancel_capture_button.setVisible(False) + self.capture_button.setVisible(True) + self.capture_view.setItemEnabled(CaptureMode.Preview.value, True) + self.capture_view.setItemEnabled(CaptureMode.RTI.value, True) + + def update_ui_bluetooth(self): + if self.bt_controller is not None: + self.bluetooth_frame.setVisible(True) + self.preview_led_frame.setVisible(True) + + match self.bt_controller.state: + case BtControllerState.DISCONNECTED: + self.bluetooth_state_icon.setPixmap(QPixmap(get_ui_path("ui/bluetooth_disconnected.svg"))) + self.preview_led_select.setEnabled(False) + self.bluetooth_connecting_spinner.isAnimated = False + self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller getrennt") + case BtControllerState.CONNECTING: + self.bluetooth_state_icon.setPixmap(QPixmap(get_ui_path("ui/bluetooth_connecting.svg"))) + self.bluetooth_connecting_spinner.isAnimated = True + self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller wird aufgebaut...") + case BtControllerState.CONNECTED: + self.bluetooth_state_icon.setPixmap(QPixmap(get_ui_path("ui/bluetooth_connected.svg"))) + self.preview_led_select.setEnabled(True) + self.bluetooth_connecting_spinner.isAnimated = False + self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller aktiv") + case BtControllerState.DISCONNECTING: + self.bluetooth_state_icon.setPixmap(QPixmap(get_ui_path("ui/bluetooth_connecting.svg"))) + self.bluetooth_connecting_spinner.isAnimated = True + self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller wird getrennt...") + + else: + self.bluetooth_frame.setVisible(False) + self.preview_led_frame.setVisible(False) + + @property + def session(self) -> Session: + return self.__session + + def set_session(self, _session): + self.__session = _session + self.update_ui() + + if _session is None: + self.session_name_edit.clear() + self.session_name_edit.setFocus() + return + + os.makedirs(_session.session_dir, exist_ok=True) + os.makedirs(_session.preview_dir, exist_ok=True) + os.makedirs(_session.images_dir, exist_ok=True) + + # both browser will emit the directory_loaded signal connected to the + # session_directory_loaded slot below (in Qt Designer/Creator, main_window.ui) + self.previewImageBrowser.open_directory(self.session.preview_dir) + self.rtiImageBrowser.open_directory(self.session.images_dir) + + def on_capture_mode_changed(self): + self.update_mirror_view() + if self.capture_mode == CaptureMode.RTI and isinstance(self.camera_state, CameraStates.LiveViewActive): + self.camera_worker.commands.live_view.emit(False) + self.update_ui() + + def update_mirror_view(self): + if self.capture_mode == CaptureMode.Preview: + self.previewImageBrowser.set_mirror_graphics_view(self.mirror_graphics_view) + else: + self.rtiImageBrowser.set_mirror_graphics_view(self.mirror_graphics_view) + + def open_settings(self): + q_settings = QSettings() + dialog = SettingsDialog(q_settings, self) + dialog.setModal(True) + if dialog.exec(): + for name, value in dialog.settings.items(): + q_settings.setValue(name, value) + if name == "maxPixmapCache": + QPixmapCache.setCacheLimit(value * 1024) + elif name == "enableBluetooth": + event_loop = asyncio.get_running_loop() + if value is True: + event_loop.create_task(self.init_bluetooth()) + elif self.bt_controller: + self.bt_controller.bt_disconnect() + self.update_ui_bluetooth() + elif name == "enableSecondScreenMirror": + self.reset_mirror_view() + + def open_advanced_capture_settings(self): + def open_dialog(cfg: gp.CameraWidget): + self.cam_config_dialog = CameraConfigDialog(cfg, self) + if self.cam_config_dialog.exec(): + self.camera_worker.commands.set_config.emit(cfg) + print(cfg.__dict__) + self.cam_config_dialog = None + + req = ConfigRequest() + req.signal.got_config.connect(open_dialog) + self.camera_worker.commands.get_config.emit(req) + + def set_camera_connection_busy(self, busy: bool = True): + self.connect_camera_button.setEnabled(not busy) + self.disconnect_camera_button.setEnabled(not busy) + self.camera_busy_spinner.isAnimated = busy + + def connect_camera(self): + self.camera_worker.commands.connect_camera.emit() + + def disconnect_camera(self): + self.camera_worker.commands.disconnect_camera.emit() + + def create_session(self): + name = self.session_name_edit.text() + + print("Create" + name) + session = Session(name, QSettings().value("workingDirectory")) + if Path(session.session_dir).exists(): + result = QMessageBox.warning(self, "Fehler", + "Sitzung %s existiert bereits. Soll sie erneut geöffnet werden?" % name, + QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No) + if result == QMessageBox.StandardButton.No: + return + + self.set_session(session) + + def session_directory_loaded(self, path): + if not self.session: + return + + if os.path.normpath(path) == os.path.normpath(self.session.preview_dir): + self.session.preview_dir_loaded = True + self.session.preview_count = self.previewImageBrowser.last_index() + + elif os.path.normpath(path) == os.path.normpath(self.session.images_dir): + self.session.images_dir_loaded = True + + self.update_ui() + + def close_session(self): + self.previewImageBrowser.close_directory() + self.rtiImageBrowser.close_directory() + self.set_session(None) + + def show_session_menu(self): + self.session_menu.exec(self.session_menu_button.mapToGlobal(self.session_menu_button.rect().bottomLeft())) + + def show_settings_menu(self): + self.settings_menu.exec(self.settings_button.mapToGlobal(self.session_menu_button.rect().bottomLeft())) + + def open_existing_session_directory(self): + working_dir = QSettings().value("workingDirectory") + dialog = OpenSessionDialog(working_dir, self) + path = dialog.get_session_path() + if path: + session_name = Path(path).name + self.set_session(Session(session_name, working_dir)) + + def rename_current_session(self): + new_name, ok = QInputDialog.getText(self, "Aktuelle Sitzung umbenennen", "Neuer Name", text=self.session.name) + if ok: + session_dir = self.session.session_dir + session_dir_parent = Path(session_dir).parent + new_session_dir = os.path.join(session_dir_parent, os.path.join(session_dir_parent, new_name)) + + if Path(new_session_dir).exists(): + QMessageBox.critical(self, "Fehler", "Sitzung %s existiert bereits." % new_name) + return + + image_files = [os.path.join(self.session.images_dir, f) for f in os.listdir(self.session.images_dir)] + preview_files = [os.path.join(self.session.preview_dir, f) for f in os.listdir(self.session.preview_dir)] + + for file in image_files + preview_files: + path = Path(file) + parent_path = path.parent + file_name = path.name + basename, ext = os.path.splitext(file_name) + if basename.startswith(self.session.name): + new_filename = basename.replace(self.session.name, new_name, 1) + ext + new_path = os.path.join(parent_path, new_filename) + os.rename(path, new_path) + + os.rename(session_dir, new_session_dir) + self.close_session() + self.set_session(Session(new_name, session_dir_parent)) + + def write_lp(self): + file_names = [os.path.basename(file_path) for file_path in self.rtiImageBrowser.files()] + num_files = len(file_names) + if num_files != 60: + logging.warning("Wrong number of files, not writing LP file.") + return + + lp_template_path = "cceh-dome-template.lp" + lp_output_path = os.path.join(self.session.images_dir, self.session.name + ".lp") + with open(lp_template_path, 'r') as lp_template_file, open(lp_output_path, 'w') as lp_output_file: + logging.info("Writing LP file: " + lp_output_path) + lp_output_file.write(str(num_files) + "\n") + for i, input_line in enumerate(lp_template_file): + output_line = file_names[i] + " " + input_line + lp_output_file.write(output_line) + + def dump_camera_config(self): + output_path = os.path.join(self.session.images_dir, "camera_config.json") + self.logger.info(f"Writing camera configuration dump: {output_path}") + + if not self.session: + return + def on_got_config(cfg: gp.CameraWidget): + cfg_dict = {} + + def traverse_widget(widget, widget_dict): + widget_type = widget.get_type() + + if widget_type == gp.GP_WIDGET_SECTION or widget_type == gp.GP_WIDGET_WINDOW: + # If the widget is a section, traverse its children + child_count = widget.count_children() + for i in range(child_count): + child = widget.get_child(i) + child_dict = {} + traverse_widget(child, child_dict) + widget_dict[child.get_name()] = child_dict + else: + try: + widget_dict['value'] = widget.get_value() + except gp.GPhoto2Error as err: + if err.code == -2: + self.logger.warning(f"Could not get config value for {cfg.get_label()} ({cfg.get_name()}).") + + widget_dict['label'] = widget.get_label() + + return widget_dict + + traverse_widget(cfg, cfg_dict) + + try: + with open(output_path, "w") as output_file: + json.dump(cfg_dict, output_file, indent=4) + except Exception as e: + self.logger.error(f"Could not write camera config dump to {output_path}:") + self.logger.exception(e) + + + req = ConfigRequest() + req.signal.got_config.connect(on_got_config) + self.camera_worker.commands.get_config.emit(req) + + + def config_hookup_select(self, config: gp.CameraWidget, config_name, combo_box: QComboBox, value_map: dict = None): + try: + combo_box.currentIndexChanged.disconnect() + except: + pass + cfg = config.get_child_by_name(config_name) + combo_box.clear() + for idx, choice in enumerate(cfg.get_choices()): + choice_label = value_map[choice] if value_map and choice in value_map else choice + combo_box.addItem(choice_label, choice) + if choice == cfg.get_value(): + combo_box.setCurrentIndex(idx) + + combo_box.currentIndexChanged.connect(lambda: self.camera_worker.commands.set_single_config.emit( + config_name, combo_box.currentData() + )) + + def on_config_update(self, config: gp.CameraWidget): + self.config_hookup_select(config, "iso", self.iso_select) + self.config_hookup_select(config, "f-number", self.f_number_select) + self.config_hookup_select(config, "shutterspeed2", self.shutter_speed_select) + self.config_hookup_select(config, "d030", self.crop_select, { + "0": "Voll", + "1": "Klein", + "2": "Mittel", + "3": "Mittel 2" + }) + + def on_property_change(self, event: PropertyChangeEvent): + match event.property_name: + case "lightmeter": + if isinstance(self.camera_state, CameraStates.LiveViewActive): + self.update_lightmeter(event.value) + + def update_lightmeter(self, value): + if isinstance(value, float): + self.light_lcd_number.display(int(value)) + else: + self.light_lcd_number.display(None) + + def enable_live_view(self, enable: bool): + self.camera_worker.commands.live_view.emit(enable) + + def trigger_autofocus(self): + self.camera_worker.commands.trigger_autofocus.emit() + + def capture_image(self): + capture_req: CaptureImagesRequest + + # Capture Previews + if self.capture_mode == CaptureMode.Preview: + filename_template = self.session.name.replace(" ", "_") + "_test_" + str( + self.session.preview_count + 1) + "${extension}" + file_path_template = os.path.join(self.session.preview_dir, filename_template) + capture_req = CaptureImagesRequest(file_path_template, num_images=1, image_quality="JPEG Fine") + + # Capture RTI Series + else: + if self.rtiImageBrowser.num_files() > 0: + message_box = QMessageBox(QMessageBox.Icon.Warning, "RTI-Serie aufnehmen", + "Vorhandene Aufnahmen werden gelöscht.") + message_box.addButton( + QPushButton(self.style().standardIcon(QStyle.StandardPixmap.SP_DialogCancelButton), "Abbrechen"), + QMessageBox.ButtonRole.NoRole) + message_box.addButton( + QPushButton(self.style().standardIcon(QStyle.StandardPixmap.SP_DialogOkButton), "Fortfahren"), + QMessageBox.ButtonRole.YesRole) + if not message_box.exec(): + return + + existing_files = [os.path.join(self.session.images_dir, f) for f in os.listdir(self.session.images_dir)] + send2trash(existing_files) + + filename_template = self.session.name.replace(" ", "_") + "_${num}${extension}" + file_path_template = os.path.join(self.session.images_dir, filename_template) + capture_req = CaptureImagesRequest(file_path_template, num_images=60, expect_files=2, + max_burst=int(QSettings().value("maxBurstNumber")), skip=0, image_quality="NEF+Fine") + self.capture_progress_bar.setMaximum(119) + self.capture_progress_bar.setValue(0) + + def on_file_received(path: str): + print("Rec: " + path) + capture_req.signal.file_received.connect(on_file_received) + + def start_capture(show_button_message: bool): + if self.capture_mode == CaptureMode.RTI and show_button_message: + press_buttons_dialog = QDialog() + loadUi(get_ui_path("ui/press-buttons-dialog.ui"), press_buttons_dialog) + if not press_buttons_dialog.exec(): + return + + self.camera_worker.commands.capture_images.emit(capture_req) + + if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED: + initial_led = 0 if self.capture_mode == CaptureMode.RTI else self.preview_led_select.currentData() + request = BtControllerRequest(BtControllerCommand.SET_LED, initial_led) + request.signals.success.connect(lambda: start_capture(False)) + request.signals.error.connect(lambda: start_capture(True)) + self.bt_controller.send_command(request) + else: + start_capture(True) + + + def on_capture_cancelled(self): + logging.info("Capture cancelled") + + def cancel_capture(self): + self.cancel_capture_button.setEnabled(False) + self.camera_worker.commands.cancel.emit() + + def closeEvent(self, event: QCloseEvent): + QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor) + self.camera_thread.requestInterruption() + self.camera_thread.exit() + if self.bt_controller and self.bt_controller.state != BtControllerState.DISCONNECTED: + self.bt_controller.bt_disconnect() + self.camera_thread.wait() + if self.second_screen_window: + self.second_screen_window.close() + + super().closeEvent(event) + +if __name__ == "__main__": + win: RTICaptureMainWindow + + logging.basicConfig( + format='%(levelname)s: %(name)s: %(message)s', level=logging.INFO) + + app = QApplication(sys.argv) + app.setOrganizationName("CCeH") + app.setOrganizationDomain("cceh.uni-koeln.de") + app.setApplicationName("Byzanz RTI") + + settings = QSettings() + if "workingDirectory" not in settings.allKeys(): + settings.setValue("workingDirectory", + QStandardPaths.writableLocation(QStandardPaths.StandardLocation.PicturesLocation)) + + if "maxPixmapCache" not in settings.allKeys(): + settings.setValue("maxPixmapCache", 1024) + + if "maxBurstNumber" not in settings.allKeys(): + settings.setValue("maxBurstNumber", 60) + + if "enableBluetooth" not in settings.allKeys(): + settings.setValue("enableBluetooth", True) + + if "enableSecondScreenMirror" not in settings.allKeys(): + settings.setValue("enableSecondScreenMirror", True) + + QPixmapCache.setCacheLimit(int(settings.value("maxPixmapCache")) * 1024) + + loop: qasync.QEventLoop = qasync.QEventLoop(app) + asyncio.set_event_loop(loop) + + app_close_event = asyncio.Event() + app.aboutToQuit.connect(app_close_event.set) + + orig_exceptionhook = sys.__excepthook__ + + win = RTICaptureMainWindow() + win.show() + + def excepthook(exc_type, exc_value, exc_traceback): + logging.exception(msg="Exception", exc_info=(exc_type, exc_value, exc_traceback)) + if win.bt_controller and win.bt_controller.state != BtControllerState.DISCONNECTED: + win.bt_controller.bt_disconnect() + + loop.call_soon(lambda _loop: _loop.stop(), loop) + + + # sys.excepthook = excepthook + # threading.excepthook = excepthook + + + with loop: + if BT_AVAILABLE and QSettings().value("enableBluetooth", type=bool): + loop.create_task(win.init_bluetooth()) + else: + logging.info("Bluetooth not available. Is bleak installed?") + + loop.run_until_complete(app_close_event.wait()) + + # asyncio.get_running_loop().run_forever() + + diff --git a/photo_browser.py b/photo_browser.py index 0b2de97..88ff448 100644 --- a/photo_browser.py +++ b/photo_browser.py @@ -1,240 +1,242 @@ -import mimetypes -import os -import re -from os import listdir -from pathlib import Path -from typing import Callable, Optional - -from PyQt6.QtCore import QFileSystemWatcher, Qt, QThreadPool, pyqtSignal, QMutex, \ - QMutexLocker -from PyQt6.QtGui import QPixmap, QResizeEvent, QPixmapCache, QImage -from PyQt6.QtWidgets import QWidget, QListWidget, QListWidgetItem, QGraphicsView -from PyQt6.uic import loadUi - -from load_image_worker import LoadImageWorker, LoadImageWorkerResult -from photo_viewer import PhotoViewer -from spinner import Spinner - - -def get_file_index(file_path) -> Optional[int]: - basename = os.path.splitext(file_path)[0] - numbers_in_basename = re.findall(r'\d+', basename) - return int(numbers_in_basename[-1]) if numbers_in_basename else None - - -class ImageFileListItem(QListWidgetItem): - def __init__(self, path: str, thumbnail: QPixmap): - super().__init__() - self.path: str = path - self.file_name = Path(path).name - self.index = get_file_index(self.file_name) - self.thumbnail: QPixmap = thumbnail - - def __lt__(self, other): - return self.index < other.index - - def data(self, role: Qt.ItemDataRole): - if role == Qt.ItemDataRole.DecorationRole: - return self.thumbnail - - return super().data(role) - -class PhotoBrowser(QWidget): - directory_loaded = pyqtSignal(str) - - def __init__(self, parent=None): - super().__init__(parent) - loadUi('ui/photo_browser.ui', self) - - self.__fileSystemWatcher = QFileSystemWatcher() - self.__threadpool = QThreadPool() - self.__threadpool.setMaxThreadCount(4) - self.__num_images_to_load = 0 - - self.__currentPath: str = None - self.__currentFileSet: set[str] = set() - - self.photo_viewer: PhotoViewer = self.findChild(QWidget, "photoViewer") - self.image_file_list: QListWidget = self.findChild(QListWidget, "imageFileList") - self.viewer_container: QWidget = self.findChild(QWidget, "viewerContainer") - - self.__fileSystemWatcher.directoryChanged.connect(self.__load_directory) - - self.image_file_list.currentItemChanged.connect(self.__on_select_image_file) - - self.spinner = Spinner(self.viewer_container, Spinner.m_light_color) - self.spinner.isAnimated = False - self.__center_spinner_over_photo_viewer() - self.resize(self.size()) - - self.__mutex = QMutex() - - def get_scene(self): - return self.photo_viewer.getScene() - - def set_mirror_graphics_view(self, view: QGraphicsView): - self.photo_viewer.setMirrorView(view) - - def open_directory(self, dir_path): - if self.__currentPath: - self.close_directory() - - self.__currentPath = dir_path - self.start_watching() - self.__load_directory() - - def start_watching(self): - # print("START watching " + self.__currentPath) - self.__fileSystemWatcher.addPath(self.__currentPath) - - def close_directory(self): - self.stop_watching() - self.__currentPath = None - self.__currentFileSet.clear() - self.__threadpool.clear() - self.__threadpool.waitForDone() - self.__num_images_to_load = 0 - self.image_file_list.clear() - QPixmapCache.clear() - - def stop_watching(self): - # print("STOP watching " + self.__currentPath) - self.__fileSystemWatcher.removePath(self.__currentPath) - - def num_files(self) -> int: - return self.image_file_list.count() - - def files(self): - return [self.image_file_list.item(row).path for row in range(self.image_file_list.count())] - - def last_index(self) -> int: - image_count = self.image_file_list.count() - if image_count > 0: - return self.image_file_list.item(image_count - 1).index - - return 0 - - def resizeEvent(self, event: QResizeEvent): - self.__center_spinner_over_photo_viewer() - - def show_preview(self, image: QImage | None): - if not image: - self.photo_viewer.setPhoto(None) - self.image_file_list.setEnabled(True) - - # re-show the previously selected image - selected_image_index = self.image_file_list.currentIndex() - if selected_image_index: - item = self.image_file_list.item(selected_image_index.row()) - self.__on_select_image_file(item) - return - - self.image_file_list.setEnabled(False) - self.photo_viewer.setPhoto(QPixmap.fromImage(image)) - self.photo_viewer.fitInView() - - def __load_directory(self): - print("Load directory: " + self.__currentPath) - new_files = [f for f in listdir(self.__currentPath) - if mimetypes.guess_type(f)[0] == "image/jpeg" and get_file_index(f) is not None] - - new_fileset = set(new_files) - added_files = new_fileset - self.__currentFileSet - removed_files = self.__currentFileSet - new_fileset - - if not added_files and not removed_files: - self.directory_loaded.emit(self.__currentPath) - - if added_files: - # self.__threadpool.waitForDone() - # self.stop_watching() - for f in added_files: - self.__load_image(f, self.__add_image_item) - - for f in removed_files: - for i in range(self.image_file_list.count()): - item = self.image_file_list.item(i) - if isinstance(item, ImageFileListItem): - if item.file_name == f: - self.image_file_list.takeItem(i) - del item - - - - - self.__currentFileSet = new_fileset - - def __on_directory_loaded(self): - # self.start_watching() - self.directory_loaded.emit(self.__currentPath) - # image_count = self.image_file_list.count() - # if image_count > 0: - # self.image_file_list.setCurrentItem(self.image_file_list.item(image_count - 1)) - - # just in case there have been changes while loading the files - self.__load_directory() - - def __load_image(self, file_name: str, on_finished_callback: Callable): - self.__num_images_to_load +=1 - - worker = LoadImageWorker(os.path.join(self.__currentPath, file_name), True, 200) - worker.signals.finished.connect(lambda result: self.__on_image_loaded(result, on_finished_callback)) - - self.spinner.startAnimation() - self.__threadpool.start(worker) - - def __on_image_loaded(self, result: LoadImageWorkerResult, on_finished_callback: Callable): - QPixmapCache.insert(result.path, QPixmap.fromImage(result.image)) - on_finished_callback(result) - - self.__num_images_to_load -= 1 - if self.__num_images_to_load == 0: - self.__on_directory_loaded() - - self.spinner.stopAnimation() - - def __on_select_image_file(self, item: ImageFileListItem): - if item: - file_path = item.path - cached_image = QPixmapCache.find(file_path) - if cached_image: - print("cache hit") - self.photo_viewer.setPhoto(cached_image) - else: - print("cache miss") - self.__load_image(file_path, lambda result: self.photo_viewer.setPhoto(QPixmap.fromImage(result.image))) - else: - self.photo_viewer.setPhoto(None) - - def __add_image_item(self, image_worker_result: LoadImageWorkerResult): - list_item = ImageFileListItem(image_worker_result.path, image_worker_result.thumbnail) - - exposure_time = image_worker_result.exif["ExposureTime"].real - f_number = image_worker_result.exif["FNumber"] - list_item.setText("%s\nf/%s | %s" % (list_item.file_name, f_number, exposure_time)) - - # Only add the item to the list if a directory is still open. This function - # can be called asynchronously from a thread so the directory could have been - # closed in the meantime. - with QMutexLocker(self.__mutex): - if self.__currentPath: - self.image_file_list.addItem(list_item) - self.image_file_list.sortItems() - self.image_file_list.scrollToBottom() - - self.image_file_list.currentItemChanged.disconnect() - self.image_file_list.setCurrentItem(list_item) - self.image_file_list.currentItemChanged.connect(self.__on_select_image_file) - - image_path = image_worker_result.path - pixmap: QPixmap = QPixmapCache.find(image_path) or QPixmap.fromImage(image_worker_result.image) - self.photo_viewer.setPhoto(pixmap) - if self.image_file_list.indexFromItem(list_item).row() == 0: - self.photo_viewer.fitInView() - - - - def __center_spinner_over_photo_viewer(self): - spinner_x = (self.viewer_container.width() - 80) / 2 - spinner_y = (self.viewer_container.height() - 80) / 2 - self.spinner.setGeometry(int(spinner_x), int(spinner_y), 80, 80) +import mimetypes +import os +import re +from os import listdir +from pathlib import Path +from typing import Callable, Optional + +from PyQt6.QtCore import QFileSystemWatcher, Qt, QThreadPool, pyqtSignal, QMutex, \ + QMutexLocker +from PyQt6.QtGui import QPixmap, QResizeEvent, QPixmapCache, QImage +from PyQt6.QtWidgets import QWidget, QListWidget, QListWidgetItem, QGraphicsView +from PyQt6.uic import loadUi + +from load_image_worker import LoadImageWorker, LoadImageWorkerResult +from photo_viewer import PhotoViewer +from spinner import Spinner + +from helpers import get_ui_path + + +def get_file_index(file_path) -> Optional[int]: + basename = os.path.splitext(file_path)[0] + numbers_in_basename = re.findall(r'\d+', basename) + return int(numbers_in_basename[-1]) if numbers_in_basename else None + + +class ImageFileListItem(QListWidgetItem): + def __init__(self, path: str, thumbnail: QPixmap): + super().__init__() + self.path: str = path + self.file_name = Path(path).name + self.index = get_file_index(self.file_name) + self.thumbnail: QPixmap = thumbnail + + def __lt__(self, other): + return self.index < other.index + + def data(self, role: Qt.ItemDataRole): + if role == Qt.ItemDataRole.DecorationRole: + return self.thumbnail + + return super().data(role) + +class PhotoBrowser(QWidget): + directory_loaded = pyqtSignal(str) + + def __init__(self, parent=None): + super().__init__(parent) + loadUi(get_ui_path('ui/photo_browser.ui'), self) + + self.__fileSystemWatcher = QFileSystemWatcher() + self.__threadpool = QThreadPool() + self.__threadpool.setMaxThreadCount(4) + self.__num_images_to_load = 0 + + self.__currentPath: str = None + self.__currentFileSet: set[str] = set() + + self.photo_viewer: PhotoViewer = self.findChild(QWidget, "photoViewer") + self.image_file_list: QListWidget = self.findChild(QListWidget, "imageFileList") + self.viewer_container: QWidget = self.findChild(QWidget, "viewerContainer") + + self.__fileSystemWatcher.directoryChanged.connect(self.__load_directory) + + self.image_file_list.currentItemChanged.connect(self.__on_select_image_file) + + self.spinner = Spinner(self.viewer_container, Spinner.m_light_color) + self.spinner.isAnimated = False + self.__center_spinner_over_photo_viewer() + self.resize(self.size()) + + self.__mutex = QMutex() + + def get_scene(self): + return self.photo_viewer.getScene() + + def set_mirror_graphics_view(self, view: QGraphicsView): + self.photo_viewer.setMirrorView(view) + + def open_directory(self, dir_path): + if self.__currentPath: + self.close_directory() + + self.__currentPath = dir_path + self.start_watching() + self.__load_directory() + + def start_watching(self): + # print("START watching " + self.__currentPath) + self.__fileSystemWatcher.addPath(self.__currentPath) + + def close_directory(self): + self.stop_watching() + self.__currentPath = None + self.__currentFileSet.clear() + self.__threadpool.clear() + self.__threadpool.waitForDone() + self.__num_images_to_load = 0 + self.image_file_list.clear() + QPixmapCache.clear() + + def stop_watching(self): + # print("STOP watching " + self.__currentPath) + self.__fileSystemWatcher.removePath(self.__currentPath) + + def num_files(self) -> int: + return self.image_file_list.count() + + def files(self): + return [self.image_file_list.item(row).path for row in range(self.image_file_list.count())] + + def last_index(self) -> int: + image_count = self.image_file_list.count() + if image_count > 0: + return self.image_file_list.item(image_count - 1).index + + return 0 + + def resizeEvent(self, event: QResizeEvent): + self.__center_spinner_over_photo_viewer() + + def show_preview(self, image: QImage | None): + if not image: + self.photo_viewer.setPhoto(None) + self.image_file_list.setEnabled(True) + + # re-show the previously selected image + selected_image_index = self.image_file_list.currentIndex() + if selected_image_index: + item = self.image_file_list.item(selected_image_index.row()) + self.__on_select_image_file(item) + return + + self.image_file_list.setEnabled(False) + self.photo_viewer.setPhoto(QPixmap.fromImage(image)) + self.photo_viewer.fitInView() + + def __load_directory(self): + print("Load directory: " + self.__currentPath) + new_files = [f for f in listdir(self.__currentPath) + if mimetypes.guess_type(f)[0] == "image/jpeg" and get_file_index(f) is not None] + + new_fileset = set(new_files) + added_files = new_fileset - self.__currentFileSet + removed_files = self.__currentFileSet - new_fileset + + if not added_files and not removed_files: + self.directory_loaded.emit(self.__currentPath) + + if added_files: + # self.__threadpool.waitForDone() + # self.stop_watching() + for f in added_files: + self.__load_image(f, self.__add_image_item) + + for f in removed_files: + for i in range(self.image_file_list.count()): + item = self.image_file_list.item(i) + if isinstance(item, ImageFileListItem): + if item.file_name == f: + self.image_file_list.takeItem(i) + del item + + + + + self.__currentFileSet = new_fileset + + def __on_directory_loaded(self): + # self.start_watching() + self.directory_loaded.emit(self.__currentPath) + # image_count = self.image_file_list.count() + # if image_count > 0: + # self.image_file_list.setCurrentItem(self.image_file_list.item(image_count - 1)) + + # just in case there have been changes while loading the files + self.__load_directory() + + def __load_image(self, file_name: str, on_finished_callback: Callable): + self.__num_images_to_load +=1 + + worker = LoadImageWorker(os.path.join(self.__currentPath, file_name), True, 200) + worker.signals.finished.connect(lambda result: self.__on_image_loaded(result, on_finished_callback)) + + self.spinner.startAnimation() + self.__threadpool.start(worker) + + def __on_image_loaded(self, result: LoadImageWorkerResult, on_finished_callback: Callable): + QPixmapCache.insert(result.path, QPixmap.fromImage(result.image)) + on_finished_callback(result) + + self.__num_images_to_load -= 1 + if self.__num_images_to_load == 0: + self.__on_directory_loaded() + + self.spinner.stopAnimation() + + def __on_select_image_file(self, item: ImageFileListItem): + if item: + file_path = item.path + cached_image = QPixmapCache.find(file_path) + if cached_image: + print("cache hit") + self.photo_viewer.setPhoto(cached_image) + else: + print("cache miss") + self.__load_image(file_path, lambda result: self.photo_viewer.setPhoto(QPixmap.fromImage(result.image))) + else: + self.photo_viewer.setPhoto(None) + + def __add_image_item(self, image_worker_result: LoadImageWorkerResult): + list_item = ImageFileListItem(image_worker_result.path, image_worker_result.thumbnail) + + exposure_time = image_worker_result.exif["ExposureTime"].real + f_number = image_worker_result.exif["FNumber"] + list_item.setText("%s\nf/%s | %s" % (list_item.file_name, f_number, exposure_time)) + + # Only add the item to the list if a directory is still open. This function + # can be called asynchronously from a thread so the directory could have been + # closed in the meantime. + with QMutexLocker(self.__mutex): + if self.__currentPath: + self.image_file_list.addItem(list_item) + self.image_file_list.sortItems() + self.image_file_list.scrollToBottom() + + self.image_file_list.currentItemChanged.disconnect() + self.image_file_list.setCurrentItem(list_item) + self.image_file_list.currentItemChanged.connect(self.__on_select_image_file) + + image_path = image_worker_result.path + pixmap: QPixmap = QPixmapCache.find(image_path) or QPixmap.fromImage(image_worker_result.image) + self.photo_viewer.setPhoto(pixmap) + if self.image_file_list.indexFromItem(list_item).row() == 0: + self.photo_viewer.fitInView() + + + + def __center_spinner_over_photo_viewer(self): + spinner_x = (self.viewer_container.width() - 80) / 2 + spinner_y = (self.viewer_container.height() - 80) / 2 + self.spinner.setGeometry(int(spinner_x), int(spinner_y), 80, 80) diff --git a/requirements.txt b/requirements.txt index d34cd16..ecc0982 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ gphoto2~=2.5.0 -PyQt6~=6.7.0 -Pillow~=10.3.0 +#PyQt6~=6.7.0 +#Pillow~=10.3.0 Send2Trash~=1.8.3 -bleak==0.22.1 +#bleak==0.22.1 qasync~=0.27.1 diff --git a/settings_dialog.py b/settings_dialog.py index cd85871..bab62d9 100644 --- a/settings_dialog.py +++ b/settings_dialog.py @@ -1,69 +1,70 @@ -from typing import Any - -from PyQt6.QtCore import QSettings, QVariant, Qt -from PyQt6.QtGui import QAction, QIcon -from PyQt6.QtWidgets import QDialog, QLineEdit, QFileDialog, QToolButton, QSpinBox, QCheckBox -from PyQt6.uic import loadUi - - -class SettingsDialog(QDialog): - - def __init__(self, q_settings: QSettings, parent=None): - super(SettingsDialog, self).__init__(parent) - self.__q_settings = q_settings - self.settings: dict[str, Any] = dict() - - loadUi('ui/settings_dialog.ui', self) - - self.working_directory_input: QLineEdit = self.findChild(QLineEdit, "workingDirectoryInput") - self.working_directory_input.setText(q_settings.value("workingDirectory")) - self.working_directory_input.textChanged.connect( - lambda text: self.set("workingDirectory", text) - ) - - open_action = QAction("Arbeitsverzeichnis wählen", self) - open_action.setIcon(QIcon("ui/folder-open.svg")) - open_action.triggered.connect(self.choose_working_directory) - - self.working_directory_input.addAction(open_action, QLineEdit.ActionPosition.TrailingPosition) - tool_button: QToolButton - for tool_button in self.working_directory_input.findChildren(QToolButton): - tool_button.setCursor(Qt.CursorShape.PointingHandCursor) - - self.max_pixmap_cache_input: QSpinBox = self.findChild(QSpinBox, "maxPixmapCacheInput") - self.max_pixmap_cache_input.setValue(int(q_settings.value("maxPixmapCache"))) - self.max_pixmap_cache_input.textChanged.connect( - lambda text: self.set("maxPixmapCache", int(text)) - ) - - self.max_burst_number_input: QSpinBox = self.findChild(QSpinBox, "maxBurstNumberInput") - self.max_burst_number_input.setValue(int(q_settings.value("maxBurstNumber"))) - self.max_burst_number_input.textChanged.connect( - lambda text: self.set("maxBurstNumber", int(text)) - ) - - self.enable_bluetooth_checkbox: QCheckBox = self.findChild(QCheckBox, "enableBluetoothCheckbox") - self.enable_bluetooth_checkbox.setChecked(q_settings.value("enableBluetooth", type=bool)) - self.enable_bluetooth_checkbox.stateChanged.connect( - lambda: self.set("enableBluetooth", self.enable_bluetooth_checkbox.isChecked()) - ) - - self.enable_second_screen_mirror_checkbox: QCheckBox = self.findChild(QCheckBox, "enableSecondScreenMirrorCheckbox") - self.enable_second_screen_mirror_checkbox.setChecked(q_settings.value("enableSecondScreenMirror", type=bool)) - self.enable_second_screen_mirror_checkbox.stateChanged.connect( - lambda: self.set("enableSecondScreenMirror", self.enable_second_screen_mirror_checkbox.isChecked()) - ) - - def set(self, name: str, value: QVariant): - self.settings[name] = value - - def choose_working_directory(self): - file_dialog = QFileDialog(self, - "Arbeitsverzeichnis wählen", - self.working_directory_input.text()) - file_dialog.setFileMode(QFileDialog.FileMode.Directory) - if file_dialog.exec(): - print(file_dialog.selectedFiles()) - - if file_dialog.selectedFiles(): - self.working_directory_input.setText(file_dialog.selectedFiles()[0]) +from typing import Any + +from PyQt6.QtCore import QSettings, QVariant, Qt +from PyQt6.QtGui import QAction, QIcon +from PyQt6.QtWidgets import QDialog, QLineEdit, QFileDialog, QToolButton, QSpinBox, QCheckBox +from PyQt6.uic import loadUi + +from helpers import get_ui_path + +class SettingsDialog(QDialog): + + def __init__(self, q_settings: QSettings, parent=None): + super(SettingsDialog, self).__init__(parent) + self.__q_settings = q_settings + self.settings: dict[str, Any] = dict() + + loadUi(get_ui_path('ui/settings_dialog.ui'), self) + + self.working_directory_input: QLineEdit = self.findChild(QLineEdit, "workingDirectoryInput") + self.working_directory_input.setText(q_settings.value("workingDirectory")) + self.working_directory_input.textChanged.connect( + lambda text: self.set("workingDirectory", text) + ) + + open_action = QAction("Arbeitsverzeichnis wählen", self) + open_action.setIcon(QIcon(get_ui_path("ui/folder-open.svg"))) + open_action.triggered.connect(self.choose_working_directory) + + self.working_directory_input.addAction(open_action, QLineEdit.ActionPosition.TrailingPosition) + tool_button: QToolButton + for tool_button in self.working_directory_input.findChildren(QToolButton): + tool_button.setCursor(Qt.CursorShape.PointingHandCursor) + + self.max_pixmap_cache_input: QSpinBox = self.findChild(QSpinBox, "maxPixmapCacheInput") + self.max_pixmap_cache_input.setValue(int(q_settings.value("maxPixmapCache"))) + self.max_pixmap_cache_input.textChanged.connect( + lambda text: self.set("maxPixmapCache", int(text)) + ) + + self.max_burst_number_input: QSpinBox = self.findChild(QSpinBox, "maxBurstNumberInput") + self.max_burst_number_input.setValue(int(q_settings.value("maxBurstNumber"))) + self.max_burst_number_input.textChanged.connect( + lambda text: self.set("maxBurstNumber", int(text)) + ) + + self.enable_bluetooth_checkbox: QCheckBox = self.findChild(QCheckBox, "enableBluetoothCheckbox") + self.enable_bluetooth_checkbox.setChecked(q_settings.value("enableBluetooth", type=bool)) + self.enable_bluetooth_checkbox.stateChanged.connect( + lambda: self.set("enableBluetooth", self.enable_bluetooth_checkbox.isChecked()) + ) + + self.enable_second_screen_mirror_checkbox: QCheckBox = self.findChild(QCheckBox, "enableSecondScreenMirrorCheckbox") + self.enable_second_screen_mirror_checkbox.setChecked(q_settings.value("enableSecondScreenMirror", type=bool)) + self.enable_second_screen_mirror_checkbox.stateChanged.connect( + lambda: self.set("enableSecondScreenMirror", self.enable_second_screen_mirror_checkbox.isChecked()) + ) + + def set(self, name: str, value: QVariant): + self.settings[name] = value + + def choose_working_directory(self): + file_dialog = QFileDialog(self, + "Arbeitsverzeichnis wählen", + self.working_directory_input.text()) + file_dialog.setFileMode(QFileDialog.FileMode.Directory) + if file_dialog.exec(): + print(file_dialog.selectedFiles()) + + if file_dialog.selectedFiles(): + self.working_directory_input.setText(file_dialog.selectedFiles()[0])