Skip to content

Commit

Permalink
Merge pull request #51 from OpenMined/yash/refactor
Browse files Browse the repository at this point in the history
Refactor watchdog
  • Loading branch information
yashgorana authored Oct 8, 2024
2 parents aae6c6d + 65316e9 commit 3fadf13
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 109 deletions.
132 changes: 23 additions & 109 deletions syftbox/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
import importlib
import os
import platform
import subprocess
import sys
import threading
import time
import traceback
import types
Expand All @@ -26,9 +24,13 @@
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

from syftbox.client.fsevents import (
AnyFileSystemEventHandler,
FileSystemEvent,
FSWatchdog,
)
from syftbox.client.utils import macos
from syftbox.lib import ClientConfig, SharedState, validate_email

current_dir = Path(__file__).parent
Expand Down Expand Up @@ -57,57 +59,6 @@ class Plugin:
description: str


# if you knew the pain of this function
def find_icon_file(src_folder: str) -> Path:
    r"""Return the path of the macOS ``Icon\r`` file inside ``src_folder``.

    The folder is searched first. If nothing is found and ``icon.zip`` exists
    in ``ASSETS_FOLDER``, the archive is extracted with ``ditto`` into the
    parent of ``src_folder`` and the folder is searched once more.

    Args:
        src_folder: Folder expected to contain the icon file.

    Returns:
        Path of the first entry whose name contains both ``Icon`` and a
        carriage return.

    Raises:
        RuntimeError: If ``ditto`` exits with a non-zero status.
        FileNotFoundError: If no icon file is found, before or after
            extraction.
    """
    src_path = Path(src_folder)

    # Function to search for Icon\r file
    def search_icon_file():
        # is_dir() instead of exists(): a regular file at this path would
        # otherwise make iterdir() raise NotADirectoryError.
        if not src_path.is_dir():
            return None
        for file_path in src_path.iterdir():
            if "Icon" in file_path.name and "\r" in file_path.name:
                return file_path
        return None

    # First attempt to find the Icon\r file
    icon_file = search_icon_file()
    if icon_file:
        return icon_file

    # If Icon\r is not found, search for icon.zip and unzip it
    zip_file = ASSETS_FOLDER / "icon.zip"

    if zip_file.exists():
        try:
            # can't use other zip tools as they don't unpack it correctly
            subprocess.run(
                ["ditto", "-xk", str(zip_file), str(src_path.parent)],
                check=True,
            )
        except subprocess.CalledProcessError as err:
            # Chain the original failure so the ditto exit status is not lost.
            raise RuntimeError(
                "Failed to unzip icon.zip using macOS CLI tool."
            ) from err

        # Try to find the Icon\r file again after extraction
        icon_file = search_icon_file()
        if icon_file:
            return icon_file

    # If still not found, raise an error
    raise FileNotFoundError(
        "Icon file with a carriage return not found, and icon.zip did not contain it.",
    )


def copy_icon_file(icon_folder: str, dest_folder: str) -> None:
    """Copy the icon file found in ``icon_folder`` into ``dest_folder``.

    The icon is located via ``find_icon_file``; it is then copied with
    ``cp -p`` and ``SetFile -a C`` is run on the destination folder.

    Raises:
        FileNotFoundError: If ``dest_folder`` is not an existing directory.
        subprocess.CalledProcessError: If ``cp`` or ``SetFile`` fails.
    """
    icon = find_icon_file(icon_folder)

    if os.path.isdir(dest_folder):
        # shutil wont work with these special icon files
        copy_cmd = ["cp", "-p", icon, dest_folder]
        flag_cmd = ["SetFile", "-a", "C", dest_folder]
        for cmd in (copy_cmd, flag_cmd):
            subprocess.run(cmd, check=True)
        return

    raise FileNotFoundError(f"Destination folder '{dest_folder}' does not exist.")


def load_or_create_config(args) -> ClientConfig:
syft_config_dir = os.path.abspath(os.path.expanduser("~/.syftbox"))
os.makedirs(syft_config_dir, exist_ok=True)
Expand Down Expand Up @@ -146,7 +97,7 @@ def load_or_create_config(args) -> ClientConfig:
os.makedirs(client_config.sync_folder, exist_ok=True)

if platform.system() == "Darwin":
copy_icon_file(ICON_FOLDER, client_config.sync_folder)
macos.copy_icon_file(ICON_FOLDER, client_config.sync_folder)

if args.email:
client_config.email = args.email
Expand Down Expand Up @@ -250,14 +201,6 @@ def is_valid_datasite_name(name):
return name.isalnum() or all(c.isalnum() or c in ("-", "_") for c in name)


@dataclass
class Plugin:
    """Record describing one client plugin."""

    name: str  # plugin name
    module: types.ModuleType  # module object associated with the plugin
    schedule: int  # scheduling value; units are not visible here — TODO confirm
    description: str  # free-text description of the plugin


