From 5abfb8f7f381e9dcac6ae712af10b323874b766c Mon Sep 17 00:00:00 2001 From: Niklas Date: Wed, 18 Sep 2024 08:03:35 +0200 Subject: [PATCH 1/9] =?UTF-8?q?Feature/SK-996=20|=C2=A0=20(#702)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fedn/common/config.py | 1 - fedn/network/api/interface.py | 21 ++++--------------- .../storage/statestore/mongostatestore.py | 14 +++++++++++-- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/fedn/common/config.py b/fedn/common/config.py index 517e57d94..ceb2fe65a 100644 --- a/fedn/common/config.py +++ b/fedn/common/config.py @@ -16,7 +16,6 @@ FEDN_CUSTOM_URL_PREFIX = os.environ.get("FEDN_CUSTOM_URL_PREFIX", "") -FEDN_ALLOW_LOCAL_PACKAGE = os.environ.get("FEDN_ALLOW_LOCAL_PACKAGE", False) FEDN_PACKAGE_EXTRACT_DIR = os.environ.get("FEDN_PACKAGE_EXTRACT_DIR", "package") diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 3ffaadecf..1e31cb5eb 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -7,7 +7,7 @@ from werkzeug.security import safe_join from werkzeug.utils import secure_filename -from fedn.common.config import FEDN_ALLOW_LOCAL_PACKAGE, get_controller_config, get_network_config +from fedn.common.config import get_controller_config, get_network_config from fedn.common.log_config import logger from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.state import ReducerState, ReducerStateToString @@ -522,10 +522,6 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name, package): :return: A json response with combiner assignment config. :rtype: :class:`flask.Response` """ - local_package = FEDN_ALLOW_LOCAL_PACKAGE - if local_package: - local_package = True - if package == "remote": package_object = self.statestore.get_compute_package() if package_object is None: @@ -540,18 +536,8 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name, package): 203, ) helper_type = self.control.statestore.get_helper() - elif package == "local" and local_package is False: - print("Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.") - return ( - jsonify( - { - "success": False, - "message": "Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.", - } - ), - 400, - ) - elif package == "local" and local_package is True: + else: + # Else package is "local": helper_type = "" # Assign client to combiner @@ -582,6 +568,7 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name, package): "combiner": combiner.name, "ip": remote_addr, "status": "available", + "package": package, } # Add client to network self.control.network.add_client(client_config) diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index 3ef204b5c..3886a3db7 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -738,12 +738,22 @@ def set_client(self, client_data): """ client_data["updated_at"] = str(datetime.now()) try: - self.clients.update_one({"client_id": client_data["client_id"]}, {"$set": client_data}, True) + # self.clients.update_one({"client_id": client_data["client_id"]}, {"$set": client_data}, True) + self.clients.update_one( + {"client_id": client_data["client_id"]}, + {"$set": {k: v for k, v in client_data.items() if v is not None}}, + upsert=True + ) except KeyError: # If client_id is not present, use name as identifier, for backwards compatibility id = str(uuid.uuid4()) client_data["client_id"] = id - self.clients.update_one({"name": client_data["name"]}, {"$set": client_data}, True) + # self.clients.update_one({"name": client_data["name"]}, {"$set": client_data}, True) + self.clients.update_one( + {"client_id": client_data["client_id"]}, + {"$set": {k: v for k, v in client_data.items() if v is not None}}, + upsert=True + ) def get_client(self, client_id): """Get client by client_id. From e967b4abd7c2ffd5f89fe41be9c4bfb810fbf38d Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 19 Sep 2024 10:37:21 +0200 Subject: [PATCH 2/9] Feature/SK-1039 | Add upload, activate & deactivate package to fedn api v1 (#706) --- fedn/common/config.py | 2 + fedn/network/api/interface.py | 14 +- fedn/network/api/v1/model_routes.py | 16 +- fedn/network/api/v1/package_routes.py | 157 +++++++++++++++++- fedn/network/api/v1/shared.py | 21 ++- .../storage/statestore/stores/model_store.py | 2 +- .../statestore/stores/package_store.py | 140 +++++++++++++--- 7 files changed, 309 insertions(+), 43 deletions(-) diff --git a/fedn/common/config.py b/fedn/common/config.py index ceb2fe65a..e2b994b48 100644 --- a/fedn/common/config.py +++ b/fedn/common/config.py @@ -18,6 +18,8 @@ FEDN_PACKAGE_EXTRACT_DIR = os.environ.get("FEDN_PACKAGE_EXTRACT_DIR", "package") +FEDN_COMPUTE_PACKAGE_DIR = os.environ.get("FEDN_COMPUTE_PACKAGE_DIR", "/app/client/package/") + def get_environment_config(): """Get the configuration from environment variables.""" diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 1e31cb5eb..bc3f0ee79 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -7,7 +7,7 @@ from werkzeug.security import safe_join from werkzeug.utils import secure_filename -from fedn.common.config import get_controller_config, get_network_config +from fedn.common.config import FEDN_COMPUTE_PACKAGE_DIR, get_controller_config, get_network_config from fedn.common.log_config import logger from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.state import ReducerState, ReducerStateToString @@ -230,7 +230,7 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript file_name = file.filename storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") - file_path = safe_join(os.getcwd(), storage_file_name) + file_path = safe_join(FEDN_COMPUTE_PACKAGE_DIR, storage_file_name) file.save(file_path) self.control.set_compute_package(storage_file_name, file_path) @@ -370,22 +370,20 @@ def download_compute_package(self, name): try: mutex = threading.Lock() mutex.acquire() - # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory(os.getcwd(), name, as_attachment=True) + + return send_from_directory(FEDN_COMPUTE_PACKAGE_DIR, name, as_attachment=True) except Exception: try: data = self.control.get_compute_package(name) # TODO: make configurable, perhaps in config.py or package.py - file_path = safe_join(os.getcwd(), name) + file_path = safe_join(FEDN_COMPUTE_PACKAGE_DIR, name) with open(file_path, "wb") as fh: fh.write(data) # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory(os.getcwd(), name, as_attachment=True) + return send_from_directory(FEDN_COMPUTE_PACKAGE_DIR, name, as_attachment=True) except Exception: raise finally: - # Delete the file after it has been saved - os.remove(file_path) mutex.release() def _create_checksum(self, name=None): diff --git a/fedn/network/api/v1/model_routes.py b/fedn/network/api/v1/model_routes.py index f227443e0..db52fae7e 100644 --- a/fedn/network/api/v1/model_routes.py +++ b/fedn/network/api/v1/model_routes.py @@ -5,19 +5,13 @@ from fedn.network.api.auth import jwt_auth_required from fedn.network.api.shared import modelstorage_config -from fedn.network.api.v1.shared import api_version, get_limit, get_post_data_to_kwargs, get_reverse, get_typed_list_headers, mdb -from fedn.network.storage.s3.base import RepositoryBase -from fedn.network.storage.s3.miniorepository import MINIORepository +from fedn.network.api.v1.shared import api_version, get_limit, get_post_data_to_kwargs, get_reverse, get_typed_list_headers, mdb, minio_repository from fedn.network.storage.statestore.stores.model_store import ModelStore from fedn.network.storage.statestore.stores.shared import EntityNotFound bp = Blueprint("model", __name__, url_prefix=f"/api/{api_version}/models") model_store = ModelStore(mdb, "control.model") -repository: RepositoryBase = None - -if modelstorage_config["storage_type"] == "S3": - repository = MINIORepository(modelstorage_config["storage_config"]) @bp.route("/", methods=["GET"]) @@ -631,11 +625,11 @@ def download(id: str): type: string """ try: - if repository is not None: + if minio_repository is not None: model = model_store.get(id, use_typing=False) model_id = model["model"] - file = repository.get_artifact_stream(model_id, modelstorage_config["storage_config"]["storage_bucket"]) + file = minio_repository.get_artifact_stream(model_id, modelstorage_config["storage_config"]["storage_bucket"]) return send_file(file, as_attachment=True, download_name=model_id) else: @@ -685,11 +679,11 @@ def get_parameters(id: str): type: string """ try: - if repository is not None: + if minio_repository is not None: model = model_store.get(id, use_typing=False) model_id = model["model"] - file = repository.get_artifact_stream(model_id, modelstorage_config["storage_config"]["storage_bucket"]) + file = minio_repository.get_artifact_stream(model_id, modelstorage_config["storage_config"]["storage_bucket"]) file_bytes = io.BytesIO() for chunk in file.stream(32 * 1024): diff --git a/fedn/network/api/v1/package_routes.py b/fedn/network/api/v1/package_routes.py index 7add1a220..0d559c2fa 100644 --- a/fedn/network/api/v1/package_routes.py +++ b/fedn/network/api/v1/package_routes.py @@ -1,7 +1,11 @@ +import os + from flask import Blueprint, jsonify, request +from werkzeug.security import safe_join +from fedn.common.config import FEDN_COMPUTE_PACKAGE_DIR from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, get_use_typing, mdb +from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, get_use_typing, mdb, repository from fedn.network.storage.statestore.stores.package_store import PackageStore from fedn.network.storage.statestore.stores.shared import EntityNotFound @@ -422,6 +426,155 @@ def get_active_package(): return jsonify(response), 200 except EntityNotFound: - return jsonify({"message": f"Entity with id: {id} not found"}), 404 + return jsonify({"message": "Entity not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/active", methods=["PUT"]) +@jwt_auth_required(role="admin") +def set_active_package(): + """Set active package + Sets the active package + --- + tags: + - Packages + responses: + 200: + description: The package was set as active + schema: + type: object + properties: + message: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + data = request.json + package_id = data["id"] + package_store.set_active(package_id) + return jsonify({"message": "Active package set"}), 200 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/active", methods=["DELETE"]) +@jwt_auth_required(role="admin") +def delete_active_package(): + """Delete active package + Deletes the active package + --- + tags: + - Packages + responses: + 200: + description: The active package was deleted + schema: + type: object + properties: + message: + type: string + 404: + description: There was no active package present + schema: + type: object + properties: + message: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + package_store.delete_active() + return jsonify({"message": "Active package deleted"}), 200 + except EntityNotFound: + return jsonify({"message": "Entity not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/", methods=["POST"]) +@jwt_auth_required(role="admin") +def upload_package(): + """Upload a package + Uploads a package to the system. The package is stored in the database and the file is stored in the file system. + --- + tags: + - Packages + requestBody: + required: true + content: + multipart/form-data: + schema: + type: object + properties: + name: + type: string + description: The name of the package + description: + type: string + description: The description of the package + file: + type: string + format: binary + description: The package file + helper: + type: string + description: The helper setting for the package + file_name: + type: string + description: The display name of the file + responses: + 200: + description: The package was uploaded + schema: + type: object + properties: + message: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + data = request.form.to_dict() + file = request.files["file"] + file_name = file.filename + + data["file_name"] = file_name + + valid, response = package_store.add(data) + + if not valid: + return jsonify({"message": response}), 400 + + storage_file_name = response["storage_file_name"] + try: + file_path = safe_join(FEDN_COMPUTE_PACKAGE_DIR, storage_file_name) + if not os.path.exists(FEDN_COMPUTE_PACKAGE_DIR): + os.makedirs(FEDN_COMPUTE_PACKAGE_DIR, exist_ok=True) + file.save(file_path) + repository.set_compute_package(storage_file_name, file_path) + except Exception: + package_store.delete(response["id"]) + return jsonify({"message": "An unexpected error occurred"}), 500 + + package_store.set_active(response["id"]) + return jsonify({"message": "Package uploaded"}), 200 except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/shared.py b/fedn/network/api/v1/shared.py index 0fda39c45..e9d8af937 100644 --- a/fedn/network/api/v1/shared.py +++ b/fedn/network/api/v1/shared.py @@ -3,7 +3,10 @@ import pymongo from pymongo.database import Database -from fedn.network.api.shared import network_id, statestore_config +from fedn.network.api.shared import modelstorage_config, network_id, statestore_config +from fedn.network.storage.s3.base import RepositoryBase +from fedn.network.storage.s3.miniorepository import MINIORepository +from fedn.network.storage.s3.repository import Repository from fedn.network.storage.statestore.stores.client_store import ClientStore api_version = "v1" @@ -13,6 +16,22 @@ client_store = ClientStore(mdb, "network.clients") +minio_repository: RepositoryBase = None + +if modelstorage_config["storage_type"] == "S3": + minio_repository = MINIORepository(modelstorage_config["storage_config"]) + + +storage_collection = mdb["network.storage"] + +storage_config = storage_collection.find_one({"status": "enabled"}, projection={"_id": False}) + +repository: RepositoryBase = None + +if storage_config["storage_type"] == "S3": + repository = Repository(storage_config["storage_config"]) + + def is_positive_integer(s): return s is not None and s.isdigit() and int(s) > 0 diff --git a/fedn/network/storage/statestore/stores/model_store.py b/fedn/network/storage/statestore/stores/model_store.py index 3048f2a26..c923f6d4a 100644 --- a/fedn/network/storage/statestore/stores/model_store.py +++ b/fedn/network/storage/statestore/stores/model_store.py @@ -63,7 +63,7 @@ def _validate(self, item: Model) -> Tuple[bool, str]: return True, "" - def _complement(self, item: Model) -> Model: + def _complement(self, item: Model): if "key" not in item or item["key"] is None: item["key"] = "models" diff --git a/fedn/network/storage/statestore/stores/package_store.py b/fedn/network/storage/statestore/stores/package_store.py index 6649e00e5..5c777cc55 100644 --- a/fedn/network/storage/statestore/stores/package_store.py +++ b/fedn/network/storage/statestore/stores/package_store.py @@ -1,8 +1,11 @@ +import uuid from datetime import datetime from typing import Any, Dict, List, Tuple import pymongo +from bson import ObjectId from pymongo.database import Database +from werkzeug.utils import secure_filename from fedn.network.storage.statestore.stores.store import Store @@ -11,16 +14,7 @@ class Package: def __init__( - self, - id: str, - key: str, - committed_at: datetime, - description: str, - file_name: str, - helper: str, - name: str, - storage_file_name: str, - active: bool = False + self, id: str, key: str, committed_at: datetime, description: str, file_name: str, helper: str, name: str, storage_file_name: str, active: bool = False ): self.key = key self.committed_at = committed_at @@ -47,7 +41,7 @@ def from_dict(data: dict, active_package: dict) -> "Package": helper=data["helper"] if "helper" in data else None, name=data["name"] if "name" in data else None, storage_file_name=data["storage_file_name"] if "storage_file_name" in data else None, - active=active + active=active, ) @@ -78,27 +72,136 @@ def get(self, id: str, use_typing: bool = False) -> Package: return Package.from_dict(document, response_active) + def _validate(self, item: Package) -> Tuple[bool, str]: + if "file_name" not in item or not item["file_name"]: + return False, "File name is required" + + if not self._allowed_file_extension(item["file_name"]): + return False, "File extension not allowed" + + if "helper" not in item or not item["helper"]: + return False, "Helper is required" + + return True, "" + + def _complement(self, item: Package): + if "id" not in item or item.id is None: + item["id"] = str(uuid.uuid4()) + + if "key" not in item or item.key is None: + item["key"] = "package_trail" + + if "committed_at" not in item or item.committed_at is None: + item["committed_at"] = datetime.now() + + extension = item["file_name"].rsplit(".", 1)[1].lower() + + if "storage_file_name" not in item or item.storage_file_name is None: + storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") + item["storage_file_name"] = storage_file_name + + def set_active(self, id: str) -> bool: + """Set the active entity + param id: The id of the entity + type: str + return: Whether the operation was successful + """ + kwargs = {"_id": ObjectId(id)} if ObjectId.is_valid(id) else {"id": id} + kwargs["key"] = "package_trail" + + document = self.database[self.collection].find_one(kwargs) + + if document is None: + raise EntityNotFound(f"Entity with id {id} not found") + + committed_at = datetime.now() + obj_to_insert = { + "key": "active", + "id": document["id"], + "committed_at": committed_at, + "description": document["description"], + "file_name": document["file_name"], + "helper": document["helper"], + "name": document["name"], + "storage_file_name": document["storage_file_name"], + } + + self.database[self.collection].update_one({"key": "active"}, {"$set": obj_to_insert}, upsert=True) + + return True + def get_active(self, use_typing: bool = False) -> Package: """Get the active entity param use_typing: Whether to return the entity as a typed object or as a dict type: bool return: The entity """ - response = self.database[self.collection].find_one({"key": "active"}) + kwargs = {"key": "active"} + response = self.database[self.collection].find_one(kwargs) if response is None: - raise EntityNotFound(f"Entity with id {id} not found") + raise EntityNotFound("Entity not found") return Package.from_dict(response, response) if use_typing else from_document(response) + def _allowed_file_extension(self, filename: str, ALLOWED_EXTENSIONS={"gz", "bz2", "tar", "zip", "tgz"}) -> bool: + """Check if file extension is allowed. + + :param filename: The filename to check. + :type filename: str + :return: True and extension str if file extension is allowed, else False and None. + :rtype: Tuple (bool, str) + """ + if "." in filename: + extension = filename.rsplit(".", 1)[1].lower() + if extension in ALLOWED_EXTENSIONS: + return True + + return False + def update(self, id: str, item: Package) -> bool: raise NotImplementedError("Update not implemented for PackageStore") - def add(self, item: Package)-> Tuple[bool, Any]: - raise NotImplementedError("Add not implemented for PackageStore") + def add(self, item: Package) -> Tuple[bool, Any]: + valid, message = self._validate(item) + if not valid: + return False, message + + self._complement(item) + + return super().add(item) def delete(self, id: str) -> bool: - raise NotImplementedError("Delete not implemented for PackageStore") + kwargs = {"_id": ObjectId(id)} if ObjectId.is_valid(id) else {"id": id} + kwargs["key"] = "package_trail" + document = self.database[self.collection].find_one(kwargs) + + if document is None: + raise EntityNotFound(f"Entity with (id) {id} not found") + + result = super().delete(document["_id"]) + + if not result: + return False + + kwargs["key"] = "active" + + document_active = self.database[self.collection].find_one(kwargs) + + if document_active is not None: + return super().delete(document_active["_id"]) + + return True + + def delete_active(self): + kwargs = {"key": "active"} + + document_active = self.database[self.collection].find_one(kwargs) + + if document_active is None: + raise EntityNotFound("Entity not found") + + return super().delete(document_active["_id"]) def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDING, use_typing: bool = False, **kwargs) -> Dict[int, List[Package]]: """List entities @@ -127,10 +230,7 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI result = [Package.from_dict(item, response_active) for item in response["result"]] - return { - "count": response["count"], - "result": result - } + return {"count": response["count"], "result": result} def count(self, **kwargs) -> int: kwargs["key"] = "package_trail" From bcb17107411b83cb25a15a25f11e12e89d7eb5f5 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 23 Sep 2024 08:25:41 +0200 Subject: [PATCH 3/9] added set current model to /api/v1 (#708) --- fedn/network/api/v1/model_routes.py | 41 +++++++++++++++++++ fedn/network/api/v1/package_routes.py | 8 +++- .../storage/statestore/stores/model_store.py | 32 ++++++++++++--- 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/fedn/network/api/v1/model_routes.py b/fedn/network/api/v1/model_routes.py index db52fae7e..9a3abdc47 100644 --- a/fedn/network/api/v1/model_routes.py +++ b/fedn/network/api/v1/model_routes.py @@ -736,3 +736,44 @@ def get_active_model(): return jsonify({"message": "No active model found"}), 404 except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/active", methods=["PUT"]) +@jwt_auth_required(role="admin") +def set_active_model(): + """Set active model + Sets the active model (id). + --- + tags: + - Models + parameters: + - name: model + in: body + required: true + type: object + description: The model data to update + responses: + 200: + description: The updated active model id + schema: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + data = request.get_json() + model_id = data["id"] + + response = model_store.set_active(model_id) + + if response: + return jsonify({"message": "Active model set"}), 200 + else: + return jsonify({"message": "Failed to set active model"}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/package_routes.py b/fedn/network/api/v1/package_routes.py index 0d559c2fa..d844a93ba 100644 --- a/fedn/network/api/v1/package_routes.py +++ b/fedn/network/api/v1/package_routes.py @@ -458,8 +458,12 @@ def set_active_package(): try: data = request.json package_id = data["id"] - package_store.set_active(package_id) - return jsonify({"message": "Active package set"}), 200 + response = package_store.set_active(package_id) + + if response: + return jsonify({"message": "Active package set"}), 200 + else: + return jsonify({"message": "Active package not set"}), 500 except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/storage/statestore/stores/model_store.py b/fedn/network/storage/statestore/stores/model_store.py index c923f6d4a..27efcc9a3 100644 --- a/fedn/network/storage/statestore/stores/model_store.py +++ b/fedn/network/storage/statestore/stores/model_store.py @@ -26,7 +26,7 @@ def from_dict(data: dict) -> "Model": model=data["model"] if "model" in data else None, parent_model=data["parent_model"] if "parent_model" in data else None, session_id=data["session_id"] if "session_id" in data else None, - committed_at=data["committed_at"] if "committed_at" in data else None + committed_at=data["committed_at"] if "committed_at" in data else None, ) @@ -76,7 +76,7 @@ def update(self, id: str, item: Model) -> Tuple[bool, Any]: return super().update(id, item) - def add(self, item: Model)-> Tuple[bool, Any]: + def add(self, item: Model) -> Tuple[bool, Any]: raise NotImplementedError("Add not implemented for ModelStore") def delete(self, id: str) -> bool: @@ -104,10 +104,7 @@ def list(self, limit: int, skip: int, sort_key: str, sort_order=pymongo.DESCENDI response = super().list(limit, skip, sort_key or "committed_at", sort_order, use_typing=use_typing, **kwargs) result = [Model.from_dict(item) for item in response["result"]] if use_typing else response["result"] - return { - "count": response["count"], - "result": result - } + return {"count": response["count"], "result": result} def list_descendants(self, id: str, limit: int, use_typing: bool = False) -> List[Model]: """List descendants @@ -220,3 +217,26 @@ def get_active(self) -> str: raise EntityNotFound("Active model not found") return active_model["model"] + + def set_active(self, id: str) -> bool: + """Set the active model + param id: The id of the entity + type: str + description: The id of the entity, can be either the id or the model (property) + return: True if successful + """ + kwargs = {"key": "models"} + if ObjectId.is_valid(id): + id_obj = ObjectId(id) + kwargs["_id"] = id_obj + else: + kwargs["model"] = id + + model = self.database[self.collection].find_one(kwargs) + + if model is None: + raise EntityNotFound(f"Entity with (id | model) {id} not found") + + self.database[self.collection].update_one({"key": "current_model"}, {"$set": {"model": model["model"]}}) + + return True From 67c6c6ba0b80ce1e720fd462066cb1f36ad45916 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Mon, 23 Sep 2024 10:44:47 +0200 Subject: [PATCH 4/9] Bugfix/SK-1048 | Logs about grpc stream and heartbeat back online (#707) --- fedn/network/clients/client.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index 8508291ff..6b88503d4 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -407,9 +407,13 @@ def _listen_to_task_stream(self): r.sender.client_id = self.id # Add client to metadata self._add_grpc_metadata("client", self.name) + status_code = None while self._connected: try: + if status_code == grpc.StatusCode.UNAVAILABLE: + logger.info("GRPC TaskStream: server available again.") + status_code = None for request in self.combinerStub.TaskStream(r, metadata=self.metadata): if request: logger.debug("Received model update request from combiner: {}.".format(request)) @@ -444,6 +448,7 @@ def _listen_to_task_stream(self): logger.warning("GRPC TaskStream: server unavailable during model update request stream. Retrying.") # Retry after a delay time.sleep(5) + continue if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == "Token expired": @@ -750,6 +755,8 @@ def _send_heartbeat(self, update_frequency=2.0): heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER, client_id=self.id)) try: self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata) + if self._missed_heartbeat > 0: + logger.info("GRPC heartbeat: combiner available again after {} missed heartbeats.".format(self._missed_heartbeat)) self._missed_heartbeat = 0 except grpc.RpcError as e: status_code = e.code() From c4b60b14409d6b7034849beb3261d656154ae141 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 24 Sep 2024 13:33:37 +0200 Subject: [PATCH 5/9] Feature/SK-948 | Combiner config flags in fedn client (#709) --- fedn/cli/client_cmd.py | 29 ++++++++++++++++++++++------- fedn/network/clients/client.py | 32 ++++++++++++++++++-------------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/fedn/cli/client_cmd.py b/fedn/cli/client_cmd.py index 8a68399ed..fb124dcf6 100644 --- a/fedn/cli/client_cmd.py +++ b/fedn/cli/client_cmd.py @@ -3,11 +3,10 @@ import click import requests -from fedn.common.exceptions import InvalidClientConfig -from fedn.network.clients.client import Client - from fedn.cli.main import main from fedn.cli.shared import CONTROLLER_DEFAULTS, apply_config, get_api_url, get_token, print_response +from fedn.common.exceptions import InvalidClientConfig +from fedn.network.clients.client import Client def validate_client_config(config): @@ -17,9 +16,12 @@ def validate_client_config(config): """ try: if config["discover_host"] is None or config["discover_host"] == "": - raise InvalidClientConfig("Missing required configuration: discover_host") + if config["combiner"] is None or config["combiner"] == "": + raise InvalidClientConfig("Missing required configuration: discover_host or combiner") if "discover_port" not in config.keys(): config["discover_port"] = None + if config["remote_compute_context"] and config["discover_host"] is None: + raise InvalidClientConfig("Remote compute context requires discover_host") except Exception: raise InvalidClientConfig("Could not load config from file. Check config") @@ -27,8 +29,7 @@ def validate_client_config(config): @main.group("client") @click.pass_context def client_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass @@ -79,7 +80,10 @@ def list_clients(ctx, protocol: str, host: str, port: str, token: str = None, n_ @click.option("-s", "--secure", required=False, default=False) @click.option("-pc", "--preshared-cert", required=False, default=False) @click.option("-v", "--verify", is_flag=True, help="Verify SSL/TLS for REST service") -@click.option("-c", "--preferred-combiner", type=str,required=False, default="",help="name of the preferred combiner") +@click.option("-c", "--preferred-combiner", type=str, required=False, default="", help="name of the preferred combiner") +@click.option("--combiner", type=str, required=False, default="", help="Skip combiner assignment from discover service and attatch directly to combiner host.") +@click.option("--combiner-port", type=str, required=False, default="12080", help="Combiner port, need to be used with --combiner") +@click.option("--proxy-server", type=str, required=False, default="", help="gRPC proxy server, need to be used together with --combiner") @click.option("-va", "--validator", required=False, default=True) @click.option("-tr", "--trainer", required=False, default=True) @click.option("-in", "--init", required=False, default=None, help="Set to a filename to (re)init client from file state.") @@ -102,6 +106,9 @@ def client_cmd( preshared_cert, verify, preferred_combiner, + combiner, + combiner_port, + proxy_server, validator, trainer, init, @@ -143,6 +150,9 @@ def client_cmd( "preshared_cert": preshared_cert, "verify": verify, "preferred_combiner": preferred_combiner, + "combiner": combiner, + "combiner_port": combiner_port, + "proxy_server": proxy_server, "validator": validator, "trainer": trainer, "logfile": logfile, @@ -156,6 +166,11 @@ def client_cmd( click.echo(f"\nClient configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + # proxy_server needs combiner check + if config["proxy_server"]: + if not config["combiner"]: + click.echo("--proxy-server/proxy_server requires a combiner host in --combiner/combiner") + return try: validate_client_config(config) except InvalidClientConfig as e: diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index 6b88503d4..b70ff9975 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -62,18 +62,6 @@ def __init__(self, config): self.id = config["client_id"] or str(uuid.uuid4()) - self.connector = ConnectorClient( - host=config["discover_host"], - port=config["discover_port"], - token=config["token"], - name=config["name"], - remote_package=config["remote_compute_context"], - force_ssl=config["force_ssl"], - verify=config["verify"], - combiner=config["preferred_combiner"], - id=self.id, - ) - # Validate client name match = re.search(VALID_NAME_REGEX, config["name"]) if not match: @@ -94,8 +82,24 @@ def __init__(self, config): self.inbox = queue.Queue() - # Attach to the FEDn network (get combiner) - combiner_config = self.assign() + # Attach to the FEDn network (get combiner or attach directly) + if config["combiner"]: + combiner_config = {"status": "assigned", "host": config["combiner"], "port": config["combiner_port"], "helper_type": ""} + if config["proxy_server"]: + combiner_config["fqdn"] = config["proxy_server"] + else: + self.connector = ConnectorClient( + host=config["discover_host"], + port=config["discover_port"], + token=config["token"], + name=config["name"], + remote_package=config["remote_compute_context"], + force_ssl=config["force_ssl"], + verify=config["verify"], + combiner=config["preferred_combiner"], + id=self.id, + ) + combiner_config = self.assign() self.connect(combiner_config) self._initialize_dispatcher(self.config) From bf3c30f8c7c938a18293074ba3e607dba4704ccc Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Wed, 25 Sep 2024 10:53:10 +0200 Subject: [PATCH 6/9] Bugfix/SK-1063 | Update sys_platform for windows in environment markers (#710) --- examples/FedSimSiam/client/python_env.yaml | 6 +++--- examples/flower-client/client/python_env.yaml | 6 +++--- examples/huggingface/client/python_env.yaml | 6 +++--- examples/mnist-pytorch-DPSGD/client/python_env.yaml | 6 +++--- examples/mnist-pytorch/client/python_env.yaml | 6 +++--- examples/monai-2D-mednist/client/python_env.yaml | 6 +++--- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/examples/FedSimSiam/client/python_env.yaml b/examples/FedSimSiam/client/python_env.yaml index f4cdc5c6f..d728b82be 100644 --- a/examples/FedSimSiam/client/python_env.yaml +++ b/examples/FedSimSiam/client/python_env.yaml @@ -4,12 +4,12 @@ build_dependencies: - setuptools - wheel==0.37.1 dependencies: - - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") # PyTorch macOS x86 builds deprecation - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win" or sys_platform == "linux" and python_version >= "3.9") + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - numpy==1.24.4; python_version == "3.8" - fedn diff --git a/examples/flower-client/client/python_env.yaml b/examples/flower-client/client/python_env.yaml index 16879da90..a82e7e50d 100644 --- a/examples/flower-client/client/python_env.yaml +++ b/examples/flower-client/client/python_env.yaml @@ -5,12 +5,12 @@ build_dependencies: - wheel==0.37.1 dependencies: - fedn[flower] - - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") # PyTorch macOS x86 builds deprecation - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win" or sys_platform == "linux" and python_version >= "3.9") + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - numpy==1.24.4; python_version == "3.8" - fire==0.3.1 diff --git a/examples/huggingface/client/python_env.yaml b/examples/huggingface/client/python_env.yaml index 23052db0a..87ee6f32d 100644 --- a/examples/huggingface/client/python_env.yaml +++ b/examples/huggingface/client/python_env.yaml @@ -4,12 +4,12 @@ build_dependencies: - setuptools - wheel dependencies: - - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") # PyTorch macOS x86 builds deprecation - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win" or sys_platform == "linux" and python_version >= "3.9") + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - numpy==1.24.4; python_version == "3.8" - transformers diff --git a/examples/mnist-pytorch-DPSGD/client/python_env.yaml b/examples/mnist-pytorch-DPSGD/client/python_env.yaml index c9341a6ef..13d586102 100644 --- a/examples/mnist-pytorch-DPSGD/client/python_env.yaml +++ b/examples/mnist-pytorch-DPSGD/client/python_env.yaml @@ -5,12 +5,12 @@ build_dependencies: - wheel dependencies: - fedn - - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") # PyTorch macOS x86 builds deprecation - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win" or sys_platform == "linux" and python_version >= "3.9") + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - numpy==1.24.4; python_version == "3.8" - opacus diff --git a/examples/mnist-pytorch/client/python_env.yaml b/examples/mnist-pytorch/client/python_env.yaml index 642ed1cf0..272b196ea 100644 --- a/examples/mnist-pytorch/client/python_env.yaml +++ b/examples/mnist-pytorch/client/python_env.yaml @@ -5,11 +5,11 @@ build_dependencies: - wheel dependencies: - fedn - - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") # PyTorch macOS x86 builds deprecation - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win" or sys_platform == "linux" and python_version >= "3.9") + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - numpy==1.24.4; python_version == "3.8" diff --git a/examples/monai-2D-mednist/client/python_env.yaml b/examples/monai-2D-mednist/client/python_env.yaml index 49d371872..35354bf3b 100644 --- a/examples/monai-2D-mednist/client/python_env.yaml +++ b/examples/monai-2D-mednist/client/python_env.yaml @@ -5,12 +5,12 @@ build_dependencies: - wheel dependencies: - fedn - - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torch==2.4.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") # PyTorch macOS x86 builds deprecation - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win" or sys_platform == "linux") + - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win" or sys_platform == "linux" and python_version >= "3.9") + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - numpy==1.24.4; python_version == "3.8" - monai-weekly[pillow, tqdm] From 347711489a7be127408ef59d24e9123c76292b99 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 27 Sep 2024 13:24:33 +0200 Subject: [PATCH 7/9] Feature/SK-1073 | Add SEO meta tags to docs (#713) --- docs/introduction.rst | 9 ++++++++- docs/quickstart.rst | 10 ++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/docs/introduction.rst b/docs/introduction.rst index ec14e8ef7..101fb0d43 100644 --- a/docs/introduction.rst +++ b/docs/introduction.rst @@ -71,4 +71,11 @@ For professionals / Enteprise, we offer `Dedicated support Date: Fri, 27 Sep 2024 13:44:41 +0200 Subject: [PATCH 8/9] Feature/SK-895 | Use the system's default cert store (#711) --- fedn/network/clients/client.py | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index b70ff9975..fa7000a99 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -3,7 +3,6 @@ import os import queue import re -import socket import sys import threading import time @@ -13,9 +12,7 @@ import grpc import requests -from cryptography.hazmat.primitives.serialization import Encoding from google.protobuf.json_format import MessageToJson -from OpenSSL import SSL from tenacity import retry, stop_after_attempt import fedn.network.grpc.fedn_pb2 as fedn @@ -162,20 +159,6 @@ def _add_grpc_metadata(self, key, value): # Set metadata using tuple concatenation self.metadata += ((key, value),) - def _get_ssl_certificate(self, domain, port=443): - context = SSL.Context(SSL.SSLv23_METHOD) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((domain, port)) - ssl_sock = SSL.Connection(context, sock) - ssl_sock.set_tlsext_host_name(domain.encode()) - ssl_sock.set_connect_state() - ssl_sock.do_handshake() - cert = ssl_sock.get_peer_certificate() - ssl_sock.close() - sock.close() - cert = cert.to_cryptography().public_bytes(Encoding.PEM).decode() - return cert - def connect(self, combiner_config): """Connect to combiner. @@ -207,10 +190,8 @@ def connect(self, combiner_config): channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) elif self.config["secure"]: secure = True - logger.info("Using CA certificate for GRPC channel.") - cert = self._get_ssl_certificate(host, port=port) - - credentials = grpc.ssl_channel_credentials(cert.encode("utf-8")) + logger.info("Using default location for root certificates.") + credentials = grpc.ssl_channel_credentials() if self.config["token"]: token = self.config["token"] auth_creds = grpc.metadata_call_credentials(GrpcAuth(token)) From 041ec14afe508627a333114f2c40d57d9e4d1df4 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 27 Sep 2024 14:19:33 +0000 Subject: [PATCH 9/9] bump --- docs/conf.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 5945fa7e2..68193fb2b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -11,7 +11,7 @@ author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = "0.15.0" +release = "0.16.0" # Add any Sphinx extension module names here, as strings extensions = [ diff --git a/pyproject.toml b/pyproject.toml index f42d4a0b2..d8189a06c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.15.0" +version = "0.16.0" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst"