diff --git a/me.sh b/me.sh new file mode 100755 index 00000000..791dd5aa --- /dev/null +++ b/me.sh @@ -0,0 +1,2 @@ +#!/bin/bash +uv run syftbox/client/client.py --config_path=./users/me.json --sync_folder=./users/me --email=me@madhavajay.com --port=8085 --server=http://localhost:5001 diff --git a/pyproject.toml b/pyproject.toml index b9ee4898..e80b7d5e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "markdown>=3.7", "pandas>=2.2.2", "setuptools>=75.1.0", + "postmarker>=1.0", ] [build-system] diff --git a/syftbox/client/plugins/sync.py b/syftbox/client/plugins/sync.py index 4259bae3..7500c42b 100644 --- a/syftbox/client/plugins/sync.py +++ b/syftbox/client/plugins/sync.py @@ -16,7 +16,8 @@ ) CLIENT_CHANGELOG_FOLDER = "syft_changelog" -IGNORE_FOLDERS = [CLIENT_CHANGELOG_FOLDER] +STAGING = "staging" +IGNORE_FOLDERS = [CLIENT_CHANGELOG_FOLDER, STAGING] # write operations @@ -125,8 +126,9 @@ def filter_changes( valid_changes.append(change) valid_change_files.append(change.sub_path) continue + # todo we need to handle this properly if perm_file_at_path.read == [user_email]: - if change.internal_path[-10:] == "_.syftperm": + if change.internal_path.endswith("_.syftperm"): # include changes for syft_perm file even if only we have read perms. 
valid_changes.append(change) valid_change_files.append(change.sub_path) @@ -192,9 +194,9 @@ def pull_changes(client_config, changes): data = None if response.status_code == 200: - print( - f"> {client_config.email} /read {change.kind} {change.internal_path}", - ) + # print( + # f"> {client_config.email} /read {change.kind} {change.internal_path}", + # ) remote_changes.append((ok_change, data)) else: print( @@ -259,7 +261,7 @@ def ascii_for_change(changes) -> str: pipe = "├──" if count == len(changes): pipe = "└──" - change_text += pipe + change + change_text += pipe + change + "\n" return change_text @@ -308,6 +310,7 @@ def sync_up(client_config): # send val changes results = push_changes(client_config, val) + deleted_files = [] changed_files = [] for result in results: @@ -320,10 +323,12 @@ def sync_up(client_config): # combine successful changes qwith old dir state combined_tree = old_dir_state.tree + + # add new successful changes combined_tree.update(synced_dir_state.tree) synced_dir_state.tree = combined_tree - synced_dir_state = delete_files(new_dir_state, deleted_files) + synced_dir_state = delete_files(synced_dir_state, deleted_files) change_text = "" if len(changed_files): @@ -334,8 +339,6 @@ def sync_up(client_config): change_text += f"❌ Syncing Up {len(deleted_files)} Deletes\n" change_text += ascii_for_change(deleted_files) - print(change_text) - synced_dir_state.save(dir_filename) n_changes += len(changed_files) + len(deleted_files) diff --git a/syftbox/lib/lib.py b/syftbox/lib/lib.py index 0d306979..7ef73a57 100644 --- a/syftbox/lib/lib.py +++ b/syftbox/lib/lib.py @@ -7,8 +7,11 @@ import inspect import json import os +import pkgutil import re +import subprocess import sys +import sysconfig import textwrap import types import zlib @@ -20,13 +23,14 @@ from importlib.util import spec_from_loader from pathlib import Path from threading import Lock +from typing import Any from urllib.parse import urlparse import markdown import pandas as pd import 
def write(self, data: bytes) -> bool:
    """Persist ``data`` at ``self.full_path``.

    A payload that starts with ``syft://`` is treated as a syftlink: instead
    of storing the bytes, a symlink is created at ``self.full_path`` that
    points at the linked file inside ``self.sync_folder``. Any other payload
    is passed to ``self.write_to`` unchanged.

    Args:
        data: Raw file content, or a UTF-8 encoded ``syft://`` URL.

    Returns:
        ``True`` after a symlink was created, otherwise the result of
        ``self.write_to``.

    Raises:
        Exception: If the link target resolves outside the sync folder or
            does not exist.
    """
    if not data.startswith(b"syft://"):
        return self.write_to(data, self.full_path)

    syft_link = SyftLink.from_url(data.decode("utf-8"))
    sync_root = os.path.abspath(self.sync_folder)
    # NOTE(review): assumes syft_link.sync_path is a path relative to the
    # sync folder - confirm against SyftLink.
    abs_path = os.path.normpath(os.path.join(sync_root, syft_link.sync_path))

    # The server's /write handler feeds request data into this method, so a
    # crafted link ("../..", absolute path) must not be able to point the
    # symlink at something outside the sync folder.
    if abs_path != sync_root and not abs_path.startswith(sync_root + os.sep):
        raise Exception(
            f"Cant make symlink because source is outside the sync folder {abs_path}"
        )
    if not os.path.exists(abs_path):
        raise Exception(f"Cant make symlink because source doesnt exist {abs_path}")

    os.makedirs(os.path.dirname(self.full_path), exist_ok=True)
    # os.path.exists() follows links and reports False for a dangling one, so
    # test the directory entry itself; also clear a stale regular file, which
    # would otherwise make os.symlink raise FileExistsError.
    if os.path.islink(self.full_path) or os.path.isfile(self.full_path):
        os.unlink(self.full_path)
    os.symlink(abs_path, self.full_path)
    return True
