Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor watchdog & macos icon stuff #51

Merged
merged 7 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 23 additions & 109 deletions syftbox/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import importlib
import os
import platform
import subprocess
import sys
import threading
import time
import traceback
import types
Expand All @@ -26,9 +24,13 @@
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

from syftbox.client.fsevents import (
AnyFileSystemEventHandler,
FileSystemEvent,
FSWatchdog,
)
from syftbox.client.utils import macos
from syftbox.lib import ClientConfig, SharedState, validate_email

current_dir = Path(__file__).parent
Expand Down Expand Up @@ -57,57 +59,6 @@ class Plugin:
description: str


# if you knew the pain of this function
def find_icon_file(src_folder: str) -> Path:
src_path = Path(src_folder)

# Function to search for Icon\r file
def search_icon_file():
if os.path.exists(src_folder):
for file_path in src_path.iterdir():
if "Icon" in file_path.name and "\r" in file_path.name:
return file_path
return None

# First attempt to find the Icon\r file
icon_file = search_icon_file()
if icon_file:
return icon_file

# If Icon\r is not found, search for icon.zip and unzip it
zip_file = ASSETS_FOLDER / "icon.zip"

if zip_file.exists():
try:
# cant use other zip tools as they don't unpack it correctly
subprocess.run(
["ditto", "-xk", str(zip_file), str(src_path.parent)],
check=True,
)

# Try to find the Icon\r file again after extraction
icon_file = search_icon_file()
if icon_file:
return icon_file
except subprocess.CalledProcessError:
raise RuntimeError("Failed to unzip icon.zip using macOS CLI tool.")

# If still not found, raise an error
raise FileNotFoundError(
"Icon file with a carriage return not found, and icon.zip did not contain it.",
)


def copy_icon_file(icon_folder: str, dest_folder: str) -> None:
src_icon_path = find_icon_file(icon_folder)
if not os.path.isdir(dest_folder):
raise FileNotFoundError(f"Destination folder '{dest_folder}' does not exist.")

# shutil wont work with these special icon files
subprocess.run(["cp", "-p", src_icon_path, dest_folder], check=True)
subprocess.run(["SetFile", "-a", "C", dest_folder], check=True)


def load_or_create_config(args) -> ClientConfig:
syft_config_dir = os.path.abspath(os.path.expanduser("~/.syftbox"))
os.makedirs(syft_config_dir, exist_ok=True)
Expand Down Expand Up @@ -146,7 +97,7 @@ def load_or_create_config(args) -> ClientConfig:
os.makedirs(client_config.sync_folder, exist_ok=True)

if platform.system() == "Darwin":
copy_icon_file(ICON_FOLDER, client_config.sync_folder)
macos.copy_icon_file(ICON_FOLDER, client_config.sync_folder)

if args.email:
client_config.email = args.email
Expand Down Expand Up @@ -250,14 +201,6 @@ def is_valid_datasite_name(name):
return name.isalnum() or all(c.isalnum() or c in ("-", "_") for c in name)


@dataclass
class Plugin:
name: str
module: types.ModuleType
schedule: int
description: str


# API Models
class PluginRequest(BaseModel):
plugin_name: str
Expand Down Expand Up @@ -343,45 +286,27 @@ def parse_args():
return parser.parse_args()


def start_watchdog(app):
shared_state = app.shared_state
sync_folder = shared_state.client_config.sync_folder

stop_event = threading.Event()
app.stop_event = stop_event

class AnyFileSystemEventHandler(FileSystemEventHandler):
def on_any_event(self, event: FileSystemEvent) -> None:
for ignore in WATCHDOG_IGNORE:
full_path = shared_state.client_config.sync_folder + "/" + ignore
if event.src_path.startswith(full_path):
return
run_plugin("sync", event)
def start_watchdog(app) -> FSWatchdog:
def sync_on_event(event: FileSystemEvent):
run_plugin("sync", event)

event_handler = AnyFileSystemEventHandler()
observer = Observer()
observer.schedule(event_handler, sync_folder, recursive=True)
observer.start()

# Run observer in a thread to keep the process alive
try:
while not stop_event.is_set():
time.sleep(1)
except KeyboardInterrupt:
print("> Watchdog received KeyboardInterrupt")
finally:
print("> Stopping Watchdog...")
observer.stop()
observer.join()
print("> Watchdog stopped")
watch_dir = Path(app.shared_state.client_config.sync_folder)
event_handler = AnyFileSystemEventHandler(
watch_dir,
callbacks=[sync_on_event],
ignored=WATCHDOG_IGNORE,
)
watchdog = FSWatchdog(watch_dir, event_handler)
watchdog.start()
return watchdog


