diff --git a/.github/workflows/build-win.yml b/.github/workflows/build-win.yml
new file mode 100644
index 0000000..9bae82e
--- /dev/null
+++ b/.github/workflows/build-win.yml
@@ -0,0 +1,54 @@
+name: Build for Windows
+
+on:
+ push:
+ branches: [ main ]
+ pull_request:
+ branches: [ main ]
+ workflow_dispatch:
+
+jobs:
+ build:
+ runs-on: windows-latest
+ defaults:
+ run:
+ shell: msys2 {0}
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Setup MSYS2
+ uses: msys2/setup-msys2@v2
+ with:
+ msystem: MINGW64
+ update: true
+ install: >-
+ mingw-w64-x86_64-python
+ mingw-w64-x86_64-python-pip
+ mingw-w64-x86_64-libgphoto2
+ mingw-w64-x86_64-qt6-base
+ mingw-w64-x86_64-pkg-config
+ mingw-w64-x86_64-python-pip
+ mingw-w64-x86_64-python-pyqt6
+ mingw-w64-x86_64-python-pillow
+ mingw-w64-x86_64-pyinstaller
+ wget
+ p7zip
+ mingw-w64-x86_64-jq
+
+ - name: Set up Python environment
+ run: |
+ python -m pip install --upgrade pip
+ python -m pip install -r requirements.txt
+
+ - name: Build
+ run: |
+ chmod +x build_win.sh
+ ./build_win.sh
+
+ - name: Upload artifacts
+ uses: actions/upload-artifact@v4
+ with:
+ name: build-artifacts
+ path: |
+ dist/main
diff --git a/build_win.sh b/build_win.sh
new file mode 100644
index 0000000..4e2d876
--- /dev/null
+++ b/build_win.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+set -euo pipefail
+
+# Get latest release info from GitHub API
+LIBUSB_RELEASE_INFO=$(wget -qO- https://api.github.com/repos/libusb/libusb/releases/latest)
+
+# Extract Windows release asset URL (only .7z file)
+LIBUSB_WIN_URL=$(echo "$LIBUSB_RELEASE_INFO" | \
+ jq -r '.assets[] | select(.name | endswith(".7z")) | .browser_download_url')
+LIBUSB_FILENAME=$(basename "$LIBUSB_WIN_URL")
+
+# Create vendor directory if it doesn't exist
+mkdir -p vendor/libusb
+
+# Download and extract
+cd vendor
+if [ ! -f "$LIBUSB_FILENAME" ]; then
+ wget -N "$LIBUSB_WIN_URL"
+else
+ echo $LIBUSB_FILENAME already exists, skipping download.
+fi
+7z x "$LIBUSB_FILENAME" -aos -olibusb
+
+# Clean up downloaded archive
+#rm libusb.7z
+
+cd ..
+
+pyinstaller --onedir \
+ --add-binary /mingw64/lib/libgphoto2_port/0.12.2/:. \
+ --add-binary /mingw64/lib/libgphoto2/2.5.31/:. \
+ --add-binary ./vendor/libusb/MinGW64/dll/libusb-1.0.dll:. \
+ --add-data ui:ui main.py \
+ --runtime-hook ./build_win_hook.py \
+ --noconfirm
diff --git a/build_win_hook.py b/build_win_hook.py
new file mode 100644
index 0000000..5e0d1f0
--- /dev/null
+++ b/build_win_hook.py
@@ -0,0 +1,7 @@
+import os
+import sys
+import logging
+
+logging.info(f"Set IOLIBS and CAMLIBS to: #{sys._MEIPASS}")
+os.environ["IOLIBS"] = sys._MEIPASS
+os.environ["CAMLIBS"] = sys._MEIPASS
\ No newline at end of file
diff --git a/camera_worker.py b/camera_worker.py
index fb4c23e..16852ea 100644
--- a/camera_worker.py
+++ b/camera_worker.py
@@ -1,552 +1,552 @@
-import io
-import logging
-import os
-import re
-from contextlib import contextmanager
-from enum import Enum
-from string import Template
-from time import sleep
-from typing import NamedTuple, Literal, Generator, Union
-
-import gphoto2 as gp
-from PIL import Image
-from PyQt6.QtCore import pyqtSignal, QObject, QElapsedTimer, QTimer, Qt, pyqtSlot
-from PyQt6.QtWidgets import QApplication
-from gphoto2 import CameraWidget
-
-EVENT_DESCRIPTIONS = {
- gp.GP_EVENT_UNKNOWN: "Unknown",
- gp.GP_EVENT_CAPTURE_COMPLETE: "Capture Complete",
- gp.GP_EVENT_FILE_ADDED: "File Added",
- gp.GP_EVENT_FOLDER_ADDED: "Folder Added",
- gp.GP_EVENT_TIMEOUT: "Timeout"
-}
-
-class NikonPTPError(Enum):
- OutOfFocus = "0xa002"
-
-class ConfigRequest():
- class Signal(QObject):
- got_config = pyqtSignal(gp.CameraWidget)
-
- def __init__(self):
- self.signal = ConfigRequest.Signal()
-
-
-class CaptureImagesRequest():
- class Signal(QObject):
- file_received = pyqtSignal(str)
-
- file_path_template: str
- num_images: int
- expect_files: int = 1
- max_burst: int = 1
- skip: int = 0
- manual_trigger: bool = False
- image_quality: str | None = None
-
- def __init__(self, file_path_template, num_images, expect_files = 1, max_burst = 1, skip = 0, manual_trigger = False, image_quality = None):
- self.file_path_template = file_path_template
- self.num_images = num_images
- self.expect_files = expect_files
- self.max_burst = max_burst
- self.skip = skip
- self.manual_trigger = manual_trigger
- self.image_quality = image_quality
-
- self.signal = CaptureImagesRequest.Signal()
-
-
-
-class LiveViewImage(NamedTuple):
- image: Image.Image
-
-class CameraStates:
- class Waiting:
- pass
-
- class Found:
- def __init__(self, camera_name: str):
- super().__init__()
- self.camera_name = camera_name
-
- class Disconnected:
- def __init__(self, camera_name: str, auto_reconnect: bool = True):
- super().__init__()
- self.auto_reconnect = auto_reconnect
- self.camera_name = camera_name
-
- class Connecting:
- def __init__(self, camera_name: str):
- self.camera_name = camera_name
-
- class Disconnecting:
- pass
-
- class Ready:
- def __init__(self, camera_name: str):
- super().__init__()
- self.camera_name = camera_name
-
- class LiveViewStarted(NamedTuple):
- current_lightmeter_value: int
-
- class LiveViewActive:
- pass
-
- class FocusStarted:
- pass
-
- class FocusFinished(NamedTuple):
- success: bool
-
- class LiveViewStopped:
- pass
-
- class CaptureInProgress:
- def __init__(self, capture_request: CaptureImagesRequest, num_captured: int):
- super().__init__()
- self.num_captured = num_captured
- self.capture_request = capture_request
-
- class CaptureFinished:
- def __init__(self, capture_request: CaptureImagesRequest, elapsed_time: int, num_captured: int):
- super().__init__()
- self.num_captured = num_captured
- self.elapsed_time = elapsed_time
- self.capture_request = capture_request
-
- class CaptureCancelling:
- pass
-
- class CaptureCanceled:
- def __init__(self, capture_request: CaptureImagesRequest, elapsed_time: int):
- super().__init__()
- self.elapsed_time = elapsed_time
- self.capture_request = capture_request
-
- class CaptureError:
- def __init__(self, capture_request: CaptureImagesRequest, error: str):
- super().__init__()
- self.capture_request = capture_request
- self.error = error
-
- class IOError:
- def __init__(self, error: str):
- super().__init__()
- self.error = error
-
- class ConnectionError:
- def __init__(self, error: gp.GPhoto2Error):
- super().__init__()
- self.error = error
-
- StateType = Union[
- Waiting, Found, Disconnected, Connecting, Disconnecting, Ready, CaptureInProgress,
- CaptureFinished, CaptureCanceled, CaptureCancelling, CaptureError, IOError, ConnectionError, LiveViewStarted, LiveViewStopped,
- LiveViewActive, FocusStarted, FocusFinished
- ]
-
-
-class CameraCommands(QObject):
- capture_images = pyqtSignal(CaptureImagesRequest)
- find_camera = pyqtSignal()
- connect_camera = pyqtSignal()
- disconnect_camera = pyqtSignal()
- reconnect_camera = pyqtSignal()
- set_config = pyqtSignal(gp.CameraWidget)
- set_single_config = pyqtSignal(str, str)
- cancel = pyqtSignal()
- live_view = pyqtSignal(bool)
- trigger_autofocus = pyqtSignal()
- get_config = pyqtSignal(ConfigRequest)
-
-
-class PropertyChangeEvent(NamedTuple):
- property: str
- property_name: str
- value: str | float
-
-
-class CameraEvents(QObject):
- config_updated = pyqtSignal(gp.CameraWidget)
-
-
-class CameraWorker(QObject):
- initialized = pyqtSignal()
- state_changed = pyqtSignal(object)
- property_changed = pyqtSignal(PropertyChangeEvent)
- preview_image = pyqtSignal(LiveViewImage)
-
- def __init__(self, parent=None):
- super(CameraWorker, self).__init__(parent)
-
- self.__logger = logging.getLogger(self.__class__.__name__)
-
- self.__logging_callback_extract_gp2_error = None
- self.__state = None
- self.commands = CameraCommands()
- self.events = CameraEvents()
-
- self.camera: gp.Camera = None
- self.camera_name: str = None
-
- self.filesCounter = 0
- self.captureComplete = False
-
- self.shouldCancel = False
- self.liveView = False
- self.timer: QTimer = None
-
- self.__last_ptp_error: NikonPTPError = None
-
- def initialize(self):
- self.__logger.info("Init Camera Worker")
- self.timer = QTimer()
-
- self.commands.capture_images.connect(self.captureImages)
- self.commands.find_camera.connect(self.__find_camera)
- self.commands.connect_camera.connect(self.__connect_camera)
- self.commands.disconnect_camera.connect(lambda: self.__disconnect_camera(False))
- self.commands.reconnect_camera.connect(lambda: self.__disconnect_camera(True))
- self.commands.set_config.connect(self.__set_config)
- self.commands.set_single_config.connect(self.__set_single_config)
- self.commands.cancel.connect(self.__cancel)
- self.commands.live_view.connect(lambda active: self.__start_live_view() if active else self.__stop_live_view())
- self.commands.trigger_autofocus.connect(self.__trigger_autofocus)
- self.commands.get_config.connect(self.__get_config)
-
- self.__set_state(CameraStates.Waiting())
-
- self.initialized.emit()
-
- # self.__logging_callback_python_logging = gp.check_result(gp.use_python_logging(mapping={
- # gp.GP_LOG_ERROR: logging.INFO,
- # gp.GP_LOG_DEBUG: logging.DEBUG,
- # gp.GP_LOG_VERBOSE: logging.DEBUG - 3,
- # gp.GP_LOG_DATA: logging.DEBUG - 6}))
-
- self.__logging_callback_extract_gp2_error = gp.check_result(
- gp.gp_log_add_func(gp.GP_LOG_ERROR, self.__extract_gp2_error_from_log))
-
- def __extract_gp2_error_from_log(self, _level: int, domain: bytes, string: bytes, _data=None):
- error_str = string
- for ptp_error in NikonPTPError:
- error_suffix = "(%s)" % ptp_error.value
- if error_str.endswith(error_suffix):
- self.__last_ptp_error = ptp_error
- self.__logger.debug("PTP Error: {} {}".format(ptp_error, error_suffix))
-
- def __set_state(self, state: CameraStates.StateType):
- self.__state = state
- self.__logger.debug("Set camera state: " + state.__class__.__name__)
- self.state_changed.emit(state)
-
- @staticmethod
- def __handle_camera_error(func):
- def wrapper(self, *args, **kwargs):
- try:
- return func(self, *args, **kwargs)
- except gp.GPhoto2Error as err:
- self.__logger.exception("Camera Error {0}: {1}".format(err.code, err.string))
- self.__set_state(CameraStates.ConnectionError(error=err))
- self.__disconnect_camera()
-
- return wrapper
-
- def __find_camera(self):
- self.__set_state(CameraStates.Waiting())
- camera_list = None
- while not camera_list and not self.thread().isInterruptionRequested():
- self.__logger.info("Waiting for camera...")
- camera_list = list(gp.Camera.autodetect())
- sleep(1)
-
- name, _ = camera_list[0]
- self.__set_state(CameraStates.Found(camera_name=name))
-
- @__handle_camera_error
- def __connect_camera(self):
- self.__set_state(CameraStates.Connecting(self.camera_name))
- self.camera = gp.Camera()
- self.__last_ptp_error = None
-
- self.camera.init()
- self.empty_event_queue(1000)
- with self.__open_config("read") as cfg:
- self.events.config_updated.emit(cfg)
- self.camera_name = "%s %s" % (
- cfg.get_child_by_name("manufacturer").get_value(),
- cfg.get_child_by_name("cameramodel").get_value()
- )
- self.__set_state(CameraStates.Ready(self.camera_name))
-
- while self.camera:
- if self.thread().isInterruptionRequested():
- self.__disconnect_camera()
- self.thread().exit()
- return
-
- try:
- if isinstance(self.__state, CameraStates.LiveViewActive):
- self.__live_view_capture_preview()
- # self.empty_event_queue(1)
- self.thread().msleep(50)
- else:
- self.empty_event_queue(1)
- finally:
- QApplication.processEvents()
-
- def __disconnect_camera(self, auto_reconnect=True):
- self.__set_state(CameraStates.Disconnecting())
-
- # Ignore errors while disconnecting
- try:
- self.camera.exit()
- except gp.GPhoto2Error:
- pass
- except AttributeError: # Camera already gone
- pass
-
- self.__set_state(CameraStates.Disconnected(camera_name=self.camera_name, auto_reconnect=auto_reconnect))
- self.camera = None
- self.camera_name = None
-
- @__handle_camera_error
- def __set_single_config(self, name, value):
- self.__logger.info("Set config %s to %s" % (name, value))
- with self.__open_config("write") as cfg:
- cfg_widget = cfg.get_child_by_name(name)
- cfg_widget.set_value(value)
-
- self.empty_event_queue()
- self.__emit_current_config()
-
- @__handle_camera_error
- def __set_config(self, cfg: gp.CameraWidget):
- self.camera.set_config(cfg)
-
- @__handle_camera_error
- def __get_config(self, req: ConfigRequest):
- with self.__open_config("read") as cfg:
- req.signal.got_config.emit(cfg)
-
- def __emit_current_config(self):
- with self.__open_config("read") as cfg:
- self.events.config_updated.emit(cfg)
-
- def __cancel(self):
- self.shouldCancel = True
- self.__set_state(CameraStates.CaptureCancelling())
-
- def empty_event_queue(self, timeout=100):
- event_type, data = self.camera.wait_for_event(timeout)
-
- while event_type != gp.GP_EVENT_TIMEOUT:
- self.__logger.debug("Event: %s, data: %s" % (EVENT_DESCRIPTIONS.get(event_type, "Unknown"), data))
- QApplication.processEvents()
-
- if event_type == gp.GP_EVENT_FILE_ADDED:
- cam_file_path = os.path.join(data.folder, data.name)
- self.__logger.info("New file: %s" % cam_file_path)
- basename, extension = os.path.splitext(data.name)
-
- if isinstance(self.__state, CameraStates.CaptureInProgress) and not self.shouldCancel and not self.thread().isInterruptionRequested():
- tpl = Template(self.__state.capture_request.file_path_template)
- file_target_path = tpl.substitute(
- basename=basename,
- extension=extension,
- num=str(self.filesCounter + 1).zfill(3)
- )
- cam_file = self.camera.file_get(
- data.folder, data.name, gp.GP_FILE_TYPE_NORMAL)
- self.__logger.info("Saving to %s" % file_target_path)
- cam_file.save(file_target_path)
-
- current_capture_req = self.__state.capture_request
- current_capture_req.signal.file_received.emit(file_target_path)
- if self.filesCounter % current_capture_req.expect_files == 0:
- num_captured = int(self.filesCounter / current_capture_req.expect_files)
- self.__set_state(CameraStates.CaptureInProgress(self.__state.capture_request, num_captured))
-
- self.filesCounter += 1
- else:
- self.__logger.warning(
- "Received file but capture not in progress, ignoring. State: " + self.__state.__class__.__name__)
-
- self.camera.file_delete(data.folder, data.name)
-
- elif event_type == gp.GP_EVENT_CAPTURE_COMPLETE:
- self.captureComplete = True
-
- elif event_type == gp.GP_EVENT_UNKNOWN:
- match = re.search(r'PTP Property (\w+) changed, "(\w+)" to "(-?\d+[,\.]\d+)"', data)
- if match:
- property = match.group(1)
- property_name = match.group(2)
- value_str = match.group(3)
- try:
- value = float(value_str.replace(',', '.'))
- except ValueError:
- value = value_str
- self.property_changed.emit(PropertyChangeEvent(property=property, property_name=property_name, value=value))
- # print(f"Property '{property_name}' changed to {value}")
-
- # try to grab another event
- event_type, data = self.camera.wait_for_event(1)
-
- @__handle_camera_error
- def __start_live_view(self):
- lightmeter: int
- with self.__open_config("write") as cfg:
- self.__try_set_config(cfg, "capturetarget", "Internal RAM")
- self.__try_set_config(cfg, "recordingmedia", "SDRAM")
- self.__try_set_config(cfg, "viewfinder", 1)
- self.__try_set_config(cfg, "liveviewsize", "VGA")
- lightmeter = cfg.get_child_by_name("lightmeter").get_value()
- self.__set_state(CameraStates.LiveViewStarted(current_lightmeter_value=lightmeter))
- self.__set_state(CameraStates.LiveViewActive())
-
- @__handle_camera_error
- def __live_view_capture_preview(self):
- lightmeter = None
- try:
- camera_file = self.camera.capture_preview()
- file_data = camera_file.get_data_and_size()
- image = Image.open(io.BytesIO(file_data))
-
- self.empty_event_queue(1)
- self.preview_image.emit(LiveViewImage(image=image))
-
- #
- # self.preview_image.emit(LiveViewImage(image=image, lightmeter_value=lightmeter))
- except gp.GPhoto2Error:
- self.__stop_live_view()
-
- @__handle_camera_error
- def __stop_live_view(self):
- if isinstance(self.__state, CameraStates.LiveViewActive):
- with self.__open_config("write") as cfg:
- self.__try_set_config(cfg, "viewfinder", 0)
- self.__try_set_config(cfg, "autofocusdrive", 0)
- self.__set_state(CameraStates.LiveViewStopped())
- self.__set_state(CameraStates.Ready(self.camera_name))
-
- @__handle_camera_error
- def __trigger_autofocus(self):
- lightmeter = None
- self.__set_state(CameraStates.FocusStarted())
- try:
- with self.__open_config("write") as cfg:
- lightmeter = cfg.get_child_by_name("lightmeter").get_value()
- self.__try_set_config(cfg, "autofocusdrive", 1)
- self.__set_state(CameraStates.FocusFinished(success=True))
- except gp.GPhoto2Error:
- if self.__last_ptp_error == NikonPTPError.OutOfFocus:
- self.__logger.warning("Could not get focus (light: %s)." % lightmeter)
- self.__set_state(CameraStates.FocusFinished(success=False))
- else:
- raise
- finally:
- with self.__open_config("write") as cfg:
- self.__try_set_config(cfg, "autofocusdrive", 0)
- self.__set_state(CameraStates.LiveViewActive())
- # TODO handle general camera error
-
- def captureImages(self, capture_req: CaptureImagesRequest):
- if isinstance(self.__state, CameraStates.LiveViewActive):
- self.__stop_live_view()
-
- self.__logger.info("Start capture (%s)", str(capture_req))
-
- timer = QElapsedTimer()
- try:
- timer.start()
- self.empty_event_queue()
- self.filesCounter = 0
- self.captureComplete = False
-
- self.__set_state(CameraStates.CaptureInProgress(capture_request=capture_req, num_captured=0))
-
- with self.__open_config("write") as cfg:
- self.__try_set_config(cfg, "capturetarget", "Internal RAM")
- self.__try_set_config(cfg, "recordingmedia", "SDRAM")
- self.__try_set_config(cfg, "viewfinder", 1)
- self.__try_set_config(cfg, "autofocusdrive", 0)
- self.__try_set_config(cfg, "focusmode", "Manual")
- self.__try_set_config(cfg, "focusmode2", "MF (fixed)")
- if capture_req.image_quality:
- self.__try_set_config(cfg, "imagequality", capture_req.image_quality)
-
- self.thread().sleep(1)
-
- remaining = capture_req.num_images * capture_req.expect_files
- while remaining > 0 and not self.shouldCancel and not self.thread().isInterruptionRequested():
- burst = min(capture_req.max_burst, int(remaining / capture_req.expect_files))
- if not capture_req.manual_trigger:
- with self.__open_config("write") as cfg:
- self.__try_set_config(cfg, "burstnumber", burst)
-
- self.captureComplete = False
- if not capture_req.manual_trigger:
- self.camera.trigger_capture()
- while not self.captureComplete:
- self.empty_event_queue(timeout=100)
- QApplication.processEvents()
-
- remaining = capture_req.num_images * capture_req.expect_files - self.filesCounter
- self.__logger.info("Curr. files: {0} (remaining: {1}).".format(self.filesCounter, remaining))
-
- if not capture_req.manual_trigger:
- with self.__open_config("write") as cfg:
- self.__try_set_config(cfg, "burstnumber", burst)
-
- self.__logger.info("No. Files captured: {0} (took {1}).".format(self.filesCounter, timer.elapsed()))
-
- if not self.shouldCancel:
- num_captured = int(self.filesCounter / capture_req.expect_files)
- self.__set_state(
- CameraStates.CaptureFinished(capture_req, elapsed_time=timer.elapsed(), num_captured=num_captured))
- else:
- self.__logger.info("Capture cancelled")
- self.__set_state(CameraStates.CaptureCanceled(capture_req, elapsed_time=timer.elapsed()))
-
- except gp.GPhoto2Error as err:
- self.__set_state(CameraStates.CaptureError(capture_req, err.string))
- finally:
- self.shouldCancel = False
- # If camera is still there, try to reset Camera to a default state
- if self.camera:
- try:
- with self.__open_config("write") as cfg:
- # TODO: enable again when trigger works
- self.__try_set_config(cfg, "viewfinder", 0)
- if not capture_req.manual_trigger:
- self.__try_set_config(cfg, "burstnumber", 1)
- self.empty_event_queue()
- self.__set_state(CameraStates.Ready(self.camera_name))
- except gp.GPhoto2Error as err:
- self.__set_state(CameraStates.ConnectionError(err.string))
-
- @contextmanager
- def __open_config(self, mode: Literal["read", "write"]) -> Generator[CameraWidget, None, None]:
- cfg: CameraWidget = None
- try:
- cfg = self.camera.get_config()
- yield cfg
- finally:
- if mode == "write":
- self.camera.set_config(cfg)
- elif not mode == "read":
- raise Exception("Invalid cfg open mode: %s" % mode)
-
- def __try_set_config(self, config: CameraWidget, name: str, value) -> None:
- try:
- config_widget = config.get_child_by_name(name)
- config_widget.set_value(value)
- self.__logger.info("Set config '%s' to %s." % (name, str(value)))
- except gp.GPhoto2Error:
- self.__logger.error("Config '%s' not supported by camera." % name)
-
- def __del__(self):
- self.__disconnect_camera()
+import io
+import logging
+import os
+import re
+from contextlib import contextmanager
+from enum import Enum
+from string import Template
+from time import sleep
+from typing import NamedTuple, Literal, Generator, Union
+
+import gphoto2 as gp
+from PIL import Image
+from PyQt6.QtCore import pyqtSignal, QObject, QElapsedTimer, QTimer, Qt, pyqtSlot
+from PyQt6.QtWidgets import QApplication
+from gphoto2 import CameraWidget
+
+EVENT_DESCRIPTIONS = {
+ gp.GP_EVENT_UNKNOWN: "Unknown",
+ gp.GP_EVENT_CAPTURE_COMPLETE: "Capture Complete",
+ gp.GP_EVENT_FILE_ADDED: "File Added",
+ gp.GP_EVENT_FOLDER_ADDED: "Folder Added",
+ gp.GP_EVENT_TIMEOUT: "Timeout"
+}
+
+class NikonPTPError(Enum):
+ OutOfFocus = "0xa002"
+
+class ConfigRequest():
+ class Signal(QObject):
+ got_config = pyqtSignal(gp.CameraWidget)
+
+ def __init__(self):
+ self.signal = ConfigRequest.Signal()
+
+
+class CaptureImagesRequest():
+ class Signal(QObject):
+ file_received = pyqtSignal(str)
+
+ file_path_template: str
+ num_images: int
+ expect_files: int = 1
+ max_burst: int = 1
+ skip: int = 0
+ manual_trigger: bool = False
+ image_quality: str | None = None
+
+ def __init__(self, file_path_template, num_images, expect_files = 1, max_burst = 1, skip = 0, manual_trigger = False, image_quality = None):
+ self.file_path_template = file_path_template
+ self.num_images = num_images
+ self.expect_files = expect_files
+ self.max_burst = max_burst
+ self.skip = skip
+ self.manual_trigger = manual_trigger
+ self.image_quality = image_quality
+
+ self.signal = CaptureImagesRequest.Signal()
+
+
+
+class LiveViewImage(NamedTuple):
+ image: Image.Image
+
+class CameraStates:
+ class Waiting:
+ pass
+
+ class Found:
+ def __init__(self, camera_name: str):
+ super().__init__()
+ self.camera_name = camera_name
+
+ class Disconnected:
+ def __init__(self, camera_name: str, auto_reconnect: bool = True):
+ super().__init__()
+ self.auto_reconnect = auto_reconnect
+ self.camera_name = camera_name
+
+ class Connecting:
+ def __init__(self, camera_name: str):
+ self.camera_name = camera_name
+
+ class Disconnecting:
+ pass
+
+ class Ready:
+ def __init__(self, camera_name: str):
+ super().__init__()
+ self.camera_name = camera_name
+
+ class LiveViewStarted(NamedTuple):
+ current_lightmeter_value: int
+
+ class LiveViewActive:
+ pass
+
+ class FocusStarted:
+ pass
+
+ class FocusFinished(NamedTuple):
+ success: bool
+
+ class LiveViewStopped:
+ pass
+
+ class CaptureInProgress:
+ def __init__(self, capture_request: CaptureImagesRequest, num_captured: int):
+ super().__init__()
+ self.num_captured = num_captured
+ self.capture_request = capture_request
+
+ class CaptureFinished:
+ def __init__(self, capture_request: CaptureImagesRequest, elapsed_time: int, num_captured: int):
+ super().__init__()
+ self.num_captured = num_captured
+ self.elapsed_time = elapsed_time
+ self.capture_request = capture_request
+
+ class CaptureCancelling:
+ pass
+
+ class CaptureCanceled:
+ def __init__(self, capture_request: CaptureImagesRequest, elapsed_time: int):
+ super().__init__()
+ self.elapsed_time = elapsed_time
+ self.capture_request = capture_request
+
+ class CaptureError:
+ def __init__(self, capture_request: CaptureImagesRequest, error: str):
+ super().__init__()
+ self.capture_request = capture_request
+ self.error = error
+
+ class IOError:
+ def __init__(self, error: str):
+ super().__init__()
+ self.error = error
+
+ class ConnectionError:
+ def __init__(self, error: gp.GPhoto2Error):
+ super().__init__()
+ self.error = error
+
+ StateType = Union[
+ Waiting, Found, Disconnected, Connecting, Disconnecting, Ready, CaptureInProgress,
+ CaptureFinished, CaptureCanceled, CaptureCancelling, CaptureError, IOError, ConnectionError, LiveViewStarted, LiveViewStopped,
+ LiveViewActive, FocusStarted, FocusFinished
+ ]
+
+
+class CameraCommands(QObject):
+ capture_images = pyqtSignal(CaptureImagesRequest)
+ find_camera = pyqtSignal()
+ connect_camera = pyqtSignal()
+ disconnect_camera = pyqtSignal()
+ reconnect_camera = pyqtSignal()
+ set_config = pyqtSignal(gp.CameraWidget)
+ set_single_config = pyqtSignal(str, str)
+ cancel = pyqtSignal()
+ live_view = pyqtSignal(bool)
+ trigger_autofocus = pyqtSignal()
+ get_config = pyqtSignal(ConfigRequest)
+
+
+class PropertyChangeEvent(NamedTuple):
+ property: str
+ property_name: str
+ value: str | float
+
+
+class CameraEvents(QObject):
+ config_updated = pyqtSignal(gp.CameraWidget)
+
+
+class CameraWorker(QObject):
+ initialized = pyqtSignal()
+ state_changed = pyqtSignal(object)
+ property_changed = pyqtSignal(PropertyChangeEvent)
+ preview_image = pyqtSignal(LiveViewImage)
+
+ def __init__(self, parent=None):
+ super(CameraWorker, self).__init__(parent)
+
+ self.__logger = logging.getLogger(self.__class__.__name__)
+
+ self.__logging_callback_extract_gp2_error = None
+ self.__state = None
+ self.commands = CameraCommands()
+ self.events = CameraEvents()
+
+ self.camera: gp.Camera = None
+ self.camera_name: str = None
+
+ self.filesCounter = 0
+ self.captureComplete = False
+
+ self.shouldCancel = False
+ self.liveView = False
+ self.timer: QTimer = None
+
+ self.__last_ptp_error: NikonPTPError = None
+
+ def initialize(self):
+ self.__logger.info("Init Camera Worker")
+ self.timer = QTimer()
+
+ self.commands.capture_images.connect(self.captureImages)
+ self.commands.find_camera.connect(self.__find_camera)
+ self.commands.connect_camera.connect(self.__connect_camera)
+ self.commands.disconnect_camera.connect(lambda: self.__disconnect_camera(False))
+ self.commands.reconnect_camera.connect(lambda: self.__disconnect_camera(True))
+ self.commands.set_config.connect(self.__set_config)
+ self.commands.set_single_config.connect(self.__set_single_config)
+ self.commands.cancel.connect(self.__cancel)
+ self.commands.live_view.connect(lambda active: self.__start_live_view() if active else self.__stop_live_view())
+ self.commands.trigger_autofocus.connect(self.__trigger_autofocus)
+ self.commands.get_config.connect(self.__get_config)
+
+ self.__set_state(CameraStates.Waiting())
+
+ self.initialized.emit()
+
+ # self.__logging_callback_python_logging = gp.check_result(gp.use_python_logging(mapping={
+ # gp.GP_LOG_ERROR: logging.INFO,
+ # gp.GP_LOG_DEBUG: logging.DEBUG,
+ # gp.GP_LOG_VERBOSE: logging.DEBUG - 3,
+ # gp.GP_LOG_DATA: logging.DEBUG - 6}))
+
+ self.__logging_callback_extract_gp2_error = gp.check_result(
+ gp.gp_log_add_func(gp.GP_LOG_ERROR, self.__extract_gp2_error_from_log))
+
+ def __extract_gp2_error_from_log(self, _level: int, domain: bytes, string: bytes, _data=None):
+ error_str = string
+ for ptp_error in NikonPTPError:
+ error_suffix = "(%s)" % ptp_error.value
+ if error_str.endswith(error_suffix):
+ self.__last_ptp_error = ptp_error
+ self.__logger.debug("PTP Error: {} {}".format(ptp_error, error_suffix))
+
+ def __set_state(self, state: CameraStates.StateType):
+ self.__state = state
+ self.__logger.debug("Set camera state: " + state.__class__.__name__)
+ self.state_changed.emit(state)
+
+ @staticmethod
+ def __handle_camera_error(func):
+ def wrapper(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ except gp.GPhoto2Error as err:
+ self.__logger.exception("Camera Error {0}: {1}".format(err.code, err.string))
+ self.__set_state(CameraStates.ConnectionError(error=err))
+ self.__disconnect_camera()
+
+ return wrapper
+
+ def __find_camera(self):
+ self.__set_state(CameraStates.Waiting())
+ camera_list = None
+ while not camera_list and not self.thread().isInterruptionRequested():
+ self.__logger.info("Waiting for camera...")
+ camera_list = list(gp.Camera.autodetect())
+ sleep(1)
+
+ name, _ = camera_list[0]
+ self.__set_state(CameraStates.Found(camera_name=name))
+
+ @__handle_camera_error
+ def __connect_camera(self):
+ self.__set_state(CameraStates.Connecting(self.camera_name))
+ self.camera = gp.Camera()
+ self.__last_ptp_error = None
+
+ self.camera.init()
+ self.empty_event_queue(1000)
+ with self.__open_config("read") as cfg:
+ self.events.config_updated.emit(cfg)
+ self.camera_name = "%s %s" % (
+ cfg.get_child_by_name("manufacturer").get_value(),
+ cfg.get_child_by_name("cameramodel").get_value()
+ )
+ self.__set_state(CameraStates.Ready(self.camera_name))
+
+ while self.camera:
+ if self.thread().isInterruptionRequested():
+ self.__disconnect_camera()
+ self.thread().exit()
+ return
+
+ try:
+ if isinstance(self.__state, CameraStates.LiveViewActive):
+ self.__live_view_capture_preview()
+ # self.empty_event_queue(1)
+ self.thread().msleep(50)
+ else:
+ self.empty_event_queue(1)
+ finally:
+ QApplication.processEvents()
+
+ def __disconnect_camera(self, auto_reconnect=True):
+ self.__set_state(CameraStates.Disconnecting())
+
+ # Ignore errors while disconnecting
+ try:
+ self.camera.exit()
+ except gp.GPhoto2Error:
+ pass
+ except AttributeError: # Camera already gone
+ pass
+
+ self.__set_state(CameraStates.Disconnected(camera_name=self.camera_name, auto_reconnect=auto_reconnect))
+ self.camera = None
+ self.camera_name = None
+
+ @__handle_camera_error
+ def __set_single_config(self, name, value):
+ self.__logger.info("Set config %s to %s" % (name, value))
+ with self.__open_config("write") as cfg:
+ cfg_widget = cfg.get_child_by_name(name)
+ cfg_widget.set_value(value)
+
+ self.empty_event_queue()
+ self.__emit_current_config()
+
+ @__handle_camera_error
+ def __set_config(self, cfg: gp.CameraWidget):
+ self.camera.set_config(cfg)
+
+ @__handle_camera_error
+ def __get_config(self, req: ConfigRequest):
+ with self.__open_config("read") as cfg:
+ req.signal.got_config.emit(cfg)
+
+ def __emit_current_config(self):
+ with self.__open_config("read") as cfg:
+ self.events.config_updated.emit(cfg)
+
+ def __cancel(self):
+ self.shouldCancel = True
+ self.__set_state(CameraStates.CaptureCancelling())
+
+ def empty_event_queue(self, timeout=100):
+ event_type, data = self.camera.wait_for_event(timeout)
+
+ while event_type != gp.GP_EVENT_TIMEOUT:
+ self.__logger.debug("Event: %s, data: %s" % (EVENT_DESCRIPTIONS.get(event_type, "Unknown"), data))
+ QApplication.processEvents()
+
+ if event_type == gp.GP_EVENT_FILE_ADDED:
+ cam_file_path = os.path.join(data.folder, data.name)
+ self.__logger.info("New file: %s" % cam_file_path)
+ basename, extension = os.path.splitext(data.name)
+
+ if isinstance(self.__state, CameraStates.CaptureInProgress) and not self.shouldCancel and not self.thread().isInterruptionRequested():
+ tpl = Template(self.__state.capture_request.file_path_template)
+ file_target_path = tpl.substitute(
+ basename=basename,
+ extension=extension,
+ num=str(self.filesCounter + 1).zfill(3)
+ )
+ cam_file = self.camera.file_get(
+ data.folder, data.name, gp.GP_FILE_TYPE_NORMAL)
+ self.__logger.info("Saving to %s" % file_target_path)
+ cam_file.save(file_target_path)
+
+ current_capture_req = self.__state.capture_request
+ current_capture_req.signal.file_received.emit(file_target_path)
+ if self.filesCounter % current_capture_req.expect_files == 0:
+ num_captured = int(self.filesCounter / current_capture_req.expect_files)
+ self.__set_state(CameraStates.CaptureInProgress(self.__state.capture_request, num_captured))
+
+ self.filesCounter += 1
+ else:
+ self.__logger.warning(
+ "Received file but capture not in progress, ignoring. State: " + self.__state.__class__.__name__)
+
+ self.camera.file_delete(data.folder, data.name)
+
+ elif event_type == gp.GP_EVENT_CAPTURE_COMPLETE:
+ self.captureComplete = True
+
+ elif event_type == gp.GP_EVENT_UNKNOWN:
+ match = re.search(r'PTP Property (\w+) changed, "(\w+)" to "(-?\d+[,\.]\d+)"', data)
+ if match:
+ property = match.group(1)
+ property_name = match.group(2)
+ value_str = match.group(3)
+ try:
+ value = float(value_str.replace(',', '.'))
+ except ValueError:
+ value = value_str
+ self.property_changed.emit(PropertyChangeEvent(property=property, property_name=property_name, value=value))
+ # print(f"Property '{property_name}' changed to {value}")
+
+ # try to grab another event
+ event_type, data = self.camera.wait_for_event(1)
+
+ @__handle_camera_error
+ def __start_live_view(self):
+ lightmeter: int
+ with self.__open_config("write") as cfg:
+ self.__try_set_config(cfg, "capturetarget", "Internal RAM")
+ self.__try_set_config(cfg, "recordingmedia", "SDRAM")
+ self.__try_set_config(cfg, "viewfinder", 1)
+ self.__try_set_config(cfg, "liveviewsize", "VGA")
+ lightmeter = cfg.get_child_by_name("lightmeter").get_value()
+ self.__set_state(CameraStates.LiveViewStarted(current_lightmeter_value=lightmeter))
+ self.__set_state(CameraStates.LiveViewActive())
+
+ @__handle_camera_error
+ def __live_view_capture_preview(self):
+ lightmeter = None
+ try:
+ camera_file = self.camera.capture_preview()
+ file_data = camera_file.get_data_and_size()
+ image = Image.open(io.BytesIO(file_data))
+
+ self.empty_event_queue(1)
+ self.preview_image.emit(LiveViewImage(image=image))
+
+ #
+ # self.preview_image.emit(LiveViewImage(image=image, lightmeter_value=lightmeter))
+ except gp.GPhoto2Error:
+ self.__stop_live_view()
+
+ @__handle_camera_error
+ def __stop_live_view(self):
+ if isinstance(self.__state, CameraStates.LiveViewActive):
+ with self.__open_config("write") as cfg:
+ self.__try_set_config(cfg, "viewfinder", 0)
+ self.__try_set_config(cfg, "autofocusdrive", 0)
+ self.__set_state(CameraStates.LiveViewStopped())
+ self.__set_state(CameraStates.Ready(self.camera_name))
+
+ @__handle_camera_error
+ def __trigger_autofocus(self):
+ lightmeter = None
+ self.__set_state(CameraStates.FocusStarted())
+ try:
+ with self.__open_config("write") as cfg:
+ lightmeter = cfg.get_child_by_name("lightmeter").get_value()
+ self.__try_set_config(cfg, "autofocusdrive", 1)
+ self.__set_state(CameraStates.FocusFinished(success=True))
+ except gp.GPhoto2Error:
+ if self.__last_ptp_error == NikonPTPError.OutOfFocus:
+ self.__logger.warning("Could not get focus (light: %s)." % lightmeter)
+ self.__set_state(CameraStates.FocusFinished(success=False))
+ else:
+ raise
+ finally:
+ with self.__open_config("write") as cfg:
+ self.__try_set_config(cfg, "autofocusdrive", 0)
+ self.__set_state(CameraStates.LiveViewActive())
+ # TODO handle general camera error
+
+ def captureImages(self, capture_req: CaptureImagesRequest):
+ if isinstance(self.__state, CameraStates.LiveViewActive):
+ self.__stop_live_view()
+
+ self.__logger.info("Start capture (%s)", str(capture_req))
+
+ timer = QElapsedTimer()
+ try:
+ timer.start()
+ self.empty_event_queue()
+ self.filesCounter = 0
+ self.captureComplete = False
+
+ self.__set_state(CameraStates.CaptureInProgress(capture_request=capture_req, num_captured=0))
+
+ with self.__open_config("write") as cfg:
+ self.__try_set_config(cfg, "capturetarget", "Internal RAM")
+ self.__try_set_config(cfg, "recordingmedia", "SDRAM")
+ self.__try_set_config(cfg, "viewfinder", 1)
+ self.__try_set_config(cfg, "autofocusdrive", 0)
+ self.__try_set_config(cfg, "focusmode", "Manual")
+ self.__try_set_config(cfg, "focusmode2", "MF (fixed)")
+ if capture_req.image_quality:
+ self.__try_set_config(cfg, "imagequality", capture_req.image_quality)
+
+ self.thread().sleep(1)
+
+ remaining = capture_req.num_images * capture_req.expect_files
+ while remaining > 0 and not self.shouldCancel and not self.thread().isInterruptionRequested():
+ burst = min(capture_req.max_burst, int(remaining / capture_req.expect_files))
+ if not capture_req.manual_trigger:
+ with self.__open_config("write") as cfg:
+ self.__try_set_config(cfg, "burstnumber", burst)
+
+ self.captureComplete = False
+ if not capture_req.manual_trigger:
+ self.camera.trigger_capture()
+ while not self.captureComplete:
+ self.empty_event_queue(timeout=100)
+ QApplication.processEvents()
+
+ remaining = capture_req.num_images * capture_req.expect_files - self.filesCounter
+ self.__logger.info("Curr. files: {0} (remaining: {1}).".format(self.filesCounter, remaining))
+
+ if not capture_req.manual_trigger:
+ with self.__open_config("write") as cfg:
+ self.__try_set_config(cfg, "burstnumber", burst)
+
+ self.__logger.info("No. Files captured: {0} (took {1}).".format(self.filesCounter, timer.elapsed()))
+
+ if not self.shouldCancel:
+ num_captured = int(self.filesCounter / capture_req.expect_files)
+ self.__set_state(
+ CameraStates.CaptureFinished(capture_req, elapsed_time=timer.elapsed(), num_captured=num_captured))
+ else:
+ self.__logger.info("Capture cancelled")
+ self.__set_state(CameraStates.CaptureCanceled(capture_req, elapsed_time=timer.elapsed()))
+
+ except gp.GPhoto2Error as err:
+ self.__set_state(CameraStates.CaptureError(capture_req, err.string))
+ finally:
+ self.shouldCancel = False
+ # If camera is still there, try to reset Camera to a default state
+ if self.camera:
+ try:
+ with self.__open_config("write") as cfg:
+ # TODO: enable again when trigger works
+ self.__try_set_config(cfg, "viewfinder", 0)
+ if not capture_req.manual_trigger:
+ self.__try_set_config(cfg, "burstnumber", 1)
+ self.empty_event_queue()
+ self.__set_state(CameraStates.Ready(self.camera_name))
+ except gp.GPhoto2Error as err:
+ self.__set_state(CameraStates.ConnectionError(err.string))
+
+ @contextmanager
+ def __open_config(self, mode: Literal["read", "write"]) -> Generator[CameraWidget, None, None]:
+ cfg: CameraWidget = None
+ try:
+ cfg = self.camera.get_config()
+ yield cfg
+ finally:
+ if mode == "write":
+ self.camera.set_config(cfg)
+ elif not mode == "read":
+ raise Exception("Invalid cfg open mode: %s" % mode)
+
+ def __try_set_config(self, config: CameraWidget, name: str, value) -> None:
+ try:
+ config_widget = config.get_child_by_name(name)
+ config_widget.set_value(value)
+ self.__logger.info("Set config '%s' to %s." % (name, str(value)))
+ except gp.GPhoto2Error:
+ self.__logger.error("Config '%s' not supported by camera." % name)
+
+ def __del__(self):
+ self.__disconnect_camera()
diff --git a/helpers.py b/helpers.py
new file mode 100644
index 0000000..b407e79
--- /dev/null
+++ b/helpers.py
@@ -0,0 +1,9 @@
+from os import path
+import sys
+
+def get_ui_path(file: str):
+ if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
+ bundle_dir = sys._MEIPASS
+ else:
+ bundle_dir = path.abspath(path.dirname("__FILE__"))
+ return path.join(bundle_dir, file)
\ No newline at end of file
diff --git a/main.py b/main.py
index 7f2f782..05400fb 100755
--- a/main.py
+++ b/main.py
@@ -1,918 +1,919 @@
-import asyncio.exceptions
-import json
-import logging
-import os
-import sys
-import threading
-from enum import Enum
-from pathlib import Path
-
-import gphoto2 as gp
-import qasync
-from qasync import QEventLoop, QThreadExecutor
-from PIL.ImageQt import ImageQt
-from PyQt6.QtCore import QThread, QSettings, QStandardPaths, pyqtSignal, Qt
-from PyQt6.QtGui import QPixmap, QAction, QPixmapCache, QIcon, QColor, QCloseEvent, QBrush, QPainter, QCursor
-from PyQt6.QtWidgets import (
- QApplication, QMainWindow, QPushButton, QWidget, QFrame, QLineEdit,
- QComboBox, QLabel, QToolBox, QProgressBar, QMenu, QAbstractButton, QInputDialog, QMessageBox, QStyle, QDialog,
- QLCDNumber, QGraphicsView, QSizePolicy, QVBoxLayout
-)
-from PyQt6.uic import loadUi
-from send2trash import send2trash
-
-
-try:
- from bt_controller_controller import BtControllerController, BtControllerCommand, BtControllerRequest, BtControllerState
- BT_AVAILABLE = True
-except:
- BT_AVAILABLE = False
-
-from camera_worker import CameraWorker, CaptureImagesRequest, CameraStates, PropertyChangeEvent, ConfigRequest
-from open_session_dialog import OpenSessionDialog
-from photo_browser import PhotoBrowser
-from settings_dialog import SettingsDialog
-from spinner import Spinner
-from camera_config_dialog import CameraConfigDialog
-
-
-class Session:
- def __init__(self, name, working_dir):
- self.images_dir_loaded = False
- self.preview_dir_loaded = False
-
- self.name = name
- self.session_dir = os.path.join(working_dir, self.name)
- self.preview_dir = os.path.join(self.session_dir, "test")
- self.images_dir = os.path.join(self.session_dir, "images")
- self.preview_count = 0
-
-
-# Corresponds to itemIndex of the captureView QToolBox
-class CaptureMode(Enum):
- Preview = 0
- RTI = 1
-
-
-class RTICaptureMainWindow(QMainWindow):
- find_camera = pyqtSignal()
-
- def __init__(self, parent=None):
- super().__init__(parent)
-
- self.logger = logging.getLogger(self.__class__.__name__)
-
- self.camera_worker = CameraWorker()
- self.__session: Session = None
- self.camera_state: CameraStates.StateType = None
- self.cam_config_dialog: CameraConfigDialog = None
-
- # Set up UI and find controls
- loadUi('ui/main_window.ui', self)
- self.disconnect_camera_button: QPushButton = self.findChild(QPushButton, "disconnectCameraButton")
- self.connect_camera_button: QPushButton = self.findChild(QPushButton, "connectCameraButton")
- self.camera_busy_spinner: Spinner = self.findChild(QWidget, "cameraBusySpinner")
- self.camera_state_label: QLabel = self.findChild(QLabel, "cameraStateLabel")
- self.camera_state_icon: QLabel = self.findChild(QLabel, "cameraStateIcon")
-
- self.bluetooth_frame: QFrame = self.findChild(QFrame, "bluetoothFrame")
- self.bluetooth_state_icon: QLabel = self.findChild(QLabel, "bluetoothStateLabel")
- self.bluetooth_connecting_spinner: Spinner = self.findChild(QWidget, "bluetoothConnectingSpinner")
-
- self.session_controls: QWidget = self.findChild(QWidget, "sessionControls")
- self.session_name_edit: QLineEdit = self.findChild(QLineEdit, "sessionNameEdit")
- self.start_session_button: QPushButton = self.findChild(QPushButton, "startSessionButton")
- self.close_session_button: QPushButton = self.findChild(QPushButton, "closeSessionButton")
- self.session_loading_spinner: Spinner = self.findChild(QWidget, "sessionLoadingSpinner")
- self.session_menu_button: QAbstractButton = self.findChild(QWidget, "sessionMenuButton")
-
- self.live_view_controls: QWidget = self.findChild(QWidget, "liveViewControls")
- self.toggle_live_view_button: QPushButton = self.findChild(QPushButton, "toggleLiveViewButton")
- self.autofocus_button: QPushButton = self.findChild(QPushButton, "autofocusButton")
- self.light_lcd_number: QLCDNumber = self.findChild(QLCDNumber, "lightLCDNumber")
- self.light_lcd_frame: QFrame = self.findChild(QFrame, "lightLCDFrame")
- self.live_view_error_label: QLabel = self.findChild(QLabel, "liveviewErrorLabel")
-
- self.preview_led_select: QComboBox = self.findChild(QComboBox, "previewLedSelect")
- self.preview_led_frame: QFrame = self.findChild(QFrame, "previewLedFrame")
-
- self.capture_view: QToolBox = self.findChild(QToolBox, "captureView")
- self.rtiPage: QWidget = self.findChild(QWidget, "rtiPage")
- self.previewPage: QWidget = self.findChild(QWidget, "previewPage")
- self.previewImageBrowser: PhotoBrowser = self.findChild(QWidget, "previewImageBrowser")
- self.rtiImageBrowser: PhotoBrowser = self.findChild(QWidget, "rtiImageBrowser")
- self.capture_button: QPushButton = self.findChild(QPushButton, "captureButton")
- self.cancel_capture_button: QPushButton = self.findChild(QPushButton, "cancelCaptureButton")
-
- self.rti_progress_view: QWidget = self.findChild(QWidget, "rtiProgressView")
- self.capture_progress_bar: QProgressBar = self.findChild(QProgressBar, "captureProgressBar")
- self.capture_status_label: QLabel = self.findChild(QLabel, "captureStatusLabel")
-
- self.camera_controls: QFrame = self.findChild(QFrame, "cameraControls")
- self.camera_config_controls: QWidget = self.findChild(QWidget, "cameraConfigControls")
- self.f_number_select: QComboBox = self.findChild(QComboBox, "fNumberSelect")
- self.shutter_speed_select: QComboBox = self.findChild(QComboBox, "shutterSpeedSelect")
- self.crop_select: QComboBox = self.findChild(QComboBox, "cropSelect")
- self.iso_select: QComboBox = self.findChild(QComboBox, "isoSelect")
-
- self.settings_button: QPushButton = self.findChild(QPushButton, "settingsButton")
-
- self.session_menu = QMenu(self)
- self.open_session_action = QAction('Vorherige Sitzung öffnen...', self)
- self.open_session_action.triggered.connect(self.open_existing_session_directory)
- self.open_session_action.setIcon(QIcon("ui/open.svg"))
- self.rename_session_action = QAction('Sitzung umbenennen...', self)
- self.rename_session_action.triggered.connect(self.rename_current_session)
- self.rename_session_action.setIcon(QIcon("ui/rename.svg"))
- self.session_menu.addActions([self.open_session_action, self.rename_session_action])
-
- self.settings_menu = QMenu(self)
- self.open_program_settings_action = QAction('Allgemeine Einstellungen')
- self.open_program_settings_action.triggered.connect(self.open_settings)
- self.open_program_settings_action.setIcon(QIcon("ui/general_settings.svg"))
- self.open_advanced_cam_config_action = QAction('Erweiterte Kamerakonfiguration')
- self.open_advanced_cam_config_action.triggered.connect(self.open_advanced_capture_settings)
- self.open_advanced_cam_config_action.setIcon(QIcon("ui/cam_settings.svg"))
- self.settings_menu.addActions([self.open_program_settings_action, self.open_advanced_cam_config_action])
-
- self.mirror_graphics_view: QGraphicsView | None = None
- self.second_screen_window: QDialog | None = None
-
- self.session_name_edit.textChanged.connect(
- lambda text: self.start_session_button.setEnabled(
- True if len(text) > 0 else False
- ))
-
- self.cancel_capture_button.setVisible(False)
-
- for i in range(60):
- self.preview_led_select.addItem(str(i + 1), i)
-
- self.set_camera_connection_busy(True)
- self.capture_mode = CaptureMode.Preview
- self.set_session(None)
-
- self.camera_thread = QThread()
- self.camera_worker.moveToThread(self.camera_thread)
- self.camera_worker.state_changed.connect(self.set_camera_state)
- self.camera_worker.events.config_updated.connect(self.on_config_update)
- self.camera_worker.property_changed.connect(self.on_property_change)
- self.camera_worker.preview_image.connect(lambda image: self.previewImageBrowser.show_preview(ImageQt(image.image)))
- self.camera_worker.initialized.connect(lambda: self.camera_worker.commands.find_camera.emit())
- self.camera_thread.started.connect(self.camera_worker.initialize)
- self.camera_thread.start()
-
- self.bt_controller: BtControllerController | None = None
-
- self.update_ui_bluetooth()
-
- self.init_mirror_view()
- QApplication.instance().screenAdded.connect(self.reset_mirror_view)
- QApplication.instance().screenRemoved.connect(self.reset_mirror_view)
- QApplication.instance().primaryScreenChanged.connect(self.reset_mirror_view)
-
-
- def init_mirror_view(self):
- screens = QApplication.screens()
- mirror_view_enabled = QSettings().value("enableSecondScreenMirror", type=bool)
- if mirror_view_enabled and len(screens) > 1:
- second_screen = screens[1]
- self.second_screen_window = QDialog()
- self.second_screen_window.setWindowFlags(Qt.WindowType.WindowStaysOnTopHint | Qt.WindowType.FramelessWindowHint)
- self.second_screen_window.setWindowTitle("Secondary View")
- self.second_screen_window.setGeometry(second_screen.availableGeometry())
- self.second_screen_window.showFullScreen()
-
- self.mirror_graphics_view = QGraphicsView(self.second_screen_window)
- self.mirror_graphics_view.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
- self.mirror_graphics_view.setBackgroundBrush(QBrush(QColor(30, 30, 30)))
- self.mirror_graphics_view.setRenderHint(QPainter.RenderHint.Antialiasing)
- layout = QVBoxLayout(self.second_screen_window)
- layout.setContentsMargins(0, 0, 0, 0)
- layout.addWidget(self.mirror_graphics_view)
- self.second_screen_window.setLayout(layout)
-
- self.update_mirror_view()
-
- def disable_mirror_view(self):
- if self.second_screen_window:
- self.second_screen_window.close()
- self.second_screen_window = None
- self.mirror_graphics_view = None
-
- def reset_mirror_view(self):
- self.disable_mirror_view()
- self.init_mirror_view()
-
- async def init_bluetooth(self):
- if not self.bt_controller:
- self.bt_controller = BtControllerController()
- self.bt_controller.state_changed.connect(self.update_ui_bluetooth)
- await self.bt_controller.connect()
-
- @property
- def capture_mode(self) -> CaptureMode:
- return CaptureMode(self.capture_view.currentIndex())
-
- @capture_mode.setter
- def capture_mode(self, mode: CaptureMode):
- self.capture_view.setCurrentIndex(mode.value)
- self.update_ui()
-
- def get_camera_state(self):
- return self.camera_state
-
- def set_camera_state(self, state: CameraStates.StateType):
- self.logger.debug("Handle camera state:" + state.__class__.__name__)
- self.camera_state = state
- self.update_ui()
-
- match state:
- case CameraStates.Waiting():
- pass
-
- case CameraStates.Found():
- self.camera_worker.commands.connect_camera.emit()
-
- case CameraStates.Disconnected():
- if state.auto_reconnect:
- self.camera_worker.commands.find_camera.emit()
-
- case CameraStates.Connecting():
- pass
-
- case CameraStates.Disconnecting():
- if self.cam_config_dialog:
- self.cam_config_dialog.reject()
-
- case CameraStates.ConnectionError():
- self.logger.error(state.error)
-
- case CameraStates.Ready():
- pass
-
- case CameraStates.LiveViewStarted():
- if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED:
- request = BtControllerRequest(BtControllerCommand.PILOT_LIGHT_ON)
- request.signals.success.connect(lambda: print("BT Success!"))
- request.signals.error.connect(lambda e: logging.exception(e))
- self.bt_controller.send_command(request)
-
- case CameraStates.LiveViewStopped():
- if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED:
- request = BtControllerRequest(BtControllerCommand.LED_OFF)
- request.signals.success.connect(lambda: print("BT Success!"))
- request.signals.error.connect(lambda e: logging.exception(e))
- self.bt_controller.send_command(request)
-
- case CameraStates.Disconnecting():
- pass
-
- case CameraStates.CaptureInProgress():
- pass
-
- case CameraStates.CaptureFinished():
- if self.capture_mode == CaptureMode.Preview:
- self.session.preview_count += 1
- else:
- self.write_lp()
- self.dump_camera_config()
-
- case CameraStates.CaptureCanceled():
- pass
-
- def update_ui(self):
- # variables on which the UI state depends
- camera_state = self.camera_state
-
- has_session = self.session is not None
- session_loaded = has_session \
- and self.session.preview_dir_loaded \
- and self.session.images_dir_loaded
- capture_mode = self.capture_mode
-
- # configure UI according to the capture mode
- for item_index in range(self.capture_view.count()):
- if item_index == self.capture_mode.value:
- self.capture_view.setItemIcon(item_index, QIcon("ui/chevron_down.svg"))
- else:
- self.capture_view.setItemIcon(item_index, QIcon("ui/chevron_right.svg"))
-
-
-
-
- # configure UI according to the state of the current session
- self.session_name_edit.setEnabled(not has_session)
- self.start_session_button.setVisible(not has_session)
- self.open_session_action.setEnabled(not has_session)
- self.rename_session_action.setEnabled(session_loaded)
-
- self.close_session_button.setVisible(has_session)
- self.close_session_button.setText("Sitzung beenden" if session_loaded else "Laden abbrechen...")
-
- self.session_loading_spinner.isAnimated = has_session and not session_loaded
- self.capture_view.setEnabled(has_session)
-
- self.capture_progress_bar.setMaximum(60)
- self.capture_progress_bar.setValue(self.rtiImageBrowser.num_files() if session_loaded else 0)
-
- if has_session:
- self.session_name_edit.setText(self.session.name)
-
- # configure UI according to the camera state
- match camera_state:
- case CameraStates.Waiting():
- self.camera_state_label.setText("Suche Kamera...")
- self.camera_state_icon.setPixmap(QPixmap("ui/camera_waiting.png"))
- self.open_advanced_cam_config_action.setEnabled(False)
-
- self.connect_camera_button.setEnabled(False)
- self.disconnect_camera_button.setVisible(False)
- self.camera_busy_spinner.isAnimated = True
- self.capture_status_label.setText(None)
-
- self.live_view_controls.setEnabled(False)
- self.light_lcd_frame.setEnabled(False)
- self.light_lcd_number.display(None)
- self.live_view_error_label.setText(None)
-
- self.camera_controls.setEnabled(False)
- self.camera_config_controls.setEnabled(False)
- self.capture_button.setText("Nicht verbunden")
- self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
- self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
-
- case CameraStates.Found():
- pass
-
- case CameraStates.Disconnected():
- self.camera_state_label.setText("Kamera getrennt
%s" % camera_state.camera_name)
- self.camera_state_icon.setPixmap(QPixmap("ui/camera_not_ok.png"))
-
- self.connect_camera_button.setEnabled(True)
- self.connect_camera_button.setVisible(True)
- self.disconnect_camera_button.setVisible(False)
- self.camera_busy_spinner.isAnimated = False
-
- self.toggle_live_view_button.setChecked(False)
- self.autofocus_button.setEnabled(False)
-
-
- self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
- self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
-
- case CameraStates.Connecting():
- self.camera_state_label.setText("Verbinde...
%s" % camera_state.camera_name)
- self.connect_camera_button.setEnabled(False)
- self.camera_busy_spinner.isAnimated = True
-
- case CameraStates.ConnectionError():
- pass
-
- case CameraStates.Ready():
- self.camera_state_label.setText("Kamera verbunden
%s" % camera_state.camera_name)
- self.camera_state_icon.setPixmap(QPixmap("ui/camera_ok.png"))
-
- self.open_advanced_cam_config_action.setEnabled(True)
-
- self.disconnect_camera_button.setEnabled(True)
- self.disconnect_camera_button.setVisible(True)
- self.connect_camera_button.setVisible(False)
- self.camera_busy_spinner.isAnimated = False
-
- self.live_view_controls.setEnabled(True)
- self.toggle_live_view_button.setChecked(False)
- self.autofocus_button.setEnabled(False)
-
- self.camera_controls.setEnabled(True if session_loaded else False)
- self.camera_config_controls.setEnabled(True)
- if self.capture_mode == CaptureMode.Preview:
- self.capture_button.setText("Vorschaubild aufnehmen")
- else:
- self.capture_button.setText("RTI-Aufnahme starten")
- self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
- self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
-
- case CameraStates.Disconnecting():
- self.camera_state_label.setText("Trenne Kamera...")
- self.disconnect_camera_button.setEnabled(False)
- self.disconnect_camera_button.setVisible(True)
- self.open_advanced_cam_config_action.setEnabled(False)
-
- self.live_view_controls.setEnabled(False)
-
- self.camera_controls.setEnabled(False)
- self.camera_config_controls.setEnabled(False)
- self.capture_button.setText("Nicht verbunden")
-
- case CameraStates.LiveViewStarted():
- self.camera_config_controls.setEnabled(False)
- self.autofocus_button.setEnabled(True)
- self.light_lcd_frame.setEnabled(True)
- self.update_lightmeter(camera_state.current_lightmeter_value)
-
- case CameraStates.LiveViewActive():
- pass
-
- case CameraStates.FocusStarted():
- self.autofocus_button.setEnabled(False)
-
- case CameraStates.FocusFinished():
- self.autofocus_button.setEnabled(True)
- if not camera_state.success:
- self.live_view_error_label.setText("Konnte nicht fokussieren. Zu dunkel?")
- else:
- self.live_view_error_label.setText(None)
-
- case CameraStates.LiveViewStopped():
- self.previewImageBrowser.show_preview(None)
- self.light_lcd_number.display(None)
- self.light_lcd_frame.setEnabled(False)
- self.live_view_error_label.setText(None)
-
-
- case CameraStates.CaptureInProgress():
- # if prev
- # disable combo boxes
- self.session_controls.setEnabled(False)
- self.disconnect_camera_button.setEnabled(False)
-
- self.live_view_controls.setEnabled(False)
- self.toggle_live_view_button.setChecked(False)
-
-
- self.capture_button.setVisible(False)
- self.cancel_capture_button.setVisible(True)
- self.cancel_capture_button.setEnabled(True)
- self.capture_status_label.setStyleSheet(None)
- self.capture_status_label.setText(None)
-
- self.camera_config_controls.setEnabled(False)
-
- if self.capture_mode == CaptureMode.Preview:
- self.capture_view.setItemEnabled(CaptureMode.RTI.value, False)
- else:
- self.capture_view.setItemEnabled(CaptureMode.Preview.value, False)
-
- self.capture_progress_bar.setMaximum(camera_state.capture_request.num_images)
- self.capture_progress_bar.setValue(camera_state.num_captured)
-
- case CameraStates.CaptureCancelling():
- self.cancel_capture_button.setEnabled(False)
-
- case CameraStates.CaptureCanceled():
- self.capture_status_label.setText("Aufnahme abgebrochen!")
- self.capture_status_label.setStyleSheet("color: red;")
-
- self.session_controls.setEnabled(True)
- self.cancel_capture_button.setVisible(False)
- self.capture_button.setVisible(True)
- self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
- self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
-
- case CameraStates.CaptureError():
- self.capture_status_label.setText("Fehler: %s" % str(camera_state.error))
- self.capture_status_label.setStyleSheet("color: red;")
-
- self.session_controls.setEnabled(True)
- self.cancel_capture_button.setVisible(False)
- self.capture_button.setVisible(True)
- self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
- self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
-
- case CameraStates.CaptureFinished():
- self.capture_status_label.setText("Fertig in %ss!" % str(camera_state.elapsed_time / 1000))
- self.capture_progress_bar.setValue(camera_state.num_captured)
- self.session_controls.setEnabled(True)
- self.cancel_capture_button.setVisible(False)
- self.capture_button.setVisible(True)
- self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
- self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
-
- def update_ui_bluetooth(self):
- if self.bt_controller is not None:
- self.bluetooth_frame.setVisible(True)
- self.preview_led_frame.setVisible(True)
-
- match self.bt_controller.state:
- case BtControllerState.DISCONNECTED:
- self.bluetooth_state_icon.setPixmap(QPixmap("ui/bluetooth_disconnected.svg"))
- self.preview_led_select.setEnabled(False)
- self.bluetooth_connecting_spinner.isAnimated = False
- self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller getrennt")
- case BtControllerState.CONNECTING:
- self.bluetooth_state_icon.setPixmap(QPixmap("ui/bluetooth_connecting.svg"))
- self.bluetooth_connecting_spinner.isAnimated = True
- self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller wird aufgebaut...")
- case BtControllerState.CONNECTED:
- self.bluetooth_state_icon.setPixmap(QPixmap("ui/bluetooth_connected.svg"))
- self.preview_led_select.setEnabled(True)
- self.bluetooth_connecting_spinner.isAnimated = False
- self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller aktiv")
- case BtControllerState.DISCONNECTING:
- self.bluetooth_state_icon.setPixmap(QPixmap("ui/bluetooth_connecting.svg"))
- self.bluetooth_connecting_spinner.isAnimated = True
- self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller wird getrennt...")
-
- else:
- self.bluetooth_frame.setVisible(False)
- self.preview_led_frame.setVisible(False)
-
- @property
- def session(self) -> Session:
- return self.__session
-
- def set_session(self, _session):
- self.__session = _session
- self.update_ui()
-
- if _session is None:
- self.session_name_edit.clear()
- self.session_name_edit.setFocus()
- return
-
- os.makedirs(_session.session_dir, exist_ok=True)
- os.makedirs(_session.preview_dir, exist_ok=True)
- os.makedirs(_session.images_dir, exist_ok=True)
-
- # both browser will emit the directory_loaded signal connected to the
- # session_directory_loaded slot below (in Qt Designer/Creator, main_window.ui)
- self.previewImageBrowser.open_directory(self.session.preview_dir)
- self.rtiImageBrowser.open_directory(self.session.images_dir)
-
- def on_capture_mode_changed(self):
- self.update_mirror_view()
- if self.capture_mode == CaptureMode.RTI and isinstance(self.camera_state, CameraStates.LiveViewActive):
- self.camera_worker.commands.live_view.emit(False)
- self.update_ui()
-
- def update_mirror_view(self):
- if self.capture_mode == CaptureMode.Preview:
- self.previewImageBrowser.set_mirror_graphics_view(self.mirror_graphics_view)
- else:
- self.rtiImageBrowser.set_mirror_graphics_view(self.mirror_graphics_view)
-
- def open_settings(self):
- q_settings = QSettings()
- dialog = SettingsDialog(q_settings, self)
- dialog.setModal(True)
- if dialog.exec():
- for name, value in dialog.settings.items():
- q_settings.setValue(name, value)
- if name == "maxPixmapCache":
- QPixmapCache.setCacheLimit(value * 1024)
- elif name == "enableBluetooth":
- event_loop = asyncio.get_running_loop()
- if value is True:
- event_loop.create_task(self.init_bluetooth())
- elif self.bt_controller:
- self.bt_controller.bt_disconnect()
- self.update_ui_bluetooth()
- elif name == "enableSecondScreenMirror":
- self.reset_mirror_view()
-
- def open_advanced_capture_settings(self):
- def open_dialog(cfg: gp.CameraWidget):
- self.cam_config_dialog = CameraConfigDialog(cfg, self)
- if self.cam_config_dialog.exec():
- self.camera_worker.commands.set_config.emit(cfg)
- print(cfg.__dict__)
- self.cam_config_dialog = None
-
- req = ConfigRequest()
- req.signal.got_config.connect(open_dialog)
- self.camera_worker.commands.get_config.emit(req)
-
- def set_camera_connection_busy(self, busy: bool = True):
- self.connect_camera_button.setEnabled(not busy)
- self.disconnect_camera_button.setEnabled(not busy)
- self.camera_busy_spinner.isAnimated = busy
-
- def connect_camera(self):
- self.camera_worker.commands.connect_camera.emit()
-
- def disconnect_camera(self):
- self.camera_worker.commands.disconnect_camera.emit()
-
- def create_session(self):
- name = self.session_name_edit.text()
-
- print("Create" + name)
- session = Session(name, QSettings().value("workingDirectory"))
- if Path(session.session_dir).exists():
- result = QMessageBox.warning(self, "Fehler",
- "Sitzung %s existiert bereits. Soll sie erneut geöffnet werden?" % name,
- QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No)
- if result == QMessageBox.StandardButton.No:
- return
-
- self.set_session(session)
-
- def session_directory_loaded(self, path):
- if not self.session:
- return
-
- if os.path.normpath(path) == os.path.normpath(self.session.preview_dir):
- self.session.preview_dir_loaded = True
- self.session.preview_count = self.previewImageBrowser.last_index()
-
- elif os.path.normpath(path) == os.path.normpath(self.session.images_dir):
- self.session.images_dir_loaded = True
-
- self.update_ui()
-
- def close_session(self):
- self.previewImageBrowser.close_directory()
- self.rtiImageBrowser.close_directory()
- self.set_session(None)
-
- def show_session_menu(self):
- self.session_menu.exec(self.session_menu_button.mapToGlobal(self.session_menu_button.rect().bottomLeft()))
-
- def show_settings_menu(self):
- self.settings_menu.exec(self.settings_button.mapToGlobal(self.session_menu_button.rect().bottomLeft()))
-
- def open_existing_session_directory(self):
- working_dir = QSettings().value("workingDirectory")
- dialog = OpenSessionDialog(working_dir, self)
- path = dialog.get_session_path()
- if path:
- session_name = Path(path).name
- self.set_session(Session(session_name, working_dir))
-
- def rename_current_session(self):
- new_name, ok = QInputDialog.getText(self, "Aktuelle Sitzung umbenennen", "Neuer Name", text=self.session.name)
- if ok:
- session_dir = self.session.session_dir
- session_dir_parent = Path(session_dir).parent
- new_session_dir = os.path.join(session_dir_parent, os.path.join(session_dir_parent, new_name))
-
- if Path(new_session_dir).exists():
- QMessageBox.critical(self, "Fehler", "Sitzung %s existiert bereits." % new_name)
- return
-
- image_files = [os.path.join(self.session.images_dir, f) for f in os.listdir(self.session.images_dir)]
- preview_files = [os.path.join(self.session.preview_dir, f) for f in os.listdir(self.session.preview_dir)]
-
- for file in image_files + preview_files:
- path = Path(file)
- parent_path = path.parent
- file_name = path.name
- basename, ext = os.path.splitext(file_name)
- if basename.startswith(self.session.name):
- new_filename = basename.replace(self.session.name, new_name, 1) + ext
- new_path = os.path.join(parent_path, new_filename)
- os.rename(path, new_path)
-
- os.rename(session_dir, new_session_dir)
- self.close_session()
- self.set_session(Session(new_name, session_dir_parent))
-
- def write_lp(self):
- file_names = [os.path.basename(file_path) for file_path in self.rtiImageBrowser.files()]
- num_files = len(file_names)
- if num_files != 60:
- logging.warning("Wrong number of files, not writing LP file.")
- return
-
- lp_template_path = "cceh-dome-template.lp"
- lp_output_path = os.path.join(self.session.images_dir, self.session.name + ".lp")
- with open(lp_template_path, 'r') as lp_template_file, open(lp_output_path, 'w') as lp_output_file:
- logging.info("Writing LP file: " + lp_output_path)
- lp_output_file.write(str(num_files) + "\n")
- for i, input_line in enumerate(lp_template_file):
- output_line = file_names[i] + " " + input_line
- lp_output_file.write(output_line)
-
- def dump_camera_config(self):
- output_path = os.path.join(self.session.images_dir, "camera_config.json")
- self.logger.info(f"Writing camera configuration dump: {output_path}")
-
- if not self.session:
- return
- def on_got_config(cfg: gp.CameraWidget):
- cfg_dict = {}
-
- def traverse_widget(widget, widget_dict):
- widget_type = widget.get_type()
-
- if widget_type == gp.GP_WIDGET_SECTION or widget_type == gp.GP_WIDGET_WINDOW:
- # If the widget is a section, traverse its children
- child_count = widget.count_children()
- for i in range(child_count):
- child = widget.get_child(i)
- child_dict = {}
- traverse_widget(child, child_dict)
- widget_dict[child.get_name()] = child_dict
- else:
- try:
- widget_dict['value'] = widget.get_value()
- except gp.GPhoto2Error as err:
- if err.code == -2:
- self.logger.warning(f"Could not get config value for {cfg.get_label()} ({cfg.get_name()}).")
-
- widget_dict['label'] = widget.get_label()
-
- return widget_dict
-
- traverse_widget(cfg, cfg_dict)
-
- try:
- with open(output_path, "w") as output_file:
- json.dump(cfg_dict, output_file, indent=4)
- except Exception as e:
- self.logger.error(f"Could not write camera config dump to {output_path}:")
- self.logger.exception(e)
-
-
- req = ConfigRequest()
- req.signal.got_config.connect(on_got_config)
- self.camera_worker.commands.get_config.emit(req)
-
-
- def config_hookup_select(self, config: gp.CameraWidget, config_name, combo_box: QComboBox, value_map: dict = None):
- try:
- combo_box.currentIndexChanged.disconnect()
- except:
- pass
- cfg = config.get_child_by_name(config_name)
- combo_box.clear()
- for idx, choice in enumerate(cfg.get_choices()):
- choice_label = value_map[choice] if value_map and choice in value_map else choice
- combo_box.addItem(choice_label, choice)
- if choice == cfg.get_value():
- combo_box.setCurrentIndex(idx)
-
- combo_box.currentIndexChanged.connect(lambda: self.camera_worker.commands.set_single_config.emit(
- config_name, combo_box.currentData()
- ))
-
- def on_config_update(self, config: gp.CameraWidget):
- self.config_hookup_select(config, "iso", self.iso_select)
- self.config_hookup_select(config, "f-number", self.f_number_select)
- self.config_hookup_select(config, "shutterspeed2", self.shutter_speed_select)
- self.config_hookup_select(config, "d030", self.crop_select, {
- "0": "Voll",
- "1": "Klein",
- "2": "Mittel",
- "3": "Mittel 2"
- })
-
- def on_property_change(self, event: PropertyChangeEvent):
- match event.property_name:
- case "lightmeter":
- if isinstance(self.camera_state, CameraStates.LiveViewActive):
- self.update_lightmeter(event.value)
-
- def update_lightmeter(self, value):
- if isinstance(value, float):
- self.light_lcd_number.display(int(value))
- else:
- self.light_lcd_number.display(None)
-
- def enable_live_view(self, enable: bool):
- self.camera_worker.commands.live_view.emit(enable)
-
- def trigger_autofocus(self):
- self.camera_worker.commands.trigger_autofocus.emit()
-
- def capture_image(self):
- capture_req: CaptureImagesRequest
-
- # Capture Previews
- if self.capture_mode == CaptureMode.Preview:
- filename_template = self.session.name.replace(" ", "_") + "_test_" + str(
- self.session.preview_count + 1) + "${extension}"
- file_path_template = os.path.join(self.session.preview_dir, filename_template)
- capture_req = CaptureImagesRequest(file_path_template, num_images=1, image_quality="JPEG Fine")
-
- # Capture RTI Series
- else:
- if self.rtiImageBrowser.num_files() > 0:
- message_box = QMessageBox(QMessageBox.Icon.Warning, "RTI-Serie aufnehmen",
- "Vorhandene Aufnahmen werden gelöscht.")
- message_box.addButton(
- QPushButton(self.style().standardIcon(QStyle.StandardPixmap.SP_DialogCancelButton), "Abbrechen"),
- QMessageBox.ButtonRole.NoRole)
- message_box.addButton(
- QPushButton(self.style().standardIcon(QStyle.StandardPixmap.SP_DialogOkButton), "Fortfahren"),
- QMessageBox.ButtonRole.YesRole)
- if not message_box.exec():
- return
-
- existing_files = [os.path.join(self.session.images_dir, f) for f in os.listdir(self.session.images_dir)]
- send2trash(existing_files)
-
- filename_template = self.session.name.replace(" ", "_") + "_${num}${extension}"
- file_path_template = os.path.join(self.session.images_dir, filename_template)
- capture_req = CaptureImagesRequest(file_path_template, num_images=60, expect_files=2,
- max_burst=int(QSettings().value("maxBurstNumber")), skip=0, image_quality="NEF+Fine")
- self.capture_progress_bar.setMaximum(119)
- self.capture_progress_bar.setValue(0)
-
- def on_file_received(path: str):
- print("Rec: " + path)
- capture_req.signal.file_received.connect(on_file_received)
-
- def start_capture(show_button_message: bool):
- if self.capture_mode == CaptureMode.RTI and show_button_message:
- press_buttons_dialog = QDialog()
- loadUi("ui/press-buttons-dialog.ui", press_buttons_dialog)
- if not press_buttons_dialog.exec():
- return
-
- self.camera_worker.commands.capture_images.emit(capture_req)
-
- if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED:
- initial_led = 0 if self.capture_mode == CaptureMode.RTI else self.preview_led_select.currentData()
- request = BtControllerRequest(BtControllerCommand.SET_LED, initial_led)
- request.signals.success.connect(lambda: start_capture(False))
- request.signals.error.connect(lambda: start_capture(True))
- self.bt_controller.send_command(request)
- else:
- start_capture(True)
-
-
- def on_capture_cancelled(self):
- logging.info("Capture cancelled")
-
- def cancel_capture(self):
- self.cancel_capture_button.setEnabled(False)
- self.camera_worker.commands.cancel.emit()
-
- def closeEvent(self, event: QCloseEvent):
- QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
- self.camera_thread.requestInterruption()
- self.camera_thread.exit()
- if self.bt_controller and self.bt_controller.state != BtControllerState.DISCONNECTED:
- self.bt_controller.bt_disconnect()
- self.camera_thread.wait()
- if self.second_screen_window:
- self.second_screen_window.close()
-
- super().closeEvent(event)
-
-if __name__ == "__main__":
- win: RTICaptureMainWindow
-
- logging.basicConfig(
- format='%(levelname)s: %(name)s: %(message)s', level=logging.INFO)
-
- app = QApplication(sys.argv)
- app.setOrganizationName("CCeH")
- app.setOrganizationDomain("cceh.uni-koeln.de")
- app.setApplicationName("Byzanz RTI")
-
- settings = QSettings()
- if "workingDirectory" not in settings.allKeys():
- settings.setValue("workingDirectory",
- QStandardPaths.writableLocation(QStandardPaths.StandardLocation.PicturesLocation))
-
- if "maxPixmapCache" not in settings.allKeys():
- settings.setValue("maxPixmapCache", 1024)
-
- if "maxBurstNumber" not in settings.allKeys():
- settings.setValue("maxBurstNumber", 60)
-
- if "enableBluetooth" not in settings.allKeys():
- settings.setValue("enableBluetooth", True)
-
- if "enableSecondScreenMirror" not in settings.allKeys():
- settings.setValue("enableSecondScreenMirror", True)
-
- QPixmapCache.setCacheLimit(int(settings.value("maxPixmapCache")) * 1024)
-
- loop: qasync.QEventLoop = qasync.QEventLoop(app)
- asyncio.set_event_loop(loop)
-
- app_close_event = asyncio.Event()
- app.aboutToQuit.connect(app_close_event.set)
-
- orig_exceptionhook = sys.__excepthook__
-
- win = RTICaptureMainWindow()
- win.show()
-
- def excepthook(exc_type, exc_value, exc_traceback):
- logging.exception(msg="Exception", exc_info=(exc_type, exc_value, exc_traceback))
- if win.bt_controller and win.bt_controller.state != BtControllerState.DISCONNECTED:
- win.bt_controller.bt_disconnect()
-
- loop.call_soon(lambda _loop: _loop.stop(), loop)
-
-
- # sys.excepthook = excepthook
- # threading.excepthook = excepthook
-
-
- with loop:
- if BT_AVAILABLE and QSettings().value("enableBluetooth", type=bool):
- loop.create_task(win.init_bluetooth())
- else:
- logging.info("Bluetooth not available. Is bleak installed?")
-
- loop.run_until_complete(app_close_event.wait())
-
- # asyncio.get_running_loop().run_forever()
-
-
+import asyncio.exceptions
+import json
+import logging
+import os
+import sys
+import threading
+from enum import Enum
+from pathlib import Path
+
+import gphoto2 as gp
+import qasync
+from qasync import QEventLoop, QThreadExecutor
+from PIL.ImageQt import ImageQt
+from PyQt6.QtCore import QThread, QSettings, QStandardPaths, pyqtSignal, Qt
+from PyQt6.QtGui import QPixmap, QAction, QPixmapCache, QIcon, QColor, QCloseEvent, QBrush, QPainter, QCursor
+from PyQt6.QtWidgets import (
+ QApplication, QMainWindow, QPushButton, QWidget, QFrame, QLineEdit,
+ QComboBox, QLabel, QToolBox, QProgressBar, QMenu, QAbstractButton, QInputDialog, QMessageBox, QStyle, QDialog,
+ QLCDNumber, QGraphicsView, QSizePolicy, QVBoxLayout
+)
+from PyQt6.uic import loadUi
+from send2trash import send2trash
+
+from helpers import get_ui_path
+
+try:
+ from bt_controller_controller import BtControllerController, BtControllerCommand, BtControllerRequest, BtControllerState
+ BT_AVAILABLE = True
+except:
+ BT_AVAILABLE = False
+
+from camera_worker import CameraWorker, CaptureImagesRequest, CameraStates, PropertyChangeEvent, ConfigRequest
+from open_session_dialog import OpenSessionDialog
+from photo_browser import PhotoBrowser
+from settings_dialog import SettingsDialog
+from spinner import Spinner
+from camera_config_dialog import CameraConfigDialog
+
+
+class Session:
+ def __init__(self, name, working_dir):
+ self.images_dir_loaded = False
+ self.preview_dir_loaded = False
+
+ self.name = name
+ self.session_dir = os.path.join(working_dir, self.name)
+ self.preview_dir = os.path.join(self.session_dir, "test")
+ self.images_dir = os.path.join(self.session_dir, "images")
+ self.preview_count = 0
+
+
+# Corresponds to itemIndex of the captureView QToolBox
+class CaptureMode(Enum):
+ Preview = 0
+ RTI = 1
+
+
+class RTICaptureMainWindow(QMainWindow):
+ find_camera = pyqtSignal()
+
+ def __init__(self, parent=None):
+ super().__init__(parent)
+
+ self.logger = logging.getLogger(self.__class__.__name__)
+
+ self.camera_worker = CameraWorker()
+ self.__session: Session = None
+ self.camera_state: CameraStates.StateType = None
+ self.cam_config_dialog: CameraConfigDialog = None
+
+ # Set up UI and find controls
+ loadUi(get_ui_path('ui/main_window.ui'), self)
+ self.disconnect_camera_button: QPushButton = self.findChild(QPushButton, "disconnectCameraButton")
+ self.connect_camera_button: QPushButton = self.findChild(QPushButton, "connectCameraButton")
+ self.camera_busy_spinner: Spinner = self.findChild(QWidget, "cameraBusySpinner")
+ self.camera_state_label: QLabel = self.findChild(QLabel, "cameraStateLabel")
+ self.camera_state_icon: QLabel = self.findChild(QLabel, "cameraStateIcon")
+
+ self.bluetooth_frame: QFrame = self.findChild(QFrame, "bluetoothFrame")
+ self.bluetooth_state_icon: QLabel = self.findChild(QLabel, "bluetoothStateLabel")
+ self.bluetooth_connecting_spinner: Spinner = self.findChild(QWidget, "bluetoothConnectingSpinner")
+
+ self.session_controls: QWidget = self.findChild(QWidget, "sessionControls")
+ self.session_name_edit: QLineEdit = self.findChild(QLineEdit, "sessionNameEdit")
+ self.start_session_button: QPushButton = self.findChild(QPushButton, "startSessionButton")
+ self.close_session_button: QPushButton = self.findChild(QPushButton, "closeSessionButton")
+ self.session_loading_spinner: Spinner = self.findChild(QWidget, "sessionLoadingSpinner")
+ self.session_menu_button: QAbstractButton = self.findChild(QWidget, "sessionMenuButton")
+
+ self.live_view_controls: QWidget = self.findChild(QWidget, "liveViewControls")
+ self.toggle_live_view_button: QPushButton = self.findChild(QPushButton, "toggleLiveViewButton")
+ self.autofocus_button: QPushButton = self.findChild(QPushButton, "autofocusButton")
+ self.light_lcd_number: QLCDNumber = self.findChild(QLCDNumber, "lightLCDNumber")
+ self.light_lcd_frame: QFrame = self.findChild(QFrame, "lightLCDFrame")
+ self.live_view_error_label: QLabel = self.findChild(QLabel, "liveviewErrorLabel")
+
+ self.preview_led_select: QComboBox = self.findChild(QComboBox, "previewLedSelect")
+ self.preview_led_frame: QFrame = self.findChild(QFrame, "previewLedFrame")
+
+ self.capture_view: QToolBox = self.findChild(QToolBox, "captureView")
+ self.rtiPage: QWidget = self.findChild(QWidget, "rtiPage")
+ self.previewPage: QWidget = self.findChild(QWidget, "previewPage")
+ self.previewImageBrowser: PhotoBrowser = self.findChild(QWidget, "previewImageBrowser")
+ self.rtiImageBrowser: PhotoBrowser = self.findChild(QWidget, "rtiImageBrowser")
+ self.capture_button: QPushButton = self.findChild(QPushButton, "captureButton")
+ self.cancel_capture_button: QPushButton = self.findChild(QPushButton, "cancelCaptureButton")
+
+ self.rti_progress_view: QWidget = self.findChild(QWidget, "rtiProgressView")
+ self.capture_progress_bar: QProgressBar = self.findChild(QProgressBar, "captureProgressBar")
+ self.capture_status_label: QLabel = self.findChild(QLabel, "captureStatusLabel")
+
+ self.camera_controls: QFrame = self.findChild(QFrame, "cameraControls")
+ self.camera_config_controls: QWidget = self.findChild(QWidget, "cameraConfigControls")
+ self.f_number_select: QComboBox = self.findChild(QComboBox, "fNumberSelect")
+ self.shutter_speed_select: QComboBox = self.findChild(QComboBox, "shutterSpeedSelect")
+ self.crop_select: QComboBox = self.findChild(QComboBox, "cropSelect")
+ self.iso_select: QComboBox = self.findChild(QComboBox, "isoSelect")
+
+ self.settings_button: QPushButton = self.findChild(QPushButton, "settingsButton")
+
+ self.session_menu = QMenu(self)
+ self.open_session_action = QAction('Vorherige Sitzung öffnen...', self)
+ self.open_session_action.triggered.connect(self.open_existing_session_directory)
+ self.open_session_action.setIcon(QIcon(get_ui_path("ui/open.svg")))
+ self.rename_session_action = QAction('Sitzung umbenennen...', self)
+ self.rename_session_action.triggered.connect(self.rename_current_session)
+ self.rename_session_action.setIcon(QIcon(get_ui_path("ui/rename.svg")))
+ self.session_menu.addActions([self.open_session_action, self.rename_session_action])
+
+ self.settings_menu = QMenu(self)
+ self.open_program_settings_action = QAction('Allgemeine Einstellungen')
+ self.open_program_settings_action.triggered.connect(self.open_settings)
+ self.open_program_settings_action.setIcon(QIcon(get_ui_path("ui/general_settings.svg")))
+ self.open_advanced_cam_config_action = QAction('Erweiterte Kamerakonfiguration')
+ self.open_advanced_cam_config_action.triggered.connect(self.open_advanced_capture_settings)
+ self.open_advanced_cam_config_action.setIcon(QIcon(get_ui_path("ui/cam_settings.svg")))
+ self.settings_menu.addActions([self.open_program_settings_action, self.open_advanced_cam_config_action])
+
+ self.mirror_graphics_view: QGraphicsView | None = None
+ self.second_screen_window: QDialog | None = None
+
+ self.session_name_edit.textChanged.connect(
+ lambda text: self.start_session_button.setEnabled(
+ True if len(text) > 0 else False
+ ))
+
+ self.cancel_capture_button.setVisible(False)
+
+ for i in range(60):
+ self.preview_led_select.addItem(str(i + 1), i)
+
+ self.set_camera_connection_busy(True)
+ self.capture_mode = CaptureMode.Preview
+ self.set_session(None)
+
+ self.camera_thread = QThread()
+ self.camera_worker.moveToThread(self.camera_thread)
+ self.camera_worker.state_changed.connect(self.set_camera_state)
+ self.camera_worker.events.config_updated.connect(self.on_config_update)
+ self.camera_worker.property_changed.connect(self.on_property_change)
+ self.camera_worker.preview_image.connect(lambda image: self.previewImageBrowser.show_preview(ImageQt(image.image)))
+ self.camera_worker.initialized.connect(lambda: self.camera_worker.commands.find_camera.emit())
+ self.camera_thread.started.connect(self.camera_worker.initialize)
+ self.camera_thread.start()
+
+ self.bt_controller: BtControllerController | None = None
+
+ self.update_ui_bluetooth()
+
+ self.init_mirror_view()
+ QApplication.instance().screenAdded.connect(self.reset_mirror_view)
+ QApplication.instance().screenRemoved.connect(self.reset_mirror_view)
+ QApplication.instance().primaryScreenChanged.connect(self.reset_mirror_view)
+
+
+ def init_mirror_view(self):
+ screens = QApplication.screens()
+ mirror_view_enabled = QSettings().value("enableSecondScreenMirror", type=bool)
+ if mirror_view_enabled and len(screens) > 1:
+ second_screen = screens[1]
+ self.second_screen_window = QDialog()
+ self.second_screen_window.setWindowFlags(Qt.WindowType.WindowStaysOnTopHint | Qt.WindowType.FramelessWindowHint)
+ self.second_screen_window.setWindowTitle("Secondary View")
+ self.second_screen_window.setGeometry(second_screen.availableGeometry())
+ self.second_screen_window.showFullScreen()
+
+ self.mirror_graphics_view = QGraphicsView(self.second_screen_window)
+ self.mirror_graphics_view.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding)
+ self.mirror_graphics_view.setBackgroundBrush(QBrush(QColor(30, 30, 30)))
+ self.mirror_graphics_view.setRenderHint(QPainter.RenderHint.Antialiasing)
+ layout = QVBoxLayout(self.second_screen_window)
+ layout.setContentsMargins(0, 0, 0, 0)
+ layout.addWidget(self.mirror_graphics_view)
+ self.second_screen_window.setLayout(layout)
+
+ self.update_mirror_view()
+
+ def disable_mirror_view(self):
+ if self.second_screen_window:
+ self.second_screen_window.close()
+ self.second_screen_window = None
+ self.mirror_graphics_view = None
+
+ def reset_mirror_view(self):
+ self.disable_mirror_view()
+ self.init_mirror_view()
+
+ async def init_bluetooth(self):
+ if not self.bt_controller:
+ self.bt_controller = BtControllerController()
+ self.bt_controller.state_changed.connect(self.update_ui_bluetooth)
+ await self.bt_controller.connect()
+
+ @property
+ def capture_mode(self) -> CaptureMode:
+ return CaptureMode(self.capture_view.currentIndex())
+
+ @capture_mode.setter
+ def capture_mode(self, mode: CaptureMode):
+ self.capture_view.setCurrentIndex(mode.value)
+ self.update_ui()
+
+ def get_camera_state(self):
+ return self.camera_state
+
+ def set_camera_state(self, state: CameraStates.StateType):
+ self.logger.debug("Handle camera state:" + state.__class__.__name__)
+ self.camera_state = state
+ self.update_ui()
+
+ match state:
+ case CameraStates.Waiting():
+ pass
+
+ case CameraStates.Found():
+ self.camera_worker.commands.connect_camera.emit()
+
+ case CameraStates.Disconnected():
+ if state.auto_reconnect:
+ self.camera_worker.commands.find_camera.emit()
+
+ case CameraStates.Connecting():
+ pass
+
+ case CameraStates.Disconnecting():
+ if self.cam_config_dialog:
+ self.cam_config_dialog.reject()
+
+ case CameraStates.ConnectionError():
+ self.logger.error(state.error)
+
+ case CameraStates.Ready():
+ pass
+
+ case CameraStates.LiveViewStarted():
+ if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED:
+ request = BtControllerRequest(BtControllerCommand.PILOT_LIGHT_ON)
+ request.signals.success.connect(lambda: print("BT Success!"))
+ request.signals.error.connect(lambda e: logging.exception(e))
+ self.bt_controller.send_command(request)
+
+ case CameraStates.LiveViewStopped():
+ if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED:
+ request = BtControllerRequest(BtControllerCommand.LED_OFF)
+ request.signals.success.connect(lambda: print("BT Success!"))
+ request.signals.error.connect(lambda e: logging.exception(e))
+ self.bt_controller.send_command(request)
+
+ case CameraStates.Disconnecting():
+ pass
+
+ case CameraStates.CaptureInProgress():
+ pass
+
+ case CameraStates.CaptureFinished():
+ if self.capture_mode == CaptureMode.Preview:
+ self.session.preview_count += 1
+ else:
+ self.write_lp()
+ self.dump_camera_config()
+
+ case CameraStates.CaptureCanceled():
+ pass
+
+ def update_ui(self):
+ # variables on which the UI state depends
+ camera_state = self.camera_state
+
+ has_session = self.session is not None
+ session_loaded = has_session \
+ and self.session.preview_dir_loaded \
+ and self.session.images_dir_loaded
+ capture_mode = self.capture_mode
+
+ # configure UI according to the capture mode
+ for item_index in range(self.capture_view.count()):
+ if item_index == self.capture_mode.value:
+ self.capture_view.setItemIcon(item_index, QIcon(get_ui_path("ui/chevron_down.svg")))
+ else:
+ self.capture_view.setItemIcon(item_index, QIcon(get_ui_path("ui/chevron_right.svg")))
+
+
+
+
+ # configure UI according to the state of the current session
+ self.session_name_edit.setEnabled(not has_session)
+ self.start_session_button.setVisible(not has_session)
+ self.open_session_action.setEnabled(not has_session)
+ self.rename_session_action.setEnabled(session_loaded)
+
+ self.close_session_button.setVisible(has_session)
+ self.close_session_button.setText("Sitzung beenden" if session_loaded else "Laden abbrechen...")
+
+ self.session_loading_spinner.isAnimated = has_session and not session_loaded
+ self.capture_view.setEnabled(has_session)
+
+ self.capture_progress_bar.setMaximum(60)
+ self.capture_progress_bar.setValue(self.rtiImageBrowser.num_files() if session_loaded else 0)
+
+ if has_session:
+ self.session_name_edit.setText(self.session.name)
+
+ # configure UI according to the camera state
+ match camera_state:
+ case CameraStates.Waiting():
+ self.camera_state_label.setText("Suche Kamera...")
+ self.camera_state_icon.setPixmap(QPixmap(get_ui_path("ui/camera_waiting.png")))
+ self.open_advanced_cam_config_action.setEnabled(False)
+
+ self.connect_camera_button.setEnabled(False)
+ self.disconnect_camera_button.setVisible(False)
+ self.camera_busy_spinner.isAnimated = True
+ self.capture_status_label.setText(None)
+
+ self.live_view_controls.setEnabled(False)
+ self.light_lcd_frame.setEnabled(False)
+ self.light_lcd_number.display(None)
+ self.live_view_error_label.setText(None)
+
+ self.camera_controls.setEnabled(False)
+ self.camera_config_controls.setEnabled(False)
+ self.capture_button.setText("Nicht verbunden")
+ self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
+ self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
+
+ case CameraStates.Found():
+ pass
+
+ case CameraStates.Disconnected():
+ self.camera_state_label.setText("Kamera getrennt
%s" % camera_state.camera_name)
+ self.camera_state_icon.setPixmap(QPixmap(get_ui_path("ui/camera_not_ok.png")))
+
+ self.connect_camera_button.setEnabled(True)
+ self.connect_camera_button.setVisible(True)
+ self.disconnect_camera_button.setVisible(False)
+ self.camera_busy_spinner.isAnimated = False
+
+ self.toggle_live_view_button.setChecked(False)
+ self.autofocus_button.setEnabled(False)
+
+
+ self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
+ self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
+
+ case CameraStates.Connecting():
+ self.camera_state_label.setText("Verbinde...
%s" % camera_state.camera_name)
+ self.connect_camera_button.setEnabled(False)
+ self.camera_busy_spinner.isAnimated = True
+
+ case CameraStates.ConnectionError():
+ pass
+
+ case CameraStates.Ready():
+ self.camera_state_label.setText("Kamera verbunden
%s" % camera_state.camera_name)
+ self.camera_state_icon.setPixmap(QPixmap(get_ui_path("ui/camera_ok.png")))
+
+ self.open_advanced_cam_config_action.setEnabled(True)
+
+ self.disconnect_camera_button.setEnabled(True)
+ self.disconnect_camera_button.setVisible(True)
+ self.connect_camera_button.setVisible(False)
+ self.camera_busy_spinner.isAnimated = False
+
+ self.live_view_controls.setEnabled(True)
+ self.toggle_live_view_button.setChecked(False)
+ self.autofocus_button.setEnabled(False)
+
+ self.camera_controls.setEnabled(True if session_loaded else False)
+ self.camera_config_controls.setEnabled(True)
+ if self.capture_mode == CaptureMode.Preview:
+ self.capture_button.setText("Vorschaubild aufnehmen")
+ else:
+ self.capture_button.setText("RTI-Aufnahme starten")
+ self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
+ self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
+
+ case CameraStates.Disconnecting():
+ self.camera_state_label.setText("Trenne Kamera...")
+ self.disconnect_camera_button.setEnabled(False)
+ self.disconnect_camera_button.setVisible(True)
+ self.open_advanced_cam_config_action.setEnabled(False)
+
+ self.live_view_controls.setEnabled(False)
+
+ self.camera_controls.setEnabled(False)
+ self.camera_config_controls.setEnabled(False)
+ self.capture_button.setText("Nicht verbunden")
+
+ case CameraStates.LiveViewStarted():
+ self.camera_config_controls.setEnabled(False)
+ self.autofocus_button.setEnabled(True)
+ self.light_lcd_frame.setEnabled(True)
+ self.update_lightmeter(camera_state.current_lightmeter_value)
+
+ case CameraStates.LiveViewActive():
+ pass
+
+ case CameraStates.FocusStarted():
+ self.autofocus_button.setEnabled(False)
+
+ case CameraStates.FocusFinished():
+ self.autofocus_button.setEnabled(True)
+ if not camera_state.success:
+ self.live_view_error_label.setText("Konnte nicht fokussieren. Zu dunkel?")
+ else:
+ self.live_view_error_label.setText(None)
+
+ case CameraStates.LiveViewStopped():
+ self.previewImageBrowser.show_preview(None)
+ self.light_lcd_number.display(None)
+ self.light_lcd_frame.setEnabled(False)
+ self.live_view_error_label.setText(None)
+
+
+ case CameraStates.CaptureInProgress():
+ # if prev
+ # disable combo boxes
+ self.session_controls.setEnabled(False)
+ self.disconnect_camera_button.setEnabled(False)
+
+ self.live_view_controls.setEnabled(False)
+ self.toggle_live_view_button.setChecked(False)
+
+
+ self.capture_button.setVisible(False)
+ self.cancel_capture_button.setVisible(True)
+ self.cancel_capture_button.setEnabled(True)
+ self.capture_status_label.setStyleSheet(None)
+ self.capture_status_label.setText(None)
+
+ self.camera_config_controls.setEnabled(False)
+
+ if self.capture_mode == CaptureMode.Preview:
+ self.capture_view.setItemEnabled(CaptureMode.RTI.value, False)
+ else:
+ self.capture_view.setItemEnabled(CaptureMode.Preview.value, False)
+
+ self.capture_progress_bar.setMaximum(camera_state.capture_request.num_images)
+ self.capture_progress_bar.setValue(camera_state.num_captured)
+
+ case CameraStates.CaptureCancelling():
+ self.cancel_capture_button.setEnabled(False)
+
+ case CameraStates.CaptureCanceled():
+ self.capture_status_label.setText("Aufnahme abgebrochen!")
+ self.capture_status_label.setStyleSheet("color: red;")
+
+ self.session_controls.setEnabled(True)
+ self.cancel_capture_button.setVisible(False)
+ self.capture_button.setVisible(True)
+ self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
+ self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
+
+ case CameraStates.CaptureError():
+ self.capture_status_label.setText("Fehler: %s" % str(camera_state.error))
+ self.capture_status_label.setStyleSheet("color: red;")
+
+ self.session_controls.setEnabled(True)
+ self.cancel_capture_button.setVisible(False)
+ self.capture_button.setVisible(True)
+ self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
+ self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
+
+ case CameraStates.CaptureFinished():
+ self.capture_status_label.setText("Fertig in %ss!" % str(camera_state.elapsed_time / 1000))
+ self.capture_progress_bar.setValue(camera_state.num_captured)
+ self.session_controls.setEnabled(True)
+ self.cancel_capture_button.setVisible(False)
+ self.capture_button.setVisible(True)
+ self.capture_view.setItemEnabled(CaptureMode.Preview.value, True)
+ self.capture_view.setItemEnabled(CaptureMode.RTI.value, True)
+
+ def update_ui_bluetooth(self):
+ if self.bt_controller is not None:
+ self.bluetooth_frame.setVisible(True)
+ self.preview_led_frame.setVisible(True)
+
+ match self.bt_controller.state:
+ case BtControllerState.DISCONNECTED:
+ self.bluetooth_state_icon.setPixmap(QPixmap(get_ui_path("ui/bluetooth_disconnected.svg")))
+ self.preview_led_select.setEnabled(False)
+ self.bluetooth_connecting_spinner.isAnimated = False
+ self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller getrennt")
+ case BtControllerState.CONNECTING:
+ self.bluetooth_state_icon.setPixmap(QPixmap(get_ui_path("ui/bluetooth_connecting.svg")))
+ self.bluetooth_connecting_spinner.isAnimated = True
+ self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller wird aufgebaut...")
+ case BtControllerState.CONNECTED:
+ self.bluetooth_state_icon.setPixmap(QPixmap(get_ui_path("ui/bluetooth_connected.svg")))
+ self.preview_led_select.setEnabled(True)
+ self.bluetooth_connecting_spinner.isAnimated = False
+ self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller aktiv")
+ case BtControllerState.DISCONNECTING:
+ self.bluetooth_state_icon.setPixmap(QPixmap(get_ui_path("ui/bluetooth_connecting.svg")))
+ self.bluetooth_connecting_spinner.isAnimated = True
+ self.bluetooth_frame.setToolTip("Bluetooth-Verbindung zum Controller wird getrennt...")
+
+ else:
+ self.bluetooth_frame.setVisible(False)
+ self.preview_led_frame.setVisible(False)
+
+ @property
+ def session(self) -> Session:
+ return self.__session
+
+ def set_session(self, _session):
+ self.__session = _session
+ self.update_ui()
+
+ if _session is None:
+ self.session_name_edit.clear()
+ self.session_name_edit.setFocus()
+ return
+
+ os.makedirs(_session.session_dir, exist_ok=True)
+ os.makedirs(_session.preview_dir, exist_ok=True)
+ os.makedirs(_session.images_dir, exist_ok=True)
+
+ # both browser will emit the directory_loaded signal connected to the
+ # session_directory_loaded slot below (in Qt Designer/Creator, main_window.ui)
+ self.previewImageBrowser.open_directory(self.session.preview_dir)
+ self.rtiImageBrowser.open_directory(self.session.images_dir)
+
+ def on_capture_mode_changed(self):
+ self.update_mirror_view()
+ if self.capture_mode == CaptureMode.RTI and isinstance(self.camera_state, CameraStates.LiveViewActive):
+ self.camera_worker.commands.live_view.emit(False)
+ self.update_ui()
+
+ def update_mirror_view(self):
+ if self.capture_mode == CaptureMode.Preview:
+ self.previewImageBrowser.set_mirror_graphics_view(self.mirror_graphics_view)
+ else:
+ self.rtiImageBrowser.set_mirror_graphics_view(self.mirror_graphics_view)
+
+ def open_settings(self):
+ q_settings = QSettings()
+ dialog = SettingsDialog(q_settings, self)
+ dialog.setModal(True)
+ if dialog.exec():
+ for name, value in dialog.settings.items():
+ q_settings.setValue(name, value)
+ if name == "maxPixmapCache":
+ QPixmapCache.setCacheLimit(value * 1024)
+ elif name == "enableBluetooth":
+ event_loop = asyncio.get_running_loop()
+ if value is True:
+ event_loop.create_task(self.init_bluetooth())
+ elif self.bt_controller:
+ self.bt_controller.bt_disconnect()
+ self.update_ui_bluetooth()
+ elif name == "enableSecondScreenMirror":
+ self.reset_mirror_view()
+
+ def open_advanced_capture_settings(self):
+ def open_dialog(cfg: gp.CameraWidget):
+ self.cam_config_dialog = CameraConfigDialog(cfg, self)
+ if self.cam_config_dialog.exec():
+ self.camera_worker.commands.set_config.emit(cfg)
+ print(cfg.__dict__)
+ self.cam_config_dialog = None
+
+ req = ConfigRequest()
+ req.signal.got_config.connect(open_dialog)
+ self.camera_worker.commands.get_config.emit(req)
+
+ def set_camera_connection_busy(self, busy: bool = True):
+ self.connect_camera_button.setEnabled(not busy)
+ self.disconnect_camera_button.setEnabled(not busy)
+ self.camera_busy_spinner.isAnimated = busy
+
+ def connect_camera(self):
+ self.camera_worker.commands.connect_camera.emit()
+
+ def disconnect_camera(self):
+ self.camera_worker.commands.disconnect_camera.emit()
+
+ def create_session(self):
+ name = self.session_name_edit.text()
+
+ print("Create" + name)
+ session = Session(name, QSettings().value("workingDirectory"))
+ if Path(session.session_dir).exists():
+ result = QMessageBox.warning(self, "Fehler",
+ "Sitzung %s existiert bereits. Soll sie erneut geöffnet werden?" % name,
+ QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No)
+ if result == QMessageBox.StandardButton.No:
+ return
+
+ self.set_session(session)
+
+ def session_directory_loaded(self, path):
+ if not self.session:
+ return
+
+ if os.path.normpath(path) == os.path.normpath(self.session.preview_dir):
+ self.session.preview_dir_loaded = True
+ self.session.preview_count = self.previewImageBrowser.last_index()
+
+ elif os.path.normpath(path) == os.path.normpath(self.session.images_dir):
+ self.session.images_dir_loaded = True
+
+ self.update_ui()
+
+ def close_session(self):
+ self.previewImageBrowser.close_directory()
+ self.rtiImageBrowser.close_directory()
+ self.set_session(None)
+
+ def show_session_menu(self):
+ self.session_menu.exec(self.session_menu_button.mapToGlobal(self.session_menu_button.rect().bottomLeft()))
+
+ def show_settings_menu(self):
+ self.settings_menu.exec(self.settings_button.mapToGlobal(self.session_menu_button.rect().bottomLeft()))
+
+ def open_existing_session_directory(self):
+ working_dir = QSettings().value("workingDirectory")
+ dialog = OpenSessionDialog(working_dir, self)
+ path = dialog.get_session_path()
+ if path:
+ session_name = Path(path).name
+ self.set_session(Session(session_name, working_dir))
+
+ def rename_current_session(self):
+ new_name, ok = QInputDialog.getText(self, "Aktuelle Sitzung umbenennen", "Neuer Name", text=self.session.name)
+ if ok:
+ session_dir = self.session.session_dir
+ session_dir_parent = Path(session_dir).parent
+ new_session_dir = os.path.join(session_dir_parent, os.path.join(session_dir_parent, new_name))
+
+ if Path(new_session_dir).exists():
+ QMessageBox.critical(self, "Fehler", "Sitzung %s existiert bereits." % new_name)
+ return
+
+ image_files = [os.path.join(self.session.images_dir, f) for f in os.listdir(self.session.images_dir)]
+ preview_files = [os.path.join(self.session.preview_dir, f) for f in os.listdir(self.session.preview_dir)]
+
+ for file in image_files + preview_files:
+ path = Path(file)
+ parent_path = path.parent
+ file_name = path.name
+ basename, ext = os.path.splitext(file_name)
+ if basename.startswith(self.session.name):
+ new_filename = basename.replace(self.session.name, new_name, 1) + ext
+ new_path = os.path.join(parent_path, new_filename)
+ os.rename(path, new_path)
+
+ os.rename(session_dir, new_session_dir)
+ self.close_session()
+ self.set_session(Session(new_name, session_dir_parent))
+
+ def write_lp(self):
+ file_names = [os.path.basename(file_path) for file_path in self.rtiImageBrowser.files()]
+ num_files = len(file_names)
+ if num_files != 60:
+ logging.warning("Wrong number of files, not writing LP file.")
+ return
+
+ lp_template_path = "cceh-dome-template.lp"
+ lp_output_path = os.path.join(self.session.images_dir, self.session.name + ".lp")
+ with open(lp_template_path, 'r') as lp_template_file, open(lp_output_path, 'w') as lp_output_file:
+ logging.info("Writing LP file: " + lp_output_path)
+ lp_output_file.write(str(num_files) + "\n")
+ for i, input_line in enumerate(lp_template_file):
+ output_line = file_names[i] + " " + input_line
+ lp_output_file.write(output_line)
+
+ def dump_camera_config(self):
+ output_path = os.path.join(self.session.images_dir, "camera_config.json")
+ self.logger.info(f"Writing camera configuration dump: {output_path}")
+
+ if not self.session:
+ return
+ def on_got_config(cfg: gp.CameraWidget):
+ cfg_dict = {}
+
+ def traverse_widget(widget, widget_dict):
+ widget_type = widget.get_type()
+
+ if widget_type == gp.GP_WIDGET_SECTION or widget_type == gp.GP_WIDGET_WINDOW:
+ # If the widget is a section, traverse its children
+ child_count = widget.count_children()
+ for i in range(child_count):
+ child = widget.get_child(i)
+ child_dict = {}
+ traverse_widget(child, child_dict)
+ widget_dict[child.get_name()] = child_dict
+ else:
+ try:
+ widget_dict['value'] = widget.get_value()
+ except gp.GPhoto2Error as err:
+ if err.code == -2:
+ self.logger.warning(f"Could not get config value for {cfg.get_label()} ({cfg.get_name()}).")
+
+ widget_dict['label'] = widget.get_label()
+
+ return widget_dict
+
+ traverse_widget(cfg, cfg_dict)
+
+ try:
+ with open(output_path, "w") as output_file:
+ json.dump(cfg_dict, output_file, indent=4)
+ except Exception as e:
+ self.logger.error(f"Could not write camera config dump to {output_path}:")
+ self.logger.exception(e)
+
+
+ req = ConfigRequest()
+ req.signal.got_config.connect(on_got_config)
+ self.camera_worker.commands.get_config.emit(req)
+
+
+ def config_hookup_select(self, config: gp.CameraWidget, config_name, combo_box: QComboBox, value_map: dict = None):
+ try:
+ combo_box.currentIndexChanged.disconnect()
+ except:
+ pass
+ cfg = config.get_child_by_name(config_name)
+ combo_box.clear()
+ for idx, choice in enumerate(cfg.get_choices()):
+ choice_label = value_map[choice] if value_map and choice in value_map else choice
+ combo_box.addItem(choice_label, choice)
+ if choice == cfg.get_value():
+ combo_box.setCurrentIndex(idx)
+
+ combo_box.currentIndexChanged.connect(lambda: self.camera_worker.commands.set_single_config.emit(
+ config_name, combo_box.currentData()
+ ))
+
+ def on_config_update(self, config: gp.CameraWidget):
+ self.config_hookup_select(config, "iso", self.iso_select)
+ self.config_hookup_select(config, "f-number", self.f_number_select)
+ self.config_hookup_select(config, "shutterspeed2", self.shutter_speed_select)
+ self.config_hookup_select(config, "d030", self.crop_select, {
+ "0": "Voll",
+ "1": "Klein",
+ "2": "Mittel",
+ "3": "Mittel 2"
+ })
+
+ def on_property_change(self, event: PropertyChangeEvent):
+ match event.property_name:
+ case "lightmeter":
+ if isinstance(self.camera_state, CameraStates.LiveViewActive):
+ self.update_lightmeter(event.value)
+
+ def update_lightmeter(self, value):
+ if isinstance(value, float):
+ self.light_lcd_number.display(int(value))
+ else:
+ self.light_lcd_number.display(None)
+
+ def enable_live_view(self, enable: bool):
+ self.camera_worker.commands.live_view.emit(enable)
+
+ def trigger_autofocus(self):
+ self.camera_worker.commands.trigger_autofocus.emit()
+
+ def capture_image(self):
+ capture_req: CaptureImagesRequest
+
+ # Capture Previews
+ if self.capture_mode == CaptureMode.Preview:
+ filename_template = self.session.name.replace(" ", "_") + "_test_" + str(
+ self.session.preview_count + 1) + "${extension}"
+ file_path_template = os.path.join(self.session.preview_dir, filename_template)
+ capture_req = CaptureImagesRequest(file_path_template, num_images=1, image_quality="JPEG Fine")
+
+ # Capture RTI Series
+ else:
+ if self.rtiImageBrowser.num_files() > 0:
+ message_box = QMessageBox(QMessageBox.Icon.Warning, "RTI-Serie aufnehmen",
+ "Vorhandene Aufnahmen werden gelöscht.")
+ message_box.addButton(
+ QPushButton(self.style().standardIcon(QStyle.StandardPixmap.SP_DialogCancelButton), "Abbrechen"),
+ QMessageBox.ButtonRole.NoRole)
+ message_box.addButton(
+ QPushButton(self.style().standardIcon(QStyle.StandardPixmap.SP_DialogOkButton), "Fortfahren"),
+ QMessageBox.ButtonRole.YesRole)
+ if not message_box.exec():
+ return
+
+ existing_files = [os.path.join(self.session.images_dir, f) for f in os.listdir(self.session.images_dir)]
+ send2trash(existing_files)
+
+ filename_template = self.session.name.replace(" ", "_") + "_${num}${extension}"
+ file_path_template = os.path.join(self.session.images_dir, filename_template)
+ capture_req = CaptureImagesRequest(file_path_template, num_images=60, expect_files=2,
+ max_burst=int(QSettings().value("maxBurstNumber")), skip=0, image_quality="NEF+Fine")
+ self.capture_progress_bar.setMaximum(119)
+ self.capture_progress_bar.setValue(0)
+
+ def on_file_received(path: str):
+ print("Rec: " + path)
+ capture_req.signal.file_received.connect(on_file_received)
+
+ def start_capture(show_button_message: bool):
+ if self.capture_mode == CaptureMode.RTI and show_button_message:
+ press_buttons_dialog = QDialog()
+ loadUi(get_ui_path("ui/press-buttons-dialog.ui"), press_buttons_dialog)
+ if not press_buttons_dialog.exec():
+ return
+
+ self.camera_worker.commands.capture_images.emit(capture_req)
+
+ if self.bt_controller and self.bt_controller.state == BtControllerState.CONNECTED:
+ initial_led = 0 if self.capture_mode == CaptureMode.RTI else self.preview_led_select.currentData()
+ request = BtControllerRequest(BtControllerCommand.SET_LED, initial_led)
+ request.signals.success.connect(lambda: start_capture(False))
+ request.signals.error.connect(lambda: start_capture(True))
+ self.bt_controller.send_command(request)
+ else:
+ start_capture(True)
+
+
+ def on_capture_cancelled(self):
+ logging.info("Capture cancelled")
+
+ def cancel_capture(self):
+ self.cancel_capture_button.setEnabled(False)
+ self.camera_worker.commands.cancel.emit()
+
+ def closeEvent(self, event: QCloseEvent):
+ QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
+ self.camera_thread.requestInterruption()
+ self.camera_thread.exit()
+ if self.bt_controller and self.bt_controller.state != BtControllerState.DISCONNECTED:
+ self.bt_controller.bt_disconnect()
+ self.camera_thread.wait()
+ if self.second_screen_window:
+ self.second_screen_window.close()
+
+ super().closeEvent(event)
+
+if __name__ == "__main__":
+ win: RTICaptureMainWindow
+
+ logging.basicConfig(
+ format='%(levelname)s: %(name)s: %(message)s', level=logging.INFO)
+
+ app = QApplication(sys.argv)
+ app.setOrganizationName("CCeH")
+ app.setOrganizationDomain("cceh.uni-koeln.de")
+ app.setApplicationName("Byzanz RTI")
+
+ settings = QSettings()
+ if "workingDirectory" not in settings.allKeys():
+ settings.setValue("workingDirectory",
+ QStandardPaths.writableLocation(QStandardPaths.StandardLocation.PicturesLocation))
+
+ if "maxPixmapCache" not in settings.allKeys():
+ settings.setValue("maxPixmapCache", 1024)
+
+ if "maxBurstNumber" not in settings.allKeys():
+ settings.setValue("maxBurstNumber", 60)
+
+ if "enableBluetooth" not in settings.allKeys():
+ settings.setValue("enableBluetooth", True)
+
+ if "enableSecondScreenMirror" not in settings.allKeys():
+ settings.setValue("enableSecondScreenMirror", True)
+
+ QPixmapCache.setCacheLimit(int(settings.value("maxPixmapCache")) * 1024)
+
+ loop: qasync.QEventLoop = qasync.QEventLoop(app)
+ asyncio.set_event_loop(loop)
+
+ app_close_event = asyncio.Event()
+ app.aboutToQuit.connect(app_close_event.set)
+
+ orig_exceptionhook = sys.__excepthook__
+
+ win = RTICaptureMainWindow()
+ win.show()
+
+ def excepthook(exc_type, exc_value, exc_traceback):
+ logging.exception(msg="Exception", exc_info=(exc_type, exc_value, exc_traceback))
+ if win.bt_controller and win.bt_controller.state != BtControllerState.DISCONNECTED:
+ win.bt_controller.bt_disconnect()
+
+ loop.call_soon(lambda _loop: _loop.stop(), loop)
+
+
+ # sys.excepthook = excepthook
+ # threading.excepthook = excepthook
+
+
+ with loop:
+ if BT_AVAILABLE and QSettings().value("enableBluetooth", type=bool):
+ loop.create_task(win.init_bluetooth())
+ else:
+ logging.info("Bluetooth not available. Is bleak installed?")
+
+ loop.run_until_complete(app_close_event.wait())
+
+ # asyncio.get_running_loop().run_forever()
+
+
diff --git a/photo_browser.py b/photo_browser.py
index 0b2de97..88ff448 100644
--- a/photo_browser.py
+++ b/photo_browser.py
@@ -1,240 +1,242 @@
-import mimetypes
-import os
-import re
-from os import listdir
-from pathlib import Path
-from typing import Callable, Optional
-
-from PyQt6.QtCore import QFileSystemWatcher, Qt, QThreadPool, pyqtSignal, QMutex, \
- QMutexLocker
-from PyQt6.QtGui import QPixmap, QResizeEvent, QPixmapCache, QImage
-from PyQt6.QtWidgets import QWidget, QListWidget, QListWidgetItem, QGraphicsView
-from PyQt6.uic import loadUi
-
-from load_image_worker import LoadImageWorker, LoadImageWorkerResult
-from photo_viewer import PhotoViewer
-from spinner import Spinner
-
-
-def get_file_index(file_path) -> Optional[int]:
- basename = os.path.splitext(file_path)[0]
- numbers_in_basename = re.findall(r'\d+', basename)
- return int(numbers_in_basename[-1]) if numbers_in_basename else None
-
-
-class ImageFileListItem(QListWidgetItem):
- def __init__(self, path: str, thumbnail: QPixmap):
- super().__init__()
- self.path: str = path
- self.file_name = Path(path).name
- self.index = get_file_index(self.file_name)
- self.thumbnail: QPixmap = thumbnail
-
- def __lt__(self, other):
- return self.index < other.index
-
- def data(self, role: Qt.ItemDataRole):
- if role == Qt.ItemDataRole.DecorationRole:
- return self.thumbnail
-
- return super().data(role)
-
-class PhotoBrowser(QWidget):
- directory_loaded = pyqtSignal(str)
-
- def __init__(self, parent=None):
- super().__init__(parent)
- loadUi('ui/photo_browser.ui', self)
-
- self.__fileSystemWatcher = QFileSystemWatcher()
- self.__threadpool = QThreadPool()
- self.__threadpool.setMaxThreadCount(4)
- self.__num_images_to_load = 0
-
- self.__currentPath: str = None
- self.__currentFileSet: set[str] = set()
-
- self.photo_viewer: PhotoViewer = self.findChild(QWidget, "photoViewer")
- self.image_file_list: QListWidget = self.findChild(QListWidget, "imageFileList")
- self.viewer_container: QWidget = self.findChild(QWidget, "viewerContainer")
-
- self.__fileSystemWatcher.directoryChanged.connect(self.__load_directory)
-
- self.image_file_list.currentItemChanged.connect(self.__on_select_image_file)
-
- self.spinner = Spinner(self.viewer_container, Spinner.m_light_color)
- self.spinner.isAnimated = False
- self.__center_spinner_over_photo_viewer()
- self.resize(self.size())
-
- self.__mutex = QMutex()
-
- def get_scene(self):
- return self.photo_viewer.getScene()
-
- def set_mirror_graphics_view(self, view: QGraphicsView):
- self.photo_viewer.setMirrorView(view)
-
- def open_directory(self, dir_path):
- if self.__currentPath:
- self.close_directory()
-
- self.__currentPath = dir_path
- self.start_watching()
- self.__load_directory()
-
- def start_watching(self):
- # print("START watching " + self.__currentPath)
- self.__fileSystemWatcher.addPath(self.__currentPath)
-
- def close_directory(self):
- self.stop_watching()
- self.__currentPath = None
- self.__currentFileSet.clear()
- self.__threadpool.clear()
- self.__threadpool.waitForDone()
- self.__num_images_to_load = 0
- self.image_file_list.clear()
- QPixmapCache.clear()
-
- def stop_watching(self):
- # print("STOP watching " + self.__currentPath)
- self.__fileSystemWatcher.removePath(self.__currentPath)
-
- def num_files(self) -> int:
- return self.image_file_list.count()
-
- def files(self):
- return [self.image_file_list.item(row).path for row in range(self.image_file_list.count())]
-
- def last_index(self) -> int:
- image_count = self.image_file_list.count()
- if image_count > 0:
- return self.image_file_list.item(image_count - 1).index
-
- return 0
-
- def resizeEvent(self, event: QResizeEvent):
- self.__center_spinner_over_photo_viewer()
-
- def show_preview(self, image: QImage | None):
- if not image:
- self.photo_viewer.setPhoto(None)
- self.image_file_list.setEnabled(True)
-
- # re-show the previously selected image
- selected_image_index = self.image_file_list.currentIndex()
- if selected_image_index:
- item = self.image_file_list.item(selected_image_index.row())
- self.__on_select_image_file(item)
- return
-
- self.image_file_list.setEnabled(False)
- self.photo_viewer.setPhoto(QPixmap.fromImage(image))
- self.photo_viewer.fitInView()
-
- def __load_directory(self):
- print("Load directory: " + self.__currentPath)
- new_files = [f for f in listdir(self.__currentPath)
- if mimetypes.guess_type(f)[0] == "image/jpeg" and get_file_index(f) is not None]
-
- new_fileset = set(new_files)
- added_files = new_fileset - self.__currentFileSet
- removed_files = self.__currentFileSet - new_fileset
-
- if not added_files and not removed_files:
- self.directory_loaded.emit(self.__currentPath)
-
- if added_files:
- # self.__threadpool.waitForDone()
- # self.stop_watching()
- for f in added_files:
- self.__load_image(f, self.__add_image_item)
-
- for f in removed_files:
- for i in range(self.image_file_list.count()):
- item = self.image_file_list.item(i)
- if isinstance(item, ImageFileListItem):
- if item.file_name == f:
- self.image_file_list.takeItem(i)
- del item
-
-
-
-
- self.__currentFileSet = new_fileset
-
- def __on_directory_loaded(self):
- # self.start_watching()
- self.directory_loaded.emit(self.__currentPath)
- # image_count = self.image_file_list.count()
- # if image_count > 0:
- # self.image_file_list.setCurrentItem(self.image_file_list.item(image_count - 1))
-
- # just in case there have been changes while loading the files
- self.__load_directory()
-
- def __load_image(self, file_name: str, on_finished_callback: Callable):
- self.__num_images_to_load +=1
-
- worker = LoadImageWorker(os.path.join(self.__currentPath, file_name), True, 200)
- worker.signals.finished.connect(lambda result: self.__on_image_loaded(result, on_finished_callback))
-
- self.spinner.startAnimation()
- self.__threadpool.start(worker)
-
- def __on_image_loaded(self, result: LoadImageWorkerResult, on_finished_callback: Callable):
- QPixmapCache.insert(result.path, QPixmap.fromImage(result.image))
- on_finished_callback(result)
-
- self.__num_images_to_load -= 1
- if self.__num_images_to_load == 0:
- self.__on_directory_loaded()
-
- self.spinner.stopAnimation()
-
- def __on_select_image_file(self, item: ImageFileListItem):
- if item:
- file_path = item.path
- cached_image = QPixmapCache.find(file_path)
- if cached_image:
- print("cache hit")
- self.photo_viewer.setPhoto(cached_image)
- else:
- print("cache miss")
- self.__load_image(file_path, lambda result: self.photo_viewer.setPhoto(QPixmap.fromImage(result.image)))
- else:
- self.photo_viewer.setPhoto(None)
-
- def __add_image_item(self, image_worker_result: LoadImageWorkerResult):
- list_item = ImageFileListItem(image_worker_result.path, image_worker_result.thumbnail)
-
- exposure_time = image_worker_result.exif["ExposureTime"].real
- f_number = image_worker_result.exif["FNumber"]
- list_item.setText("%s\nf/%s | %s" % (list_item.file_name, f_number, exposure_time))
-
- # Only add the item to the list if a directory is still open. This function
- # can be called asynchronously from a thread so the directory could have been
- # closed in the meantime.
- with QMutexLocker(self.__mutex):
- if self.__currentPath:
- self.image_file_list.addItem(list_item)
- self.image_file_list.sortItems()
- self.image_file_list.scrollToBottom()
-
- self.image_file_list.currentItemChanged.disconnect()
- self.image_file_list.setCurrentItem(list_item)
- self.image_file_list.currentItemChanged.connect(self.__on_select_image_file)
-
- image_path = image_worker_result.path
- pixmap: QPixmap = QPixmapCache.find(image_path) or QPixmap.fromImage(image_worker_result.image)
- self.photo_viewer.setPhoto(pixmap)
- if self.image_file_list.indexFromItem(list_item).row() == 0:
- self.photo_viewer.fitInView()
-
-
-
- def __center_spinner_over_photo_viewer(self):
- spinner_x = (self.viewer_container.width() - 80) / 2
- spinner_y = (self.viewer_container.height() - 80) / 2
- self.spinner.setGeometry(int(spinner_x), int(spinner_y), 80, 80)
+import mimetypes
+import os
+import re
+from os import listdir
+from pathlib import Path
+from typing import Callable, Optional
+
+from PyQt6.QtCore import QFileSystemWatcher, Qt, QThreadPool, pyqtSignal, QMutex, \
+ QMutexLocker
+from PyQt6.QtGui import QPixmap, QResizeEvent, QPixmapCache, QImage
+from PyQt6.QtWidgets import QWidget, QListWidget, QListWidgetItem, QGraphicsView
+from PyQt6.uic import loadUi
+
+from load_image_worker import LoadImageWorker, LoadImageWorkerResult
+from photo_viewer import PhotoViewer
+from spinner import Spinner
+
+from helpers import get_ui_path
+
+
+def get_file_index(file_path) -> Optional[int]:
+ basename = os.path.splitext(file_path)[0]
+ numbers_in_basename = re.findall(r'\d+', basename)
+ return int(numbers_in_basename[-1]) if numbers_in_basename else None
+
+
+class ImageFileListItem(QListWidgetItem):
+ def __init__(self, path: str, thumbnail: QPixmap):
+ super().__init__()
+ self.path: str = path
+ self.file_name = Path(path).name
+ self.index = get_file_index(self.file_name)
+ self.thumbnail: QPixmap = thumbnail
+
+ def __lt__(self, other):
+ return self.index < other.index
+
+ def data(self, role: Qt.ItemDataRole):
+ if role == Qt.ItemDataRole.DecorationRole:
+ return self.thumbnail
+
+ return super().data(role)
+
+class PhotoBrowser(QWidget):
+ directory_loaded = pyqtSignal(str)
+
+ def __init__(self, parent=None):
+ super().__init__(parent)
+ loadUi(get_ui_path('ui/photo_browser.ui'), self)
+
+ self.__fileSystemWatcher = QFileSystemWatcher()
+ self.__threadpool = QThreadPool()
+ self.__threadpool.setMaxThreadCount(4)
+ self.__num_images_to_load = 0
+
+ self.__currentPath: str = None
+ self.__currentFileSet: set[str] = set()
+
+ self.photo_viewer: PhotoViewer = self.findChild(QWidget, "photoViewer")
+ self.image_file_list: QListWidget = self.findChild(QListWidget, "imageFileList")
+ self.viewer_container: QWidget = self.findChild(QWidget, "viewerContainer")
+
+ self.__fileSystemWatcher.directoryChanged.connect(self.__load_directory)
+
+ self.image_file_list.currentItemChanged.connect(self.__on_select_image_file)
+
+ self.spinner = Spinner(self.viewer_container, Spinner.m_light_color)
+ self.spinner.isAnimated = False
+ self.__center_spinner_over_photo_viewer()
+ self.resize(self.size())
+
+ self.__mutex = QMutex()
+
+ def get_scene(self):
+ return self.photo_viewer.getScene()
+
+ def set_mirror_graphics_view(self, view: QGraphicsView):
+ self.photo_viewer.setMirrorView(view)
+
+ def open_directory(self, dir_path):
+ if self.__currentPath:
+ self.close_directory()
+
+ self.__currentPath = dir_path
+ self.start_watching()
+ self.__load_directory()
+
+ def start_watching(self):
+ # print("START watching " + self.__currentPath)
+ self.__fileSystemWatcher.addPath(self.__currentPath)
+
+ def close_directory(self):
+ self.stop_watching()
+ self.__currentPath = None
+ self.__currentFileSet.clear()
+ self.__threadpool.clear()
+ self.__threadpool.waitForDone()
+ self.__num_images_to_load = 0
+ self.image_file_list.clear()
+ QPixmapCache.clear()
+
+ def stop_watching(self):
+ # print("STOP watching " + self.__currentPath)
+ self.__fileSystemWatcher.removePath(self.__currentPath)
+
+ def num_files(self) -> int:
+ return self.image_file_list.count()
+
+ def files(self):
+ return [self.image_file_list.item(row).path for row in range(self.image_file_list.count())]
+
+ def last_index(self) -> int:
+ image_count = self.image_file_list.count()
+ if image_count > 0:
+ return self.image_file_list.item(image_count - 1).index
+
+ return 0
+
+ def resizeEvent(self, event: QResizeEvent):
+ self.__center_spinner_over_photo_viewer()
+
+ def show_preview(self, image: QImage | None):
+ if not image:
+ self.photo_viewer.setPhoto(None)
+ self.image_file_list.setEnabled(True)
+
+ # re-show the previously selected image
+ selected_image_index = self.image_file_list.currentIndex()
+ if selected_image_index:
+ item = self.image_file_list.item(selected_image_index.row())
+ self.__on_select_image_file(item)
+ return
+
+ self.image_file_list.setEnabled(False)
+ self.photo_viewer.setPhoto(QPixmap.fromImage(image))
+ self.photo_viewer.fitInView()
+
+ def __load_directory(self):
+ print("Load directory: " + self.__currentPath)
+ new_files = [f for f in listdir(self.__currentPath)
+ if mimetypes.guess_type(f)[0] == "image/jpeg" and get_file_index(f) is not None]
+
+ new_fileset = set(new_files)
+ added_files = new_fileset - self.__currentFileSet
+ removed_files = self.__currentFileSet - new_fileset
+
+ if not added_files and not removed_files:
+ self.directory_loaded.emit(self.__currentPath)
+
+ if added_files:
+ # self.__threadpool.waitForDone()
+ # self.stop_watching()
+ for f in added_files:
+ self.__load_image(f, self.__add_image_item)
+
+ for f in removed_files:
+ for i in range(self.image_file_list.count()):
+ item = self.image_file_list.item(i)
+ if isinstance(item, ImageFileListItem):
+ if item.file_name == f:
+ self.image_file_list.takeItem(i)
+ del item
+
+
+
+
+ self.__currentFileSet = new_fileset
+
+ def __on_directory_loaded(self):
+ # self.start_watching()
+ self.directory_loaded.emit(self.__currentPath)
+ # image_count = self.image_file_list.count()
+ # if image_count > 0:
+ # self.image_file_list.setCurrentItem(self.image_file_list.item(image_count - 1))
+
+ # just in case there have been changes while loading the files
+ self.__load_directory()
+
+ def __load_image(self, file_name: str, on_finished_callback: Callable):
+ self.__num_images_to_load +=1
+
+ worker = LoadImageWorker(os.path.join(self.__currentPath, file_name), True, 200)
+ worker.signals.finished.connect(lambda result: self.__on_image_loaded(result, on_finished_callback))
+
+ self.spinner.startAnimation()
+ self.__threadpool.start(worker)
+
+ def __on_image_loaded(self, result: LoadImageWorkerResult, on_finished_callback: Callable):
+ QPixmapCache.insert(result.path, QPixmap.fromImage(result.image))
+ on_finished_callback(result)
+
+ self.__num_images_to_load -= 1
+ if self.__num_images_to_load == 0:
+ self.__on_directory_loaded()
+
+ self.spinner.stopAnimation()
+
+ def __on_select_image_file(self, item: ImageFileListItem):
+ if item:
+ file_path = item.path
+ cached_image = QPixmapCache.find(file_path)
+ if cached_image:
+ print("cache hit")
+ self.photo_viewer.setPhoto(cached_image)
+ else:
+ print("cache miss")
+ self.__load_image(file_path, lambda result: self.photo_viewer.setPhoto(QPixmap.fromImage(result.image)))
+ else:
+ self.photo_viewer.setPhoto(None)
+
+ def __add_image_item(self, image_worker_result: LoadImageWorkerResult):
+ list_item = ImageFileListItem(image_worker_result.path, image_worker_result.thumbnail)
+
+ exposure_time = image_worker_result.exif["ExposureTime"].real
+ f_number = image_worker_result.exif["FNumber"]
+ list_item.setText("%s\nf/%s | %s" % (list_item.file_name, f_number, exposure_time))
+
+ # Only add the item to the list if a directory is still open. This function
+ # can be called asynchronously from a thread so the directory could have been
+ # closed in the meantime.
+ with QMutexLocker(self.__mutex):
+ if self.__currentPath:
+ self.image_file_list.addItem(list_item)
+ self.image_file_list.sortItems()
+ self.image_file_list.scrollToBottom()
+
+ self.image_file_list.currentItemChanged.disconnect()
+ self.image_file_list.setCurrentItem(list_item)
+ self.image_file_list.currentItemChanged.connect(self.__on_select_image_file)
+
+ image_path = image_worker_result.path
+ pixmap: QPixmap = QPixmapCache.find(image_path) or QPixmap.fromImage(image_worker_result.image)
+ self.photo_viewer.setPhoto(pixmap)
+ if self.image_file_list.indexFromItem(list_item).row() == 0:
+ self.photo_viewer.fitInView()
+
+
+
+ def __center_spinner_over_photo_viewer(self):
+ spinner_x = (self.viewer_container.width() - 80) / 2
+ spinner_y = (self.viewer_container.height() - 80) / 2
+ self.spinner.setGeometry(int(spinner_x), int(spinner_y), 80, 80)
diff --git a/requirements.txt b/requirements.txt
index d34cd16..ecc0982 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
gphoto2~=2.5.0
-PyQt6~=6.7.0
-Pillow~=10.3.0
+#PyQt6~=6.7.0
+#Pillow~=10.3.0
Send2Trash~=1.8.3
-bleak==0.22.1
+#bleak==0.22.1
qasync~=0.27.1
diff --git a/settings_dialog.py b/settings_dialog.py
index cd85871..bab62d9 100644
--- a/settings_dialog.py
+++ b/settings_dialog.py
@@ -1,69 +1,70 @@
-from typing import Any
-
-from PyQt6.QtCore import QSettings, QVariant, Qt
-from PyQt6.QtGui import QAction, QIcon
-from PyQt6.QtWidgets import QDialog, QLineEdit, QFileDialog, QToolButton, QSpinBox, QCheckBox
-from PyQt6.uic import loadUi
-
-
-class SettingsDialog(QDialog):
-
- def __init__(self, q_settings: QSettings, parent=None):
- super(SettingsDialog, self).__init__(parent)
- self.__q_settings = q_settings
- self.settings: dict[str, Any] = dict()
-
- loadUi('ui/settings_dialog.ui', self)
-
- self.working_directory_input: QLineEdit = self.findChild(QLineEdit, "workingDirectoryInput")
- self.working_directory_input.setText(q_settings.value("workingDirectory"))
- self.working_directory_input.textChanged.connect(
- lambda text: self.set("workingDirectory", text)
- )
-
- open_action = QAction("Arbeitsverzeichnis wählen", self)
- open_action.setIcon(QIcon("ui/folder-open.svg"))
- open_action.triggered.connect(self.choose_working_directory)
-
- self.working_directory_input.addAction(open_action, QLineEdit.ActionPosition.TrailingPosition)
- tool_button: QToolButton
- for tool_button in self.working_directory_input.findChildren(QToolButton):
- tool_button.setCursor(Qt.CursorShape.PointingHandCursor)
-
- self.max_pixmap_cache_input: QSpinBox = self.findChild(QSpinBox, "maxPixmapCacheInput")
- self.max_pixmap_cache_input.setValue(int(q_settings.value("maxPixmapCache")))
- self.max_pixmap_cache_input.textChanged.connect(
- lambda text: self.set("maxPixmapCache", int(text))
- )
-
- self.max_burst_number_input: QSpinBox = self.findChild(QSpinBox, "maxBurstNumberInput")
- self.max_burst_number_input.setValue(int(q_settings.value("maxBurstNumber")))
- self.max_burst_number_input.textChanged.connect(
- lambda text: self.set("maxBurstNumber", int(text))
- )
-
- self.enable_bluetooth_checkbox: QCheckBox = self.findChild(QCheckBox, "enableBluetoothCheckbox")
- self.enable_bluetooth_checkbox.setChecked(q_settings.value("enableBluetooth", type=bool))
- self.enable_bluetooth_checkbox.stateChanged.connect(
- lambda: self.set("enableBluetooth", self.enable_bluetooth_checkbox.isChecked())
- )
-
- self.enable_second_screen_mirror_checkbox: QCheckBox = self.findChild(QCheckBox, "enableSecondScreenMirrorCheckbox")
- self.enable_second_screen_mirror_checkbox.setChecked(q_settings.value("enableSecondScreenMirror", type=bool))
- self.enable_second_screen_mirror_checkbox.stateChanged.connect(
- lambda: self.set("enableSecondScreenMirror", self.enable_second_screen_mirror_checkbox.isChecked())
- )
-
- def set(self, name: str, value: QVariant):
- self.settings[name] = value
-
- def choose_working_directory(self):
- file_dialog = QFileDialog(self,
- "Arbeitsverzeichnis wählen",
- self.working_directory_input.text())
- file_dialog.setFileMode(QFileDialog.FileMode.Directory)
- if file_dialog.exec():
- print(file_dialog.selectedFiles())
-
- if file_dialog.selectedFiles():
- self.working_directory_input.setText(file_dialog.selectedFiles()[0])
+from typing import Any
+
+from PyQt6.QtCore import QSettings, QVariant, Qt
+from PyQt6.QtGui import QAction, QIcon
+from PyQt6.QtWidgets import QDialog, QLineEdit, QFileDialog, QToolButton, QSpinBox, QCheckBox
+from PyQt6.uic import loadUi
+
+from helpers import get_ui_path
+
+class SettingsDialog(QDialog):
+
+ def __init__(self, q_settings: QSettings, parent=None):
+ super(SettingsDialog, self).__init__(parent)
+ self.__q_settings = q_settings
+ self.settings: dict[str, Any] = dict()
+
+ loadUi(get_ui_path('ui/settings_dialog.ui'), self)
+
+ self.working_directory_input: QLineEdit = self.findChild(QLineEdit, "workingDirectoryInput")
+ self.working_directory_input.setText(q_settings.value("workingDirectory"))
+ self.working_directory_input.textChanged.connect(
+ lambda text: self.set("workingDirectory", text)
+ )
+
+ open_action = QAction("Arbeitsverzeichnis wählen", self)
+ open_action.setIcon(QIcon(get_ui_path("ui/folder-open.svg")))
+ open_action.triggered.connect(self.choose_working_directory)
+
+ self.working_directory_input.addAction(open_action, QLineEdit.ActionPosition.TrailingPosition)
+ tool_button: QToolButton
+ for tool_button in self.working_directory_input.findChildren(QToolButton):
+ tool_button.setCursor(Qt.CursorShape.PointingHandCursor)
+
+ self.max_pixmap_cache_input: QSpinBox = self.findChild(QSpinBox, "maxPixmapCacheInput")
+ self.max_pixmap_cache_input.setValue(int(q_settings.value("maxPixmapCache")))
+ self.max_pixmap_cache_input.textChanged.connect(
+ lambda text: self.set("maxPixmapCache", int(text))
+ )
+
+ self.max_burst_number_input: QSpinBox = self.findChild(QSpinBox, "maxBurstNumberInput")
+ self.max_burst_number_input.setValue(int(q_settings.value("maxBurstNumber")))
+ self.max_burst_number_input.textChanged.connect(
+ lambda text: self.set("maxBurstNumber", int(text))
+ )
+
+ self.enable_bluetooth_checkbox: QCheckBox = self.findChild(QCheckBox, "enableBluetoothCheckbox")
+ self.enable_bluetooth_checkbox.setChecked(q_settings.value("enableBluetooth", type=bool))
+ self.enable_bluetooth_checkbox.stateChanged.connect(
+ lambda: self.set("enableBluetooth", self.enable_bluetooth_checkbox.isChecked())
+ )
+
+ self.enable_second_screen_mirror_checkbox: QCheckBox = self.findChild(QCheckBox, "enableSecondScreenMirrorCheckbox")
+ self.enable_second_screen_mirror_checkbox.setChecked(q_settings.value("enableSecondScreenMirror", type=bool))
+ self.enable_second_screen_mirror_checkbox.stateChanged.connect(
+ lambda: self.set("enableSecondScreenMirror", self.enable_second_screen_mirror_checkbox.isChecked())
+ )
+
+ def set(self, name: str, value: QVariant):
+ self.settings[name] = value
+
+ def choose_working_directory(self):
+ file_dialog = QFileDialog(self,
+ "Arbeitsverzeichnis wählen",
+ self.working_directory_input.text())
+ file_dialog.setFileMode(QFileDialog.FileMode.Directory)
+ if file_dialog.exec():
+ print(file_dialog.selectedFiles())
+
+ if file_dialog.selectedFiles():
+ self.working_directory_input.setText(file_dialog.selectedFiles()[0])