# API Models
class PluginRequest(BaseModel):
plugin_name: str
Expand Down Expand Up @@ -343,45 +286,27 @@ def parse_args():
return parser.parse_args()


def start_watchdog(app) -> FSWatchdog:
    """Start watching the client's sync folder and return the watchdog.

    Every filesystem event under the sync folder that is not inside one of
    the ``WATCHDOG_IGNORE`` entries triggers the ``sync`` plugin.

    Args:
        app: Application object; ``app.shared_state.client_config.sync_folder``
            names the directory to watch.

    Returns:
        The started ``FSWatchdog``. The caller is responsible for calling
        ``stop()`` on it at shutdown.
    """

    def sync_on_event(event: FileSystemEvent):
        run_plugin("sync", event)

    watch_dir = Path(app.shared_state.client_config.sync_folder)
    event_handler = AnyFileSystemEventHandler(
        watch_dir,
        callbacks=[sync_on_event],
        ignored=WATCHDOG_IGNORE,
    )
    watchdog = FSWatchdog(watch_dir, event_handler)
    watchdog.start()
    return watchdog


async def lifespan(app: FastAPI):
# Startup
print("> Starting Client")
args = parse_args()
client_config = load_or_create_config(args)
app.shared_state = initialize_shared_state(client_config)
app.shared_state = SharedState(client_config=client_config)

# Clear the lock file on the first run if it exists
job_file = client_config.config_path.replace(".json", ".sql")
Expand All @@ -394,20 +319,12 @@ async def lifespan(app: FastAPI):
jobstores = {"default": SQLAlchemyJobStore(url=f"sqlite:///{job_file}")}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.start()
app.scheduler = scheduler
atexit.register(stop_scheduler)

app.scheduler = scheduler
app.running_plugins = {}
app.loaded_plugins = load_plugins(client_config)

# Start the watchdog observer in a thread
if not hasattr(app, "watchdog_thread") or not app.watchdog_thread.is_alive():
print("> Starting Watchdog Thread")
watchdog_thread = threading.Thread(
target=start_watchdog, args=(app,), daemon=True
)
watchdog_thread.start()
app.watchdog_thread = watchdog_thread
app.watchdog = start_watchdog(app)

autorun_plugins = ["init", "create_datasite", "sync", "apps"]
# autorun_plugins = ["init", "create_datasite", "sync", "apps"]
Expand All @@ -418,10 +335,7 @@ async def lifespan(app: FastAPI):

print("> Shutting down...")
scheduler.shutdown()

if app.watchdog_thread.is_alive():
app.stop_event.set()
app.watchdog_thread.join()
app.watchdog.stop()


def stop_scheduler():
Expand Down
54 changes: 54 additions & 0 deletions syftbox/client/fsevents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from pathlib import Path
from typing import Callable

from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

# Names exported by `from syftbox.client.fsevents import *`.
# FileSystemEvent is re-exported from watchdog.events.
__all__ = [
    "FSWatchdog",
    "AnyFileSystemEventHandler",
    "FSEventCallbacks",
    "FileSystemEvent",
]

# A list of callables, each taking one FileSystemEvent and returning None.
# NOTE: the `type` alias statement requires Python 3.12+.
type FSEventCallbacks = list[Callable[[FileSystemEvent], None]]


class FSWatchdog:
    """Wrap a watchdog ``Observer`` that recursively watches one directory.

    Args:
        watch_dir: Directory to watch.
        event_handler: Handler that receives the observer's events.
    """

    def __init__(self, watch_dir: Path, event_handler: FileSystemEventHandler):
        self.watch_dir = watch_dir
        self.event_handler = event_handler
        self._observer = Observer()

    def start(self):
        """Schedule the handler on ``watch_dir`` and start the observer."""
        # the observer starts its own thread
        self._observer.schedule(
            self.event_handler,
            # schedule() documents its path argument as a str
            str(self.watch_dir),
            recursive=True,
        )
        self._observer.start()

    def stop(self):
        """Stop the observer and wait for its thread to finish.

        Does nothing if the observer is not running, so calling this before
        ``start()`` (or twice) is safe.
        """
        # join() on a thread that was never started raises RuntimeError
        if not self._observer.is_alive():
            return
        self._observer.stop()
        self._observer.join()