async def lifespan(app: FastAPI):
# Startup
print("> Starting Client")
args = parse_args()
client_config = load_or_create_config(args)
app.shared_state = initialize_shared_state(client_config)
app.shared_state = SharedState(client_config=client_config)

# Clear the lock file on the first run if it exists
job_file = client_config.config_path.replace(".json", ".sql")
Expand All @@ -394,20 +319,12 @@ async def lifespan(app: FastAPI):
jobstores = {"default": SQLAlchemyJobStore(url=f"sqlite:///{job_file}")}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.start()
app.scheduler = scheduler
atexit.register(stop_scheduler)

app.scheduler = scheduler
app.running_plugins = {}
app.loaded_plugins = load_plugins(client_config)

# Start the watchdog observer in a thread
if not hasattr(app, "watchdog_thread") or not app.watchdog_thread.is_alive():
print("> Starting Watchdog Thread")
watchdog_thread = threading.Thread(
target=start_watchdog, args=(app,), daemon=True
)
watchdog_thread.start()
app.watchdog_thread = watchdog_thread
app.watchdog = start_watchdog(app)

autorun_plugins = ["init", "create_datasite", "sync", "apps"]
# autorun_plugins = ["init", "create_datasite", "sync", "apps"]
Expand All @@ -418,10 +335,7 @@ async def lifespan(app: FastAPI):

print("> Shutting down...")
scheduler.shutdown()

if app.watchdog_thread.is_alive():
app.stop_event.set()
app.watchdog_thread.join()
app.watchdog.stop()


def stop_scheduler():
Expand Down
54 changes: 54 additions & 0 deletions syftbox/client/fsevents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from pathlib import Path
from typing import Callable

from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

__all__ = [
"FSWatchdog",
"AnyFileSystemEventHandler",
"FSEventCallbacks",
"FileSystemEvent",
]

type FSEventCallbacks = list[Callable[[FileSystemEvent], None]]


class FSWatchdog:
def __init__(self, watch_dir: Path, event_handler: FileSystemEventHandler):
self.watch_dir = watch_dir
self.event_handler = event_handler
self._observer = Observer()

def start(self):
# observer starts it's own thread
self._observer.schedule(
self.event_handler,
self.watch_dir,
recursive=True,
)
self._observer.start()

def stop(self):
self._observer.stop()
self._observer.join()


class AnyFileSystemEventHandler(FileSystemEventHandler):
def __init__(
self,
watch_dir: Path,
callbacks: FSEventCallbacks,
ignored: list[str] = [],
):
self.watch_dir = watch_dir
self.callbacks = callbacks
self.ignored = [Path(self.watch_dir, ignore) for ignore in ignored]

def on_any_event(self, event: FileSystemEvent) -> None:
for ignore in self.ignored:
if event.src_path.startswith(str(ignore)):
return

for cb in self.callbacks:
cb(event)
Empty file.
52 changes: 52 additions & 0 deletions syftbox/client/utils/macos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import subprocess
from pathlib import Path

ASSETS_FOLDER = Path(__file__).parents[2] / "assets"
ICONS_PKG = ASSETS_FOLDER / "icon.zip"


# Function to search for Icon\r file
def search_icon_file(src_path: Path) -> Path:
if not src_path.exists():
return None
for file_path in src_path.iterdir():
if "Icon" in file_path.name and "\r" in file_path.name:
return file_path


# if you knew the pain of this function
def find_icon_file(src_path: Path) -> Path:
# First attempt to find the Icon\r file
icon_file = search_icon_file(src_path)
if icon_file:
return icon_file

if not ICONS_PKG.exists():
# If still not found, raise an error
raise FileNotFoundError(f"{ICONS_PKG} not found")

try:
# cant use other zip tools as they don't unpack it correctly
subprocess.run(
["ditto", "-xk", str(ICONS_PKG), str(src_path.parent)],
check=True,
)

# Try to find the Icon\r file again after extraction
icon_file = search_icon_file(src_path)
if icon_file:
return icon_file
except subprocess.CalledProcessError:
raise RuntimeError("Failed to unzip icon.zip using macOS CLI tool.")


def copy_icon_file(icon_folder: str, dest_folder: str) -> None:
dest_path = Path(dest_folder)
icon_path = Path(icon_folder)
src_icon_path = find_icon_file(icon_path)
if not dest_path.exists():
raise FileNotFoundError(f"Destination folder '{dest_folder}' does not exist.")

# shutil wont work with these special icon files
subprocess.run(["cp", "-p", src_icon_path, dest_folder], check=True)
subprocess.run(["SetFile", "-a", "C", dest_folder], check=True)