def get_symlink(file_path) -> str:
    """Return the target of the symlink at ``file_path``.

    An absolute target is returned exactly as stored. A relative target is
    resolved against the directory that contains the link (that is how the
    operating system interprets it) and returned as a normalized absolute
    path, so callers never end up interpreting it relative to the current
    working directory.
    """
    target = os.readlink(file_path)
    if os.path.isabs(target):
        return target
    link_dir = os.path.dirname(os.path.abspath(file_path))
    return os.path.normpath(os.path.join(link_dir, target))


def is_symlink(file_path) -> bool:
    """Return ``True`` when ``file_path`` is a symlink (dangling or not)."""
    return os.path.islink(file_path)


def symlink_to_syftlink(file_path):
    """Build a ``SyftLink`` for ``file_path`` via ``SyftLink.from_path``."""
    return SyftLink.from_path(file_path)


def convert_to_symlink(path):
    """Return the syftlink string describing the symlink at ``path``.

    Despite the name, nothing is created on disk: the link target is read
    and converted to its ``SyftLink`` string form.

    Raises:
        Exception: If ``path`` is not a symlink.
    """
    if not is_symlink(path):
        raise Exception(f"Cant convert a non symlink {path}")
    return str(symlink_to_syftlink(get_symlink(path)))


def get_file_hash(file_path: str) -> str:
    """Return the MD5 hex digest used to detect changes to ``file_path``.

    For a symlink the digest covers the syftlink string rather than the
    content of the target, so the hash matches what ``read`` would transmit
    for a link. Regular files are hashed in chunks to keep memory flat.
    """
    if is_symlink(file_path):
        link_text = convert_to_symlink(file_path)
        return hashlib.md5(link_text.encode("utf-8")).hexdigest()

    digest = hashlib.md5()
    with open(file_path, "rb") as file:
        while chunk := file.read(65536):
            digest.update(chunk)
    return digest.hexdigest()
@classmethod
def from_csv(
    cls, file_path: str, name: str | None = None, has_private: bool = False
):
    """Create a ``TabularDataset`` describing the CSV file at ``file_path``.

    Args:
        file_path: Location of the CSV file; also used to build the syft link.
        name: Dataset name. Defaults to the file's base name.
        has_private: Stored on the dataset as its ``has_private`` flag.

    Returns:
        A ``TabularDataset`` whose schema comes from ``create_schema`` applied
        to the loaded frame.
    """
    dataset_name = os.path.basename(file_path) if name is None else name
    link = SyftLink.from_path(file_path)
    frame = pd.read_csv(file_path)
    return TabularDataset(
        name=dataset_name,
        syft_link=link,
        schema=cls.create_schema(frame),
        has_private=has_private,
    )
def to_flow(
    self, client_config, inputs=None, output=None, template="python", path=None
) -> str:
    """Materialise this code object as a runnable flow directory.

    Creates ``<path>/<self.name>`` (``path`` defaults to the ``staging``
    folder inside ``client_config.sync_folder``), lets ``init_flow`` lay out
    the inputs and output, and writes ``main.py`` plus an executable
    ``run.sh`` into it.

    Args:
        client_config: Supplies ``sync_folder`` and ``email``.
        inputs: Non-empty mapping of input name to dataset. The first value's
            ``syft_link.datasite`` is used as the owner of the default output
            permission.
        output: Optional settings. Missing ``name``, ``format`` and
            ``permission`` keys are filled in on a copy; the caller's dict is
            left untouched.
        template: Forwarded to ``init_flow``.
        path: Parent directory for the flow.

    Returns:
        Absolute path of the flow directory as a string.

    Raises:
        ValueError: If ``inputs`` is missing or empty.
    """
    # Fail before touching the filesystem; the old code crashed later with an
    # AttributeError/IndexError after already creating the staging folder.
    if not inputs:
        raise ValueError("to_flow requires at least one input")

    if path is None:
        path = Path(client_config.sync_folder) / "staging"
    os.makedirs(path, exist_ok=True)

    output = dict(output) if output else {}
    output.setdefault("name", "result")
    output.setdefault("format", "json")
    if "permission" not in output:
        their_email = next(iter(inputs.values())).syft_link.datasite
        output["permission"] = SyftPermission.theirs_with_my_read_write(
            their_email=their_email, my_email=client_config.email
        )

    # create folders
    init_flow(client_config, path, self.name, inputs, output, template)
    # save main.py
    main_code = create_main_py(client_config, inputs, output, self)

    flow_dir = Path(os.path.abspath(f"{path}/{self.name}"))

    # The generated script contains non-ASCII characters, so pin the encoding
    # instead of relying on the platform default.
    with open(flow_dir / "main.py", "w", encoding="utf-8") as f:
        f.write(main_code)

    main_shell_path = flow_dir / "run.sh"
    with open(main_shell_path, "w", encoding="utf-8") as f:
        f.write(make_run_sh())
    make_executable(main_shell_path)
    return str(flow_dir)