class AnyFileSystemEventHandler(FileSystemEventHandler):
    """Forward every filesystem event to a list of callbacks.

    Events whose source path lies at or below an ignored location are dropped.

    Args:
        watch_dir: Root directory being watched.
        callbacks: Callables invoked, in order, with each accepted event.
        ignored: Paths relative to ``watch_dir`` whose events are dropped.
            ``None`` (the default) ignores nothing.
    """

    def __init__(
        self,
        watch_dir: Path,
        callbacks: FSEventCallbacks,
        ignored: list[str] | None = None,
    ):
        super().__init__()
        self.watch_dir = watch_dir
        self.callbacks = callbacks
        # None instead of a shared mutable [] default
        self.ignored = [Path(self.watch_dir, ignore) for ignore in ignored or []]

    def on_any_event(self, event: FileSystemEvent) -> None:
        """Dispatch ``event`` to every callback unless its source is ignored."""
        src_path = Path(event.src_path)

        # Compare whole path components: a plain string prefix test would
        # also drop siblings such as "apps_backup" when "apps" is ignored.
        if any(src_path.is_relative_to(ignore) for ignore in self.ignored):
            return

        for cb in self.callbacks:
            cb(event)
Empty file.
52 changes: 52 additions & 0 deletions syftbox/client/utils/macos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import subprocess
from pathlib import Path

# "assets" directory located under this file's third ancestor (parents[2]).
ASSETS_FOLDER = Path(__file__).parents[2] / "assets"
# Zip archive that find_icon_file extracts with `ditto` when no icon file is found.
ICONS_PKG = ASSETS_FOLDER / "icon.zip"


# Function to search for Icon\r file
def search_icon_file(src_path: Path) -> "Path | None":
    r"""Return the ``Icon\r`` file directly inside ``src_path``, if any.

    Args:
        src_path: Directory to scan (not recursive).

    Returns:
        The first entry whose name contains both ``Icon`` and a carriage
        return, or ``None`` when ``src_path`` is not an existing directory
        or holds no such entry.
    """
    # is_dir() instead of exists(): a regular file at this path would
    # otherwise make iterdir() raise NotADirectoryError.
    if not src_path.is_dir():
        return None

    for file_path in src_path.iterdir():
        if "Icon" in file_path.name and "\r" in file_path.name:
            return file_path

    return None


# if you knew the pain of this function
def find_icon_file(src_path: Path) -> Path:
    r"""Return the ``Icon\r`` file in ``src_path``, extracting it if needed.

    ``src_path`` is searched first. If nothing is found, ``ICONS_PKG`` is
    extracted with ``ditto`` into the parent of ``src_path`` and the search
    is repeated.

    Args:
        src_path: Directory expected to contain the icon file.

    Returns:
        Path of the icon file.

    Raises:
        FileNotFoundError: If ``ICONS_PKG`` does not exist, or the icon file
            is still missing after extraction.
        RuntimeError: If ``ditto`` exits with a non-zero status.
    """
    # First attempt to find the Icon\r file
    icon_file = search_icon_file(src_path)
    if icon_file:
        return icon_file

    if not ICONS_PKG.exists():
        raise FileNotFoundError(f"{ICONS_PKG} not found")

    try:
        # can't use other zip tools as they don't unpack it correctly
        subprocess.run(
            ["ditto", "-xk", str(ICONS_PKG), str(src_path.parent)],
            check=True,
        )
    except subprocess.CalledProcessError as err:
        # Chain the original failure so the ditto exit status is not lost.
        raise RuntimeError("Failed to unzip icon.zip using macOS CLI tool.") from err

    # Try to find the Icon\r file again after extraction
    icon_file = search_icon_file(src_path)
    if icon_file:
        return icon_file

    # Fail loudly instead of falling through and returning None, which the
    # caller would hand straight to `cp`.
    raise FileNotFoundError(
        "Icon file with a carriage return not found, and icon.zip did not contain it.",
    )


def copy_icon_file(icon_folder: str, dest_folder: str) -> None:
    """Copy the icon file from ``icon_folder`` into ``dest_folder``.

    The icon is copied with ``cp -p`` and ``SetFile -a C`` is then run on
    the destination folder.

    Args:
        icon_folder: Folder searched (via ``find_icon_file``) for the icon.
        dest_folder: Existing directory that receives the icon.

    Raises:
        FileNotFoundError: If ``dest_folder`` is not an existing directory,
            or the icon lookup fails with that error.
        subprocess.CalledProcessError: If ``cp`` or ``SetFile`` fails.
    """
    dest_path = Path(dest_folder)
    # Validate the destination first, and require a directory: exists() would
    # accept a regular file and the lookup below may unpack an archive.
    if not dest_path.is_dir():
        raise FileNotFoundError(f"Destination folder '{dest_folder}' does not exist.")

    src_icon_path = find_icon_file(Path(icon_folder))

    # shutil wont work with these special icon files
    subprocess.run(["cp", "-p", str(src_icon_path), str(dest_path)], check=True)
    subprocess.run(["SetFile", "-a", "C", str(dest_path)], check=True)

0 comments on commit 3fadf13

Please sign in to comment.