Path(os.path.abspath(f"{path}/{self.name}")) + + main_code_path = flow_dir / "main.py" + with open(main_code_path, "w") as f: + f.write(main_code) + main_shell_path = flow_dir / "run.sh" + main_shell_code = make_run_sh() + with open(main_shell_path, "w") as f: + f.write(main_shell_code) + make_executable(main_shell_path) + return str(flow_dir) + def syftbox_code(func): code = Code.from_func(func) return code + + +def get_syftbox_editable_path(): + commands = [ + ["uv", "pip", "list", "--format=columns"], + ["pip", "list", "--format=columns"], + ] + + for command in commands: + try: + # Run the pip list command and filter for 'syftbox' + result = subprocess.run(command, capture_output=True, text=True, check=True) + for line in result.stdout.splitlines(): + if "syftbox" in line: + parts = line.split() + if ( + len(parts) > 2 and "/" in parts[-1] + ): # Path is typically the last part + return parts[-1] + except subprocess.CalledProcessError: + # Ignore errors and continue with the next command + continue + + return None + + +def make_executable(file_path): + import os + import stat + + current_permissions = os.stat(file_path).st_mode + os.chmod( + file_path, current_permissions | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH + ) + + +def make_run_sh() -> str: + return """ +#!/bin/sh +uv run main.py +""" + + +def init_flow( + client_config, + path, + name: str, + inputs: dict[str, Any], + output: dict[str, Any], + template: str = "python", +): + flow_dir = Path(os.path.abspath(f"{path}/{name}")) + os.makedirs(flow_dir, exist_ok=True) + # make inputs + for inp, value in inputs.items(): + inp_path = flow_dir / "inputs" / inp + os.makedirs(inp_path, exist_ok=True) + + if isinstance(value, TabularDataset): + syft_link = value.syft_link + local_path = client_config.resolve_link(syft_link) + filename = os.path.basename(str(syft_link)) + inp_link_path = inp_path / filename + if not os.path.exists(inp_link_path): + os.symlink(local_path, inp_link_path) + if value.has_private: + 
local_path_private = str(local_path) + ".syftlink" + if os.path.exists(local_path_private): + inp_link_path_private = str(inp_link_path) + ".syftlink" + if not os.path.exists(inp_link_path_private): + os.symlink(local_path_private, inp_link_path_private) + else: + print("Cant read private link?") + # value.syft_link.to_file(inp_link_path) + + # create output + out_format = output["format"] + if out_format != "json": + raise Exception("Only supports json") + out_permission = output["permission"] + out_path = flow_dir / "output" / output["name"] + os.makedirs(out_path, exist_ok=True) + out_permission.save(out_path) + + +def make_input_code(inputs): + code = """ +def input_reader(): + from syftbox.lib import sy_path + import pandas as pd + + inputs = {} +""" + for key, value in inputs.items(): + if isinstance(value, TabularDataset): + path = "./inputs/trade_data/trade_mock.csv" + code += f" inputs['{key}'] = pd.read_csv(sy_path(\"{path}\"))" + code += """ + return inputs +""" + return textwrap.dedent(code) + + +def make_output_code(output): + code = "" + name = output["name"] + if output["format"] == "json": + output_path = f"./output/{name}/{name}.json" + code += f""" +def output_writer({name}): + import json + + with open("{output_path}", "w") as f: + f.write(json.dumps({name})) +""" + return textwrap.dedent(code) + + +def get_standard_lib_modules(): + """Return a set of standard library module names.""" + standard_lib_path = sysconfig.get_path("stdlib") + return {module.name for module in pkgutil.iter_modules([standard_lib_path])} + + +def get_deps(code): + imports = set() + tree = ast.parse(code) + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + imports.add(alias.name.split(".")[0]) + elif isinstance(node, ast.ImportFrom): + imports.add(node.module.split(".")[0]) + + deps = {} + installed_packages = {pkg.key: pkg.version for pkg in pkg_resources.working_set} + standard_lib_modules = get_standard_lib_modules() + + for 
package in imports: + # Skip standard library packages + if package not in standard_lib_modules: + if package in installed_packages: + deps[package] = installed_packages[package] + else: + deps[package] = "" + + return deps + + +def make_main_code(code_obj): + code = """ +def main(): + print(f"Running: {__name__} from {__author__}") + inputs = input_reader() + print("> Reading Inputs", inputs) +""" + code += f""" + output = {code_obj.clean_name}(**inputs) +""" + code += """ + print("> Writing Outputs", output) + output_writer(output) + print(f"> ✅ Running {__name__} Complete!") + +main() +""" + return textwrap.dedent(code) + + +def make_deps_comments(deps): + code = """ +# /// script +# dependencies = [ +""" + for key, value in deps.items(): + code += f'# "{key}' + if value != "": + code += f"=={value}" + code += '",' + "\n" + code += "# ]\n" + code += "#" + syftbox_path = get_syftbox_editable_path() + if syftbox_path: + code += ( + """ +# [tool.uv.sources] +# syftbox = { path = \"""" + + syftbox_path + + '", editable = true }' + "" + ) + + code += """ +# /// +""" + return textwrap.dedent(code) + + +def create_main_py(client_config, inputs, output, code_obj): + code = "" + code += f"__name__ = '{code_obj.name}'\n" + code += f"__author__ = '{client_config.email}'\n" + code += make_input_code(inputs) + code += "\n" + code += make_output_code(output) + code += "\n" + code += "\n# START YOUR CODE\n" + code += code_obj.raw_code + code += "\n" + code += "\n# END YOUR CODE\n" + code += "\n" + code += make_main_code(code_obj) + code += "\n" + + deps = get_deps(code) + + # prepend + deps_code = make_deps_comments(deps) + code = deps_code + "\n" + code + code += "\n" + + code = textwrap.dedent(code) + return code diff --git a/syftbox/server/server.py b/syftbox/server/server.py index acb45931..f18e6482 100644 --- a/syftbox/server/server.py +++ b/syftbox/server/server.py @@ -173,31 +173,38 @@ async def register(request: Request): @app.post("/write") async def write(request: 
@app.post("/write")
async def write(request: Request):
    """Apply a client's file change (write or delete) to the snapshot folder.

    The JSON body carries ``email``, a ``change`` dict (turned into a
    ``FileChange``) and, for writes, ``data`` decoded with ``strtobin``.

    Returns:
        A 200 ``JSONResponse`` with ``status: success`` when the change was
        applied, otherwise a 400 response with ``status: error``. ``change``
        is ``None`` in the error payload when the request failed before a
        ``FileChange`` could be built.
    """
    # Bound up front so the error path below never hits an unbound name when
    # parsing fails before the FileChange exists.
    change = None
    try:
        data = await request.json()
        email = data["email"]
        change_dict = data["change"]
        change_dict["kind"] = FileChangeKind(change_dict["kind"])
        change = FileChange(**change_dict)

        change.sync_folder = os.path.abspath(SNAPSHOT_FOLDER)

        if change.kind_write:
            bin_data = strtobin(data["data"])
            result = change.write(bin_data)
        elif change.kind_delete:
            result = change.delete()
        else:
            raise Exception(f"Unknown type of change kind. {change.kind}")
        if result:
            print(f"> {email} {change.kind}: {change.internal_path}")
            return JSONResponse(
                {"status": "success", "change": change.to_dict()},
                status_code=200,
            )
        return JSONResponse(
            {"status": "error", "change": change.to_dict()},
            status_code=400,
        )
    except Exception as e:
        print("Exception writing", e)
        return JSONResponse(
            {
                "status": "error",
                "change": change.to_dict() if change is not None else None,
            },
            status_code=400,
        )
#!/bin/bash
# Builds a small fixture datasite: folder a/ with a permission file and
# a.txt, plus a/b/b.txt as a symlink to a.txt.
base="users/madhava/madhava@openmined.org"

mkdir -p "$base/a"
echo '{"admin": ["madhava@openmined.org"], "read": ["madhava@openmined.org", "GLOBAL"], "write": ["madhava@openmined.org"], "filepath": null}' > "$base/a/_.syftperm"
echo "aaa" > "$base/a/a.txt"

mkdir -p "$base/a/b"
# A relative symlink target is resolved from the directory that holds the
# link (a/b/), not from the current directory, so point one level up.
# -f keeps the script re-runnable.
ln -sf ../a.txt "$base/a/b/b.txt"
}, { name = "pandas", specifier = ">=2.2.2" }, + { name = "postmarker", specifier = ">=1.0" }, { name = "pytest", marker = "extra == 'dev'" }, { name = "pytest-cov", marker = "extra == 'dev'" }, { name = "pytest-xdist", extras = ["psutil"], marker = "extra == 'dev'" },