From 75956dce3829a0ce85af338ee942baf27e579576 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Wed, 29 May 2024 14:15:22 +0200 Subject: [PATCH 01/22] Bug/SK-836 | Grpc error retry passes over max tries (#615) --- .devcontainer/Dockerfile | 14 ++------------ fedn/network/clients/client.py | 1 + 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index cf420369e..1a9ca1001 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,14 +1,10 @@ -ARG BASE_IMG -FROM $BASE_IMG +FROM python:3.10-slim # Non-root user with sudo access ARG USERNAME=default ARG USER_UID=1000 ARG USER_GID=$USER_UID -# Versioning -ARG DOCKER_VERSION=19.03.9 -ARG COMPOSE_VERSION=1.29.2 # Avoid warnings by switching to noninteractive ENV DEBIAN_FRONTEND=noninteractive @@ -27,13 +23,7 @@ RUN apt-get --allow-releaseinfo-change update \ curl \ git \ vim \ - # - # Install docker binaries - && curl -L https://download.docker.com/linux/static/stable/x86_64/docker-${DOCKER_VERSION}.tgz | tar xvz docker/docker \ - && cp docker/docker /usr/local/bin \ - && rm -R docker \ - && curl -L https://github.com/docker/compose/releases/download/${COMPOSE_VERSION}/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose \ - && chmod +x /usr/local/bin/docker-compose \ + ssh \ # # Create a non-root user to use if preferred && groupadd --gid $USER_GID $USERNAME \ diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index df20a8956..14b95a906 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -709,6 +709,7 @@ def _send_heartbeat(self, update_frequency=2.0): ) if self._missed_heartbeat > self.config["reconnect_after_missed_heartbeat"]: self.disconnect() + self._missed_heartbeat = 0 if status_code == grpc.StatusCode.UNAUTHENTICATED: details = e.details() if details == "Token expired": From 7ca66aeb293dbb8ccd7f214c3c369685a28a438f Mon Sep 17 00:00:00 2001 From: stefanhellander <59477428+stefanhellander@users.noreply.github.com> Date: Fri, 7 Jun 2024 10:03:57 +0200 Subject: [PATCH 02/22] Feature/SK-884 | FEDn compatible with Python 3.12 (#619) --- .github/workflows/integration-tests.yaml | 14 +++++++------- examples/FedSimSiam/client/python_env.yaml | 2 +- examples/huggingface/client/python_env.yaml | 2 +- examples/mnist-keras/client/python_env.yaml | 6 +++--- examples/mnist-keras/client/python_env_macosx.yaml | 6 +++--- examples/mnist-keras/requirements.txt | 2 +- examples/mnist-pytorch/client/python_env.yaml | 8 ++++---- fedn/network/clients/client.py | 10 ++++++---- pyproject.toml | 5 +++-- 9 files changed, 29 insertions(+), 26 deletions(-) diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 44bb3d008..19aa20593 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -1,14 +1,14 @@ name: "integration tests" -on: +on: push: branches: - - master - - develop - - 'release/**' + - master + - develop + - "release/**" pull_request: branches: - - '**' + - "**" jobs: integration-tests: @@ -17,7 +17,7 @@ jobs: to_test: - "mnist-keras numpyhelper" - "mnist-pytorch numpyhelper" - python_version: ["3.8","3.9","3.10", "3.11"] + python_version: ["3.8", "3.9", "3.10", "3.11", "3.12"] os: - ubuntu-22.04 runs-on: ${{ matrix.os }} @@ -28,7 +28,7 @@ jobs: - uses: actions/setup-python@v4 with: python-version: ${{ matrix.python_version }} - + - name: run ${{ matrix.to_test }} run: .ci/tests/examples/run.sh ${{ matrix.to_test }} diff --git a/examples/FedSimSiam/client/python_env.yaml b/examples/FedSimSiam/client/python_env.yaml index 49b1ad2ec..82aba8729 100644 --- a/examples/FedSimSiam/client/python_env.yaml +++ b/examples/FedSimSiam/client/python_env.yaml @@ -6,4 +6,4 @@ build_dependencies: dependencies: - torch==2.2.0 - torchvision==0.17.0 - - fedn==0.9.0 \ No newline at end of file + - fedn==0.9.0 diff --git a/examples/huggingface/client/python_env.yaml b/examples/huggingface/client/python_env.yaml index 7f4c4afc3..590675911 100644 --- a/examples/huggingface/client/python_env.yaml +++ b/examples/huggingface/client/python_env.yaml @@ -8,4 +8,4 @@ dependencies: - torchvision==0.17.1 - fedn==0.9.0 - transformers==4.39.3 - - datasets==2.19.0 \ No newline at end of file + - datasets==2.19.0 diff --git a/examples/mnist-keras/client/python_env.yaml b/examples/mnist-keras/client/python_env.yaml index 7220540b6..d0ba7b878 100644 --- a/examples/mnist-keras/client/python_env.yaml +++ b/examples/mnist-keras/client/python_env.yaml @@ -2,8 +2,8 @@ name: mnist-keras build_dependencies: - pip - setuptools - - wheel==0.37.1 + - wheel dependencies: - - tensorflow==2.13.1 + - tensorflow>=2.13.1 - fire==0.3.1 - - fedn==0.9.0 + - fedn diff --git a/examples/mnist-keras/client/python_env_macosx.yaml b/examples/mnist-keras/client/python_env_macosx.yaml index d13fc466a..a5ddba599 100644 --- a/examples/mnist-keras/client/python_env_macosx.yaml +++ b/examples/mnist-keras/client/python_env_macosx.yaml @@ -2,9 +2,9 @@ name: mnist-keras build_dependencies: - pip - setuptools - - wheel==0.37.1 + - wheel dependencies: - tensorflow-macos - - tensorflow-metal + - tensorflow-metal - fire==0.3.1 - - fedn==0.9.0b2 + - fedn diff --git a/examples/mnist-keras/requirements.txt b/examples/mnist-keras/requirements.txt index 749cf78a8..be46f4d58 100644 --- a/examples/mnist-keras/requirements.txt +++ b/examples/mnist-keras/requirements.txt @@ -1,3 +1,3 @@ -tensorflow==2.13.1 +tensorflow>=2.13.1 fire==0.3.1 docker==6.1.1 \ No newline at end of file diff --git a/examples/mnist-pytorch/client/python_env.yaml b/examples/mnist-pytorch/client/python_env.yaml index a0f10ee40..afdea926f 100644 --- a/examples/mnist-pytorch/client/python_env.yaml +++ b/examples/mnist-pytorch/client/python_env.yaml @@ -2,8 +2,8 @@ name: mnist-pytorch build_dependencies: - pip - setuptools - - wheel==0.37.1 + - wheel dependencies: - - torch==2.2.1 - - torchvision==0.17.1 - - fedn==0.9.0 \ No newline at end of file + - torch==2.3.1 + - torchvision==0.18.1 + - fedn diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index 14b95a906..c0f1b0baa 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -10,8 +10,8 @@ import time import uuid from datetime import datetime -from distutils.dir_util import copy_tree from io import BytesIO +from shutil import copytree import grpc from cryptography.hazmat.primitives.serialization import Encoding @@ -22,11 +22,13 @@ import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc from fedn.common.config import FEDN_AUTH_SCHEME, FEDN_PACKAGE_EXTRACT_DIR -from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream +from fedn.common.log_config import (logger, set_log_level_from_string, + set_log_stream) from fedn.network.clients.connect import ConnectorClient, Status from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString -from fedn.network.combiner.modelservice import get_tmp_path, upload_request_generator +from fedn.network.combiner.modelservice import (get_tmp_path, + upload_request_generator) from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers.helpers import get_helper @@ -340,7 +342,7 @@ def _initialize_dispatcher(self, config): } from_path = os.path.join(os.getcwd(), "client") - copy_tree(from_path, self.run_path) + copytree(from_path, self.run_path) self.dispatcher = Dispatcher(dispatch_config, self.run_path) # Get or create python environment activate_cmd = self.dispatcher._get_or_create_python_env() diff --git a/pyproject.toml b/pyproject.toml index d52b57269..cb37baba2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.9.5" +version = "0.9.6b1" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst" @@ -24,9 +24,10 @@ classifiers = [ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", ] -requires-python = '>=3.8,<3.12' +requires-python = '>=3.8,<3.13' dependencies = [ "requests", "urllib3>=1.26.4", From 0628178027064771af693991e7f598c8984d0e49 Mon Sep 17 00:00:00 2001 From: Salman Toor Date: Fri, 7 Jun 2024 13:02:25 +0200 Subject: [PATCH 03/22] Bug/SK-875 | Fix health check in combiner #618 --- docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index b6563ace6..85386c6da 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -107,7 +107,7 @@ services: "/bin/grpc_health_probe", "-addr=localhost:12080" ] - interval: 2s + interval: 20s timeout: 10s retries: 5 depends_on: From 48d2647219426a42807e8770f338f31ab2201b40 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 7 Jun 2024 13:03:10 +0200 Subject: [PATCH 04/22] Bug/SK-853 | Helper for package ignored in start_session #616 --- fedn/network/api/client.py | 2 +- fedn/network/api/interface.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/fedn/network/api/client.py b/fedn/network/api/client.py index cd0ca5a7a..d26be03ff 100644 --- a/fedn/network/api/client.py +++ b/fedn/network/api/client.py @@ -571,7 +571,7 @@ def start_session( round_buffer_size: int = -1, delete_models: bool = True, validate: bool = True, - helper: str = "numpyhelper", + helper: str = "", min_clients: int = 1, requested_clients: int = 8, ): diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 663add77e..5cd465085 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -963,7 +963,7 @@ def start_session( round_buffer_size=-1, delete_models=True, validate=True, - helper="numpyhelper", + helper="", min_clients=1, requested_clients=8, ): @@ -1002,13 +1002,17 @@ def start_session( return jsonify({"success": False, "message": "A session is already running."}) # Check if compute package is set - if not self.statestore.get_compute_package(): + package = self.statestore.get_compute_package() + if not package: return jsonify( { "success": False, "message": "No compute package set. Set compute package before starting session.", } ) + if not helper: + # get helper from compute package + helper = package["helper"] # Check that initial (seed) model is set if not self.statestore.get_initial_model(): From f4b9e1b27f4c0df1852023d192ab3f6c18d04d36 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 7 Jun 2024 11:10:13 +0000 Subject: [PATCH 05/22] update version --- docs/conf.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 10c077efe..1fe8d9929 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -12,7 +12,7 @@ author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = "0.9.5" +release = "0.9.6" # Add any Sphinx extension module names here, as strings extensions = [ diff --git a/pyproject.toml b/pyproject.toml index cb37baba2..c11605cb6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.9.6b1" +version = "0.9.6" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst" From 9fd444233e336bacd684fda15e442a72b97aa2ba Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Mon, 10 Jun 2024 09:15:13 +0000 Subject: [PATCH 06/22] docs fix --- docs/quickstart.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/quickstart.rst b/docs/quickstart.rst index fa5b83eaa..d231aca9e 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -123,7 +123,7 @@ For example, to split the data in 10 parts and start a client using the 8th part $env:FEDN_PACKAGE_EXTRACT_DIR="package" $env:FEDN_NUM_DATA_SPLITS=10 $env:FEDN_DATA_PATH="./data/clients/8/mnist.pt" - fedn run client -in client.yaml --secure=True --force-ssl + fedn client start -in client.yaml --secure=True --force-ssl Start a training session From 25b47cb6f62e54ea11dd9692a5992631aef23838 Mon Sep 17 00:00:00 2001 From: Salman Toor Date: Mon, 10 Jun 2024 13:42:52 +0200 Subject: [PATCH 07/22] Feature/SK-855 | Monai Example (#620) --- examples/monai-2D-mednist/.dockerignore | 4 + examples/monai-2D-mednist/.gitignore | 6 + examples/monai-2D-mednist/README.rst | 169 ++++++++++++++++++ examples/monai-2D-mednist/client/data.py | 153 ++++++++++++++++ examples/monai-2D-mednist/client/fedn.yaml | 10 ++ examples/monai-2D-mednist/client/model.py | 64 +++++++ .../monai-2D-mednist/client/python_env.yaml | 12 ++ .../monai-2D-mednist/client/requirements.txt | 8 + examples/monai-2D-mednist/client/train.py | 133 ++++++++++++++ examples/monai-2D-mednist/client/validate.py | 97 ++++++++++ .../monai-2D-mednist/client_settings.yaml | 6 + .../docker-compose.override.yaml | 36 ++++ 12 files changed, 698 insertions(+) create mode 100644 examples/monai-2D-mednist/.dockerignore create mode 100644 examples/monai-2D-mednist/.gitignore create mode 100644 examples/monai-2D-mednist/README.rst create mode 100644 examples/monai-2D-mednist/client/data.py create mode 100644 examples/monai-2D-mednist/client/fedn.yaml create mode 100644 examples/monai-2D-mednist/client/model.py create mode 100644 examples/monai-2D-mednist/client/python_env.yaml create mode 100644 examples/monai-2D-mednist/client/requirements.txt create mode 100644 examples/monai-2D-mednist/client/train.py create mode 100644 examples/monai-2D-mednist/client/validate.py create mode 100644 examples/monai-2D-mednist/client_settings.yaml create mode 100644 examples/monai-2D-mednist/docker-compose.override.yaml diff --git a/examples/monai-2D-mednist/.dockerignore b/examples/monai-2D-mednist/.dockerignore new file mode 100644 index 000000000..8ba9024ad --- /dev/null +++ b/examples/monai-2D-mednist/.dockerignore @@ -0,0 +1,4 @@ +data +seed.npz +*.tgz +*.tar.gz \ No newline at end of file diff --git a/examples/monai-2D-mednist/.gitignore b/examples/monai-2D-mednist/.gitignore new file mode 100644 index 000000000..a9f01054b --- /dev/null +++ b/examples/monai-2D-mednist/.gitignore @@ -0,0 +1,6 @@ +data +*.npz +*.tgz +*.tar.gz +.mnist-pytorch +client.yaml \ No newline at end of file diff --git a/examples/monai-2D-mednist/README.rst b/examples/monai-2D-mednist/README.rst new file mode 100644 index 000000000..c2c536f27 --- /dev/null +++ b/examples/monai-2D-mednist/README.rst @@ -0,0 +1,169 @@ +FEDn Project: MonAI 2D Classification with the MedNIST Dataset (PyTorch) +------------------------------------------------------------------------ + +This is an example FEDn Project based on the MonAI 2D Classification with the MedNIST Dataset. +The example is intented as a minimalistic quickstart and automates the handling of training data +by letting the client download and create its partition of the dataset as it starts up. + +Links: + +- MonAI: https://monai.io/ +- Base example notebook: https://github.com/Project-MONAI/tutorials/blob/main/2d_classification/mednist_tutorial.ipynb +- MedNIST dataset: https://github.com/Project-MONAI/MONAI-extra-test-data/releases/download/0.8.1/MedNIST.tar.gz + +Prerequisites +------------- + +Using FEDn Studio: + +- `Python 3.8, 3.9, 3.10 or 3.11 `__ +- `A FEDn Studio account `__ + +If using pseudo-distributed mode with docker-compose: + +- `Docker `__ +- `Docker Compose `__ + +Creating the compute package and seed model +------------------------------------------- + +Install fedn: + +.. code-block:: + + pip install fedn + +Clone this repository, then locate into this directory: + +.. code-block:: + + git clone https://github.com/scaleoutsystems/fedn.git + cd fedn/examples/monai-2D-mednist + +Create the compute package: + +.. code-block:: + + fedn package create --path client + +This should create a file 'package.tgz' in the project folder. + +Next, generate a seed model (the first model in a global model trail): + +.. code-block:: + + fedn run build --path client + +This will create a seed model called 'seed.npz' in the root of the project. This step will take a few minutes, depending on hardware and internet connection (builds a virtualenv). + +Using FEDn Studio +----------------- + +Follow the guide here to set up your FEDn Studio project and learn how to connect clients (using token authentication): `Studio guide `__. +On the step "Upload Files", upload 'package.tgz' and 'seed.npz' created above. + +Connecting clients: +=================== + +**NOTE: In case a different data path needs to be set, use the env variable FEDN_DATA_PATH.** + +.. code-block:: + + export FEDN_PACKAGE_EXTRACT_DIR=package + export FEDN_DATA_PATH=./data/ + export FEDN_CLIENT_SETTINGS_PATH=/client_settings.yaml + fedn client start -in client.yaml --secure=True --force-ssl + +Connecting clients using Docker: +================================ + +For convenience, there is a Docker image hosted on ghrc.io with fedn preinstalled. To start a client using Docker: + +.. code-block:: + + docker run \ + -v $PWD/client.yaml:/app/client.yaml \ + -v $PWD/client_settings.yaml:/app/client_settings.yaml \ + -e FEDN_PACKAGE_EXTRACT_DIR=package \ + -e FEDN_DATA_PATH=./data/ \ + -e FEDN_CLIENT_SETTINGS_PATH=/app/client_settings.yaml \ + ghcr.io/scaleoutsystems/fedn/fedn:0.9.0 run client -in client.yaml --force-ssl --secure=True + + +**NOTE: The following instructions are only for SDK-based client communication and for local development environments using Docker.** + + +Local development mode using Docker/docker compose +-------------------------------------------------- + +Follow the steps above to install FEDn, generate 'package.tgz' and 'seed.tgz'. + +Start a pseudo-distributed FEDn network using docker-compose: + +.. code-block:: + + docker compose \ + -f ../../docker-compose.yaml \ + -f docker-compose.override.yaml \ + up + +This starts up local services for MongoDB, Minio, the API Server, one Combiner and two clients. +You can verify the deployment using these urls: + +- API Server: http://localhost:8092/get_controller_status +- Minio: http://localhost:9000 +- Mongo Express: http://localhost:8081 + +Upload the package and seed model to FEDn controller using the APIClient. In Python: + +.. code-block:: + + from fedn import APIClient + client = APIClient(host="localhost", port=8092) + client.set_active_package("package.tgz", helper="numpyhelper") + client.set_active_model("seed.npz") + +You can now start a training session with 5 rounds (default): + +.. code-block:: + + client.start_session() + +Automate experimentation with several clients +============================================= + +If you want to scale the number of clients, you can do so by modifying ``docker-compose.override.yaml``. For example, +in order to run with 3 clients, change the environment variable ``FEDN_NUM_DATA_SPLITS`` to 3, and add one more client +by copying ``client1`` and setting ``FEDN_DATA_PATH`` to ``/app/package/data3/`` + + +Access message logs and validation data from MongoDB +==================================================== + +You can access and download event logs and validation data via the API, and you can also as a developer obtain +the MongoDB backend data using pymongo or via the MongoExpress interface: + +- http://localhost:8081/db/fedn-network/ + +The credentials are as set in docker-compose.yaml in the root of the repository. + +Access global models +==================== + +You can obtain global model updates from the 'fedn-models' bucket in Minio: + +- http://localhost:9000 + +Reset the FEDn deployment +========================= + +To purge all data from a deployment incuding all session and round data, access the MongoExpress UI interface and +delete the entire ``fedn-network`` collection. Then restart all services. + +Clean up +======== +You can clean up by running + +.. code-block:: + + docker-compose -f ../../docker-compose.yaml -f docker-compose.override.yaml down -v diff --git a/examples/monai-2D-mednist/client/data.py b/examples/monai-2D-mednist/client/data.py new file mode 100644 index 000000000..0a8b5c306 --- /dev/null +++ b/examples/monai-2D-mednist/client/data.py @@ -0,0 +1,153 @@ +import os +import random + +import numpy as np +import PIL +import torch +import yaml +from monai.apps import download_and_extract + +dir_path = os.path.dirname(os.path.realpath(__file__)) +abs_path = os.path.abspath(dir_path) + +DATA_CLASSES = {"AbdomenCT": 0, "BreastMRI": 1, "CXR": 2, "ChestCT": 3, "Hand": 4, "HeadCT": 5} + + +def split_data(data_path="data/MedNIST", splits=100, validation_split=0.9): + # create clients + clients = {"client " + str(i): {"train": [], "validation": []} for i in range(splits)} + + for class_ in os.listdir(data_path): + if os.path.isdir(os.path.join(data_path, class_)): + patients_in_class = [os.path.join(class_, patient) for patient in os.listdir(os.path.join(data_path, class_))] + np.random.shuffle(patients_in_class) + chops = np.int32(np.linspace(0, len(patients_in_class), splits + 1)) + for split in range(splits): + p = patients_in_class[chops[split] : chops[split + 1]] + valsplit = np.int32(len(p) * validation_split) + + clients["client " + str(split)]["train"] += p[:valsplit] + clients["client " + str(split)]["validation"] += p[valsplit:] + + with open(os.path.join(os.path.dirname(data_path), "data_splits.yaml"), "w") as file: + yaml.dump(clients, file, default_flow_style=False) + + +def get_data(out_dir="data"): + """Get data from the external repository. + + :param out_dir: Path to data directory. If doesn't + :type data_dir: str + """ + # Make dir if necessary + if not os.path.exists(out_dir): + os.mkdir(out_dir) + + resource = "https://github.com/Project-MONAI/MONAI-extra-test-data/releases/download/0.8.1/MedNIST.tar.gz" + md5 = "0bc7306e7427e00ad1c5526a6677552d" + + compressed_file = os.path.join(out_dir, "MedNIST.tar.gz") + + data_dir = os.path.abspath(out_dir) + print("data_dir:", data_dir) + if os.path.exists(data_dir): + print("path exist.") + if not os.path.exists(compressed_file): + print("compressed file does not exist, downloading and extracting data.") + download_and_extract(resource, compressed_file, data_dir, md5) + else: + print("files already exist.") + + split_data() + + +def get_classes(data_path): + """Get a list of classes from the dataset + + :param data_path: Path to data directory. + :type data_path: str + """ + if data_path is None: + data_path = os.environ.get("FEDN_DATA_PATH", abs_path + "/data/MedNIST") + + class_names = sorted(x for x in os.listdir(data_path) if os.path.isdir(os.path.join(data_path, x))) + return class_names + + +def load_data(data_path, sample_size=None, is_train=True): + """Load data from disk. + + :param data_path: Path to data directory. + :type data_path: str + :param is_train: Whether to load training or test data. + :type is_train: bool + :return: Tuple of data and labels. + :rtype: tuple + """ + if data_path is None: + data_path = os.environ.get("FEDN_DATA_PATH", abs_path + "/data/MedNIST") + + class_names = get_classes(data_path) + num_class = len(class_names) + + image_files_all = [[os.path.join(data_path, class_names[i], x) for x in os.listdir(os.path.join(data_path, class_names[i]))] for i in range(num_class)] + + # To make the dataset small, we are using sample_size=100 images of each class. + if sample_size is None: + image_files = image_files_all + + else: + image_files = [random.sample(inner_list, sample_size) for inner_list in image_files_all] + + num_each = [len(image_files[i]) for i in range(num_class)] + image_files_list = [] + image_class = [] + for i in range(num_class): + image_files_list.extend(image_files[i]) + image_class.extend([i] * num_each[i]) + num_total = len(image_class) + image_width, image_height = PIL.Image.open(image_files_list[0]).size + + print(f"Total image count: {num_total}") + print(f"Image dimensions: {image_width} x {image_height}") + print(f"Label names: {class_names}") + print(f"Label counts: {num_each}") + + val_frac = 0.1 + length = len(image_files_list) + indices = np.arange(length) + np.random.shuffle(indices) + + val_split = int(val_frac * length) + val_indices = indices[:val_split] + train_indices = indices[val_split:] + + train_x = [image_files_list[i] for i in train_indices] + train_y = [image_class[i] for i in train_indices] + val_x = [image_files_list[i] for i in val_indices] + val_y = [image_class[i] for i in val_indices] + + print(f"Training count: {len(train_x)}, Validation count: " f"{len(val_x)}") + + if is_train: + return train_x, train_y + else: + return val_x, val_y, class_names + + +class MedNISTDataset(torch.utils.data.Dataset): + def __init__(self, data_path, image_files, transforms): + self.data_path = data_path + self.image_files = image_files + self.transforms = transforms + + def __len__(self): + return len(self.image_files) + + def __getitem__(self, index): + return (self.transforms(os.path.join(self.data_path, self.image_files[index])), DATA_CLASSES[os.path.dirname(self.image_files[index])]) + + +if __name__ == "__main__": + # Prepare data if not already done + get_data() diff --git a/examples/monai-2D-mednist/client/fedn.yaml b/examples/monai-2D-mednist/client/fedn.yaml new file mode 100644 index 000000000..b05504102 --- /dev/null +++ b/examples/monai-2D-mednist/client/fedn.yaml @@ -0,0 +1,10 @@ +python_env: python_env.yaml +entry_points: + build: + command: python model.py + startup: + command: python data.py + train: + command: python train.py + validate: + command: python validate.py \ No newline at end of file diff --git a/examples/monai-2D-mednist/client/model.py b/examples/monai-2D-mednist/client/model.py new file mode 100644 index 000000000..4f8596b85 --- /dev/null +++ b/examples/monai-2D-mednist/client/model.py @@ -0,0 +1,64 @@ +import collections + +import torch +from monai.networks.nets import DenseNet121 + +from fedn.utils.helpers.helpers import get_helper + +HELPER_MODULE = "numpyhelper" +helper = get_helper(HELPER_MODULE) + + +def compile_model(): + """Compile the MonAI model. + + :return: The compiled model. + :rtype: torch.nn.Module + """ + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model = DenseNet121(spatial_dims=2, in_channels=1, out_channels=6).to(device) + return model + + +def save_parameters(model, out_path): + """Save model paramters to file. + + :param model: The model to serialize. + :type model: torch.nn.Module + :param out_path: The path to save to. + :type out_path: str + """ + parameters_np = [val.cpu().numpy() for _, val in model.state_dict().items()] + helper.save(parameters_np, out_path) + + +def load_parameters(model_path): + """Load model parameters from file and populate model. + + param model_path: The path to load from. + :type model_path: str + :return: The loaded model. + :rtype: torch.nn.Module + """ + model = compile_model() + parameters_np = helper.load(model_path) + + params_dict = zip(model.state_dict().keys(), parameters_np) + state_dict = collections.OrderedDict({key: torch.tensor(x) for key, x in params_dict}) + model.load_state_dict(state_dict, strict=True) + return model + + +def init_seed(out_path="seed.npz"): + """Initialize seed model and save it to file. + + :param out_path: The path to save the seed model to. + :type out_path: str + """ + # Init and save + model = compile_model() + save_parameters(model, out_path) + + +if __name__ == "__main__": + init_seed("../seed.npz") diff --git a/examples/monai-2D-mednist/client/python_env.yaml b/examples/monai-2D-mednist/client/python_env.yaml new file mode 100644 index 000000000..7580ffb76 --- /dev/null +++ b/examples/monai-2D-mednist/client/python_env.yaml @@ -0,0 +1,12 @@ +name: monai-2d-mdnist +build_dependencies: + - pip + - setuptools + - wheel==0.37.1 +dependencies: + - torch==2.2.1 + - torchvision==0.17.1 + - fedn==0.9.0 + - monai-weekly[pillow, tqdm] + - scikit-learn + - tensorboard diff --git a/examples/monai-2D-mednist/client/requirements.txt b/examples/monai-2D-mednist/client/requirements.txt new file mode 100644 index 000000000..a37218f00 --- /dev/null +++ b/examples/monai-2D-mednist/client/requirements.txt @@ -0,0 +1,8 @@ +setuptools +wheel==0.37.1 +torch==2.2.1 +torchvision==0.17.1 +fedn==0.9.0 +monai-weekly[pillow, tqdm] +scikit-learn +tensorboard diff --git a/examples/monai-2D-mednist/client/train.py b/examples/monai-2D-mednist/client/train.py new file mode 100644 index 000000000..e3cb235c0 --- /dev/null +++ b/examples/monai-2D-mednist/client/train.py @@ -0,0 +1,133 @@ +import os +import sys + +import numpy as np +import torch +import yaml +from data import MedNISTDataset +from model import load_parameters, save_parameters +from monai.data import DataLoader +from monai.transforms import ( + Compose, + EnsureChannelFirst, + LoadImage, + RandFlip, + RandRotate, + RandZoom, + ScaleIntensity, +) + +from fedn.utils.helpers.helpers import save_metadata + +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.abspath(dir_path)) + + +train_transforms = Compose( + [ + LoadImage(image_only=True), + EnsureChannelFirst(), + ScaleIntensity(), + RandRotate(range_x=np.pi / 12, prob=0.5, keep_size=True), + RandFlip(spatial_axis=0, prob=0.5), + RandZoom(min_zoom=0.9, max_zoom=1.1, prob=0.5), + ] +) + + +def train(in_model_path, out_model_path, data_path=None, client_settings_path=None): + """Complete a model update. + + Load model paramters from in_model_path (managed by the FEDn client), + perform a model update, and write updated paramters + to out_model_path (picked up by the FEDn client). + + :param in_model_path: The path to the input model. + :type in_model_path: str + :param out_model_path: The path to save the output model to. + :type out_model_path: str + :param data_path: The path to the data directory. + :type data_path: str + :param client_settings_path: path to a local client settings file. + :type client_settings_path: str + """ + if client_settings_path is None: + client_settings_path = os.environ.get("FEDN_CLIENT_SETTINGS_PATH", dir_path + "/client_settings.yaml") + + print("client_settings_path: ", client_settings_path) + with open(client_settings_path, "r") as fh: # Used by CJG for local training + try: + client_settings = dict(yaml.safe_load(fh)) + except yaml.YAMLError: + raise + + print("client settings: ", client_settings) + batch_size = client_settings["batch_size"] + max_epochs = client_settings["local_epochs"] + num_workers = client_settings["num_workers"] + split_index = client_settings["split_index"] + lr = client_settings["lr"] + + if data_path is None: + data_path = os.environ.get("FEDN_DATA_PATH") + + with open(os.path.join(os.path.dirname(data_path), "data_splits.yaml"), "r") as file: + clients = yaml.safe_load(file) + + image_list = clients["client " + str(split_index)]["train"] + + train_ds = MedNISTDataset(data_path="data/MedNIST", transforms=train_transforms, image_files=image_list) + + train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers) + + # Load parmeters and initialize model + model = load_parameters(in_model_path) + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + optimizer = torch.optim.Adam(model.parameters(), 1e-5) + loss_function = torch.nn.CrossEntropyLoss() + + # Train + epoch_loss_values = [] + # writer = SummaryWriter() + + for epoch in range(max_epochs): + print("-" * 10) + print(f"epoch {epoch + 1}/{max_epochs}") + model.train() + epoch_loss = 0 + step = 0 + for batch_data in train_loader: + step += 1 + inputs, labels = batch_data[0].to(device), batch_data[1].to(device) + optimizer.zero_grad() + outputs = model(inputs) + loss = loss_function(outputs, labels) + loss.backward() + optimizer.step() + epoch_loss += loss.item() + print(f"{step}/{len(train_loader) // train_loader.batch_size}, " f"train_loss: {loss.item():.4f}") + + epoch_loss /= step + epoch_loss_values.append(epoch_loss) + print(f"epoch {epoch + 1} average loss: {epoch_loss:.4f}") + + print("training completed!") + + # Metadata needed for aggregation server side + metadata = { + # num_examples are mandatory + "num_examples": len(train_loader), + "batch_size": batch_size, + "epochs": max_epochs, + "lr": lr, + } + + # Save JSON metadata file (mandatory) + save_metadata(metadata, out_model_path) + + # Save model update (mandatory) + save_parameters(model, out_model_path) + + +if __name__ == "__main__": + train(sys.argv[1], sys.argv[2]) diff --git a/examples/monai-2D-mednist/client/validate.py b/examples/monai-2D-mednist/client/validate.py new file mode 100644 index 000000000..74292c34f --- /dev/null +++ b/examples/monai-2D-mednist/client/validate.py @@ -0,0 +1,97 @@ +import os +import sys + +import torch +import yaml +from data import DATA_CLASSES, MedNISTDataset +from model import load_parameters +from monai.data import DataLoader +from monai.transforms import ( + Compose, + EnsureChannelFirst, + LoadImage, + ScaleIntensity, +) +from sklearn.metrics import accuracy_score, classification_report, f1_score + +from fedn.utils.helpers.helpers import save_metrics + +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.abspath(dir_path)) + +val_transforms = Compose([LoadImage(image_only=True), EnsureChannelFirst(), ScaleIntensity()]) + + +def validate(in_model_path, out_json_path, data_path=None, client_settings_path=None): + """Validate model. + + :param in_model_path: The path to the input model. + :type in_model_path: str + :param out_json_path: The path to save the output JSON to. + :type out_json_path: str + :param data_path: The path to the data file. + :type data_path: str + :param client_settings_path: The path to the local client settings file. + :type client_settings_path: str + """ + if client_settings_path is None: + client_settings_path = os.environ.get("FEDN_CLIENT_SETTINGS_PATH", dir_path + "/client_settings.yaml") + + with open(client_settings_path, "r") as fh: # Used by CJG for local training + try: + client_settings = dict(yaml.safe_load(fh)) + except yaml.YAMLError: + raise + + num_workers = client_settings["num_workers"] + batch_size = client_settings["batch_size"] + split_index = client_settings["split_index"] + + if data_path is None: + data_path = os.environ.get("FEDN_DATA_PATH") + + with open(os.path.join(os.path.dirname(data_path), "data_splits.yaml"), "r") as file: + clients = yaml.safe_load(file) + + image_list = clients["client " + str(split_index)]["validation"] + + val_ds = MedNISTDataset(data_path="data/MedNIST", transforms=val_transforms, image_files=image_list) + + val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers) + + # Load model + model = load_parameters(in_model_path) + model.eval() + + y_true = [] + y_pred = [] + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + with torch.no_grad(): + for val_data in val_loader: + val_images, val_labels = ( + val_data[0].to(device), + val_data[1].to(device), + ) + pred = model(val_images).argmax(dim=1) + for i in range(len(pred)): + y_true.append(val_labels[i].item()) + y_pred.append(pred[i].item()) + + class_names = list(DATA_CLASSES.keys()) + print("class names: ", class_names) + cr = classification_report(y_true, y_pred, digits=4, output_dict=True, target_names=class_names) + report = {class_name + "_" + metric: cr[class_name][metric] for class_name in cr if isinstance(cr[class_name], dict) for metric in cr[class_name]} + report.update({class_name: cr[class_name] for class_name in cr if isinstance(cr[class_name], str)}) + + # JSON schema + report.update({"test_accuracy": accuracy_score(y_true, y_pred), "test_f1_score": f1_score(y_true, y_pred, average="macro")}) + for r in report: + print(r, ": ", report[r]) + + # Save JSON + save_metrics(report, out_json_path) + + +if __name__ == "__main__": + validate(sys.argv[1], sys.argv[2]) diff --git a/examples/monai-2D-mednist/client_settings.yaml b/examples/monai-2D-mednist/client_settings.yaml new file mode 100644 index 000000000..f7bccb303 --- /dev/null +++ b/examples/monai-2D-mednist/client_settings.yaml @@ -0,0 +1,6 @@ +lr: 0.01 +batch_size: 32 +local_epochs: 10 +num_workers: 1 +sample_size: 30 +split_index: 4 diff --git a/examples/monai-2D-mednist/docker-compose.override.yaml b/examples/monai-2D-mednist/docker-compose.override.yaml new file mode 100644 index 000000000..afeaf1437 --- /dev/null +++ b/examples/monai-2D-mednist/docker-compose.override.yaml @@ -0,0 +1,36 @@ +# Compose schema version +version: '3.4' + +# Overriding requirements + +x-env: &defaults + GET_HOSTS_FROM: dns + FEDN_PACKAGE_EXTRACT_DIR: package + +services: + + client1: + extends: + file: ${HOST_REPO_DIR:-.}/docker-compose.yaml + service: client + environment: + <<: *defaults + FEDN_DATA_PATH: /app/package/client/data/MedNIST + FEDN_CLIENT_SETTINGS_PATH: /app/client_settings.yaml + deploy: + replicas: 1 + volumes: + - ${HOST_REPO_DIR:-.}/fedn:/app/fedn + - ${HOST_REPO_DIR:-.}/examples/monai-2D-mednist/client_settings.yaml:/app/client_settings.yaml + + client2: + extends: + file: ${HOST_REPO_DIR:-.}/docker-compose.yaml + service: client + environment: + <<: *defaults + FEDN_DATA_PATH: /app/package/client/data/MedNIST + deploy: + replicas: 1 + volumes: + - ${HOST_REPO_DIR:-.}/fedn:/app/fedn From fa592f32224db21a88f47890af35e11b9c1a33e8 Mon Sep 17 00:00:00 2001 From: Salman Toor Date: Mon, 10 Jun 2024 16:21:27 +0200 Subject: [PATCH 08/22] Bug/SK-893 - MonAI example, requirements.txt file removed #622 --- examples/monai-2D-mednist/client/requirements.txt | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 examples/monai-2D-mednist/client/requirements.txt diff --git a/examples/monai-2D-mednist/client/requirements.txt b/examples/monai-2D-mednist/client/requirements.txt deleted file mode 100644 index a37218f00..000000000 --- a/examples/monai-2D-mednist/client/requirements.txt +++ /dev/null @@ -1,8 +0,0 @@ -setuptools -wheel==0.37.1 -torch==2.2.1 -torchvision==0.17.1 -fedn==0.9.0 -monai-weekly[pillow, tqdm] -scikit-learn -tensorboard From b7ac10b66b06cf2e5720ef801ed48dff6c635c7d Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 11 Jun 2024 11:33:18 +0200 Subject: [PATCH 09/22] Feature/SK-866 | Inference TaskType workflow (#614) Also introduces RoundConfig typed object --- fedn/network/api/v1/__init__.py | 3 +- fedn/network/api/v1/inference_routes.py | 34 ++++++ .../combiner/aggregators/aggregatorbase.py | 33 +++--- fedn/network/combiner/combiner.py | 95 +++++++++++------ fedn/network/combiner/interfaces.py | 3 +- fedn/network/combiner/roundhandler.py | 100 ++++++++++++++++-- fedn/network/controller/control.py | 41 ++++++- fedn/network/controller/controlbase.py | 25 +++-- .../storage/statestore/mongostatestore.py | 5 +- 9 files changed, 262 insertions(+), 77 deletions(-) create mode 100644 fedn/network/api/v1/inference_routes.py diff --git a/fedn/network/api/v1/__init__.py b/fedn/network/api/v1/__init__.py index bb8d8d33c..0e05dd249 100644 --- a/fedn/network/api/v1/__init__.py +++ b/fedn/network/api/v1/__init__.py @@ -1,5 +1,6 @@ from fedn.network.api.v1.client_routes import bp as client_bp from fedn.network.api.v1.combiner_routes import bp as combiner_bp +from fedn.network.api.v1.inference_routes import bp as inference_bp from fedn.network.api.v1.model_routes import bp as model_bp from fedn.network.api.v1.package_routes import bp as package_bp from fedn.network.api.v1.round_routes import bp as round_bp @@ -7,4 +8,4 @@ from fedn.network.api.v1.status_routes import bp as status_bp from fedn.network.api.v1.validation_routes import bp as validation_bp -_routes = [client_bp, combiner_bp, model_bp, package_bp, round_bp, session_bp, status_bp, validation_bp] +_routes = [client_bp, combiner_bp, model_bp, package_bp, round_bp, session_bp, status_bp, validation_bp, inference_bp] diff --git a/fedn/network/api/v1/inference_routes.py b/fedn/network/api/v1/inference_routes.py new file mode 100644 index 000000000..6da2dc8b4 --- /dev/null +++ b/fedn/network/api/v1/inference_routes.py @@ -0,0 +1,34 @@ +import threading + +from flask import Blueprint, jsonify, request + +from fedn.network.api.auth import jwt_auth_required +from fedn.network.api.shared import control +from fedn.network.api.v1.shared import api_version + +bp = Blueprint("inference", __name__, url_prefix=f"/api/{api_version}/infer") + + +@bp.route("/start", methods=["POST"]) +@jwt_auth_required(role="admin") +def start_session(): + """Start a new inference session. + param: session_id: The session id to start. + type: session_id: str + param: rounds: The number of rounds to run. + type: rounds: int + """ + try: + data = request.json if request.headers["Content-Type"] == "application/json" else request.form.to_dict() + session_id: str = data.get("session_id") + + if not session_id or session_id == "": + return jsonify({"message": "Session ID is required"}), 400 + + session_config = {"session_id": session_id} + + threading.Thread(target=control.inference_session, kwargs={"config": session_config}).start() + + return jsonify({"message": "Inference session started"}), 200 + except Exception: + return jsonify({"message": "Failed to start inference session"}), 500 diff --git a/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/network/combiner/aggregators/aggregatorbase.py index 0a9c33f43..44d10fca2 100644 --- a/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/network/combiner/aggregators/aggregatorbase.py @@ -1,6 +1,7 @@ import importlib import json import queue +import traceback from abc import ABC, abstractmethod from fedn.common.log_config import logger @@ -9,7 +10,7 @@ class AggregatorBase(ABC): - """ Abstract class defining an aggregator. + """Abstract class defining an aggregator. :param id: A reference to id of :class: `fedn.network.combiner.Combiner` :type id: str @@ -25,7 +26,7 @@ class AggregatorBase(ABC): @abstractmethod def __init__(self, storage, server, modelservice, round_handler): - """ Initialize the aggregator.""" + """Initialize the aggregator.""" self.name = self.__class__.__name__ self.storage = storage self.server = server @@ -75,25 +76,31 @@ def on_model_update(self, model_update): else: logger.warning("AGGREGATOR({}): Invalid model update, skipping.".format(self.name)) except Exception as e: - logger.error("AGGREGATOR({}): failed to receive model update! {}".format(self.name, e)) + tb = traceback.format_exc() + logger.error("AGGREGATOR({}): failed to receive model update: {}".format(self.name, e)) + logger.error(tb) pass def _validate_model_update(self, model_update): - """ Validate the model update. + """Validate the model update. :param model_update: A ModelUpdate message. :type model_update: object :return: True if the model update is valid, False otherwise. :rtype: bool """ - data = json.loads(model_update.meta)["training_metadata"] - if "num_examples" not in data.keys(): - logger.error("AGGREGATOR({}): Model validation failed, num_examples missing in metadata.".format(self.name)) + try: + data = json.loads(model_update.meta)["training_metadata"] + _ = data["num_examples"] + except KeyError: + tb = traceback.format_exc() + logger.error("AGGREGATOR({}): Invalid model update, missing metadata.".format(self.name)) + logger.error(tb) return False return True def next_model_update(self): - """ Get the next model update from the queue. + """Get the next model update from the queue. :param helper: A helper object. :type helper: object @@ -104,7 +111,7 @@ def next_model_update(self): return model_update def load_model_update(self, model_update, helper): - """ Load the memory representation of the model update. + """Load the memory representation of the model update. Load the model update paramters and the associate metadata into memory. @@ -132,15 +139,13 @@ def load_model_update(self, model_update, helper): return model, training_metadata def get_state(self): - """ Get the state of the aggregator's queue, including the number of model updates.""" - state = { - "queue_len": self.model_updates.qsize() - } + """Get the state of the aggregator's queue, including the number of model updates.""" + state = {"queue_len": self.model_updates.qsize()} return state def get_aggregator(aggregator_module_name, storage, server, modelservice, control): - """ Return an instance of the helper class. + """Return an instance of the helper class. :param helper_module_name: The name of the helper plugin module. :type helper_module_name: str diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 8eacd917e..70755ac6b 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -15,7 +15,7 @@ from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream from fedn.network.combiner.connect import ConnectorCombiner, Status from fedn.network.combiner.modelservice import ModelService -from fedn.network.combiner.roundhandler import RoundHandler +from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler from fedn.network.grpc.server import Server from fedn.network.storage.s3.repository import Repository from fedn.network.storage.statestore.mongostatestore import MongoStateStore @@ -65,7 +65,6 @@ def __init__(self, config): # Client queues self.clients = {} - # Validate combiner name match = re.search(VALID_NAME_REGEX, config["name"]) if not match: @@ -161,7 +160,7 @@ def __whoami(self, client, instance): client.role = role_to_proto_role(instance.role) return client - def request_model_update(self, config, clients=[]): + def request_model_update(self, session_id, model_id, config, clients=[]): """Ask clients to update the current global model. :param config: the model configuration to send to clients @@ -170,32 +169,14 @@ def request_model_update(self, config, clients=[]): :type clients: list """ - # The request to be added to the client queue - request = fedn.TaskRequest() - request.model_id = config["model_id"] - request.correlation_id = str(uuid.uuid4()) - request.timestamp = str(datetime.now()) - request.data = json.dumps(config) - request.type = fedn.StatusType.MODEL_UPDATE - request.session_id = config["session_id"] - - request.sender.name = self.id - request.sender.role = fedn.COMBINER - - if len(clients) == 0: - clients = self.get_active_trainers() - - for client in clients: - request.receiver.name = client - request.receiver.role = fedn.WORKER - self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE) + request, clients = self._send_request_type(fedn.StatusType.MODEL_UPDATE, session_id, model_id, config, clients) if len(clients) < 20: logger.info("Sent model update request for model {} to clients {}".format(request.model_id, clients)) else: logger.info("Sent model update request for model {} to {} clients".format(request.model_id, len(clients))) - def request_model_validation(self, model_id, config, clients=[]): + def request_model_validation(self, session_id, model_id, clients=[]): """Ask clients to validate the current global model. :param model_id: the model id to validate @@ -206,30 +187,76 @@ def request_model_validation(self, model_id, config, clients=[]): :type clients: list """ - # The request to be added to the client queue + request, clients = self._send_request_type(fedn.StatusType.MODEL_VALIDATION, session_id, model_id, clients) + + if len(clients) < 20: + logger.info("Sent model validation request for model {} to clients {}".format(request.model_id, clients)) + else: + logger.info("Sent model validation request for model {} to {} clients".format(request.model_id, len(clients))) + + def request_model_inference(self, session_id: str, model_id: str, clients: list = []) -> None: + """Ask clients to perform inference on the model. + + :param model_id: the model id to perform inference on + :type model_id: str + :param config: the model configuration to send to clients + :type config: dict + :param clients: the clients to send the request to + :type clients: list + + """ + request, clients = self._send_request_type(fedn.StatusType.INFERENCE, session_id, model_id, clients) + + if len(clients) < 20: + logger.info("Sent model inference request for model {} to clients {}".format(request.model_id, clients)) + else: + logger.info("Sent model inference request for model {} to {} clients".format(request.model_id, len(clients))) + + def _send_request_type(self, request_type, session_id, model_id, config=None, clients=[]): + """Send a request of a specific type to clients. + + :param request_type: the type of request + :type request_type: :class:`fedn.network.grpc.fedn_pb2.StatusType` + :param model_id: the model id to send in the request + :type model_id: str + :param config: the model configuration to send to clients + :type config: dict + :param clients: the clients to send the request to + :type clients: list + :return: the request and the clients + :rtype: tuple + """ request = fedn.TaskRequest() request.model_id = model_id request.correlation_id = str(uuid.uuid4()) request.timestamp = str(datetime.now()) - # request.is_inference = (config['task'] == 'inference') - request.type = fedn.StatusType.MODEL_VALIDATION + request.type = request_type + request.session_id = session_id request.sender.name = self.id request.sender.role = fedn.COMBINER - request.session_id = config["session_id"] - if len(clients) == 0: - clients = self.get_active_validators() + if request_type == fedn.StatusType.MODEL_UPDATE: + request.data = json.dumps(config) + if len(clients) == 0: + clients = self.get_active_trainers() + elif request_type == fedn.StatusType.MODEL_VALIDATION: + if len(clients) == 0: + clients = self.get_active_validators() + elif request_type == fedn.StatusType.INFERENCE: + request.data = json.dumps(config) + if len(clients) == 0: + # TODO: add inference clients type + clients = self.get_active_validators() + + # TODO: if inference, request.data should be user-defined data/parameters for client in clients: request.receiver.name = client request.receiver.role = fedn.WORKER self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE) - if len(clients) < 20: - logger.info("Sent model validation request for model {} to clients {}".format(request.model_id, clients)) - else: - logger.info("Sent model validation request for model {} to {} clients".format(request.model_id, len(clients))) + return request, clients def get_active_trainers(self): """Get a list of active trainers. @@ -410,7 +437,7 @@ def Start(self, control: fedn.ControlRequest, context): """ logger.info("grpc.Combiner.Start: Starting round") - config = {} + config = RoundConfig() for parameter in control.parameter: config.update({parameter.key: parameter.value}) diff --git a/fedn/network/combiner/interfaces.py b/fedn/network/combiner/interfaces.py index bf10a00f1..935b75442 100644 --- a/fedn/network/combiner/interfaces.py +++ b/fedn/network/combiner/interfaces.py @@ -8,6 +8,7 @@ import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc +from fedn.network.combiner.roundhandler import RoundConfig class CombinerUnavailableError(Exception): @@ -202,7 +203,7 @@ def set_aggregator(self, aggregator): else: raise - def submit(self, config): + def submit(self, config: RoundConfig): """Submit a compute plan to the combiner. :param config: The job configuration. diff --git a/fedn/network/combiner/roundhandler.py b/fedn/network/combiner/roundhandler.py index 4edc04b6e..ef9029de9 100644 --- a/fedn/network/combiner/roundhandler.py +++ b/fedn/network/combiner/roundhandler.py @@ -4,6 +4,7 @@ import sys import time import uuid +from typing import TypedDict from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator @@ -12,6 +13,57 @@ from fedn.utils.parameters import Parameters +class RoundConfig(TypedDict): + """Round configuration. + + :param _job_id: A universally unique identifier for the round. Set by Combiner. + :type _job_id: str + :param committed_at: The time the round was committed. Set by Controller. + :type committed_at: str + :param task: The task to perform in the round. Set by Controller. Supported tasks are "training", "validation", and "inference". + :type task: str + :param round_id: The round identifier as str(int) + :type round_id: str + :param round_timeout: The round timeout in seconds. Set by user interfaces or Controller. + :type round_timeout: str + :param rounds: The number of rounds. Set by user interfaces. + :param model_id: The model identifier. Set by user interfaces or Controller (get_latest_model). + :type model_id: str + :param model_version: The model version. Currently not used. + :type model_version: str + :param model_type: The model type. Currently not used. + :type model_type: str + :param model_size: The size of the model. Currently not used. + :type model_size: int + :param model_parameters: The model parameters. Currently not used. + :type model_parameters: dict + :param model_metadata: The model metadata. Currently not used. + :type model_metadata: dict + :param session_id: The session identifier. Set by (Controller?). + :type session_id: str + :param helper_type: The helper type. + :type helper_type: str + :param aggregator: The aggregator type. + :type aggregator: str + """ + + _job_id: str + committed_at: str + task: str + round_id: str + round_timeout: str + rounds: int + model_id: str + model_version: str + model_type: str + model_size: int + model_parameters: dict + model_metadata: dict + session_id: str + helper_type: str + aggregator: str + + class ModelUpdateError(Exception): pass @@ -42,7 +94,7 @@ def __init__(self, storage, server, modelservice): def set_aggregator(self, aggregator): self.aggregator = get_aggregator(aggregator, self.storage, self.server, self.modelservice, self) - def push_round_config(self, round_config): + def push_round_config(self, round_config: RoundConfig) -> str: """Add a round_config (job description) to the inbox. :param round_config: A dict containing the round configuration (from global controller). @@ -144,8 +196,11 @@ def _training_round(self, config, clients): meta["nr_required_updates"] = int(config["clients_required"]) meta["timeout"] = float(config["round_timeout"]) + session_id = config["session_id"] + model_id = config["model_id"] + # Request model updates from all active clients. - self.server.request_model_update(config, clients=clients) + self.server.request_model_update(session_id=session_id, model_id=model_id, config=config, clients=clients) # If buffer_size is -1 (default), the round terminates when/if all clients have completed. if int(config["buffer_size"]) == -1: @@ -182,7 +237,7 @@ def _training_round(self, config, clients): meta["aggregation_time"] = data return model, meta - def _validation_round(self, config, clients, model_id): + def _validation_round(self, session_id, model_id, clients): """Send model validation requests to clients. :param config: The round config object (passed to the client). @@ -192,7 +247,19 @@ def _validation_round(self, config, clients, model_id): :param model_id: The ID of the model to validate :type model_id: str """ - self.server.request_model_validation(model_id, config, clients) + self.server.request_model_validation(session_id, model_id, clients=clients) + + def _inference_round(self, session_id: str, model_id: str, clients: list): + """Send model inference requests to clients. + + :param config: The round config object (passed to the client). + :type config: dict + :param clients: clients to send inference requests to + :type clients: list + :param model_id: The ID of the model to use for inference + :type model_id: str + """ + self.server.request_model_inference(session_id, model_id, clients=clients) def stage_model(self, model_id, timeout_retry=3, retry=2): """Download a model from persistent storage and set in modelservice. @@ -271,17 +338,28 @@ def _check_nr_round_clients(self, config): logger.info("Too few clients to start round.") return False - def execute_validation_round(self, round_config): + def execute_validation_round(self, session_id, model_id): """Coordinate validation rounds as specified in config. :param round_config: The round config object. :type round_config: dict """ - model_id = round_config["model_id"] logger.info("COMBINER orchestrating validation of model {}".format(model_id)) self.stage_model(model_id) validators = self._assign_round_clients(self.server.max_clients, type="validators") - self._validation_round(round_config, validators, model_id) + self._validation_round(session_id, model_id, validators) + + def execute_inference_round(self, session_id: str, model_id: str) -> None: + """Coordinate inference rounds as specified in config. + + :param round_config: The round config object. + :type round_config: dict + """ + logger.info("COMBINER orchestrating inference using model {}".format(model_id)) + self.stage_model(model_id) + # TODO: Implement inference client type + clients = self._assign_round_clients(self.server.max_clients, type="validators") + self._inference_round(session_id, model_id, clients) def execute_training_round(self, config): """Coordinates clients to execute training tasks. @@ -330,6 +408,8 @@ def run(self, polling_interval=1.0): while True: try: round_config = self.round_configs.get(block=False) + session_id = round_config["session_id"] + model_id = round_config["model_id"] # Check that the minimum allowed number of clients are connected ready = self._check_nr_round_clients(round_config) @@ -343,8 +423,10 @@ def run(self, polling_interval=1.0): round_meta["status"] = "Success" round_meta["name"] = self.server.id self.server.statestore.set_round_combiner_data(round_meta) - elif round_config["task"] == "validation" or round_config["task"] == "inference": - self.execute_validation_round(round_config) + elif round_config["task"] == "validation": + self.execute_validation_round(session_id, model_id) + elif round_config["task"] == "inference": + self.execute_inference_round(session_id, model_id) else: logger.warning("config contains unkown task type.") else: diff --git a/fedn/network/controller/control.py b/fedn/network/controller/control.py index 7919fc620..c7a6d1c26 100644 --- a/fedn/network/controller/control.py +++ b/fedn/network/controller/control.py @@ -8,6 +8,7 @@ from fedn.common.log_config import logger from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.combiner.modelservice import load_model_from_BytesIO +from fedn.network.combiner.roundhandler import RoundConfig from fedn.network.controller.controlbase import ControlBase from fedn.network.state import ReducerState @@ -78,7 +79,7 @@ def __init__(self, statestore): super().__init__(statestore) self.name = "DefaultControl" - def start_session(self, session_id: str, rounds: int): + def start_session(self, session_id: str, rounds: int) -> None: if self._state == ReducerState.instructing: logger.info("Controller already in INSTRUCTING state. A session is in progress.") return @@ -132,7 +133,7 @@ def start_session(self, session_id: str, rounds: int): self.set_session_status(session_id, "Finished") self._state = ReducerState.idle - def session(self, config): + def session(self, config: RoundConfig) -> None: """Execute a new training session. A session consists of one or several global rounds. All rounds in the same session have the same round_config. @@ -183,7 +184,41 @@ def session(self, config): self.set_session_status(config["session_id"], "Finished") self._state = ReducerState.idle - def round(self, session_config, round_id): + def inference_session(self, config: RoundConfig) -> None: + """Execute a new inference session. + + :param config: The round config. + :type config: InferenceConfig + :return: None + """ + if self._state == ReducerState.instructing: + logger.info("Controller already in INSTRUCTING state. A session is in progress.") + return + + if len(self.network.get_combiners()) < 1: + logger.warning("Inference round cannot start, no combiners connected!") + return + + if "model_id" not in config.keys(): + config["model_id"] = self.statestore.get_latest_model() + + config["committed_at"] = datetime.datetime.now() + config["task"] = "inference" + config["rounds"] = str(1) + config["clients_required"] = 1 + + participating_combiners = self.get_participating_combiners(config) + + # Check if the policy to start the round is met, Default is number of combiners > 0 + round_start = self.evaluate_round_start_policy(participating_combiners) + + if round_start: + logger.info("Inference round start policy met, {} participating combiners.".format(len(participating_combiners))) + for combiner, _ in participating_combiners: + combiner.submit(config) + logger.info("Inference round submitted to combiner {}".format(combiner)) + + def round(self, session_config: RoundConfig, round_id: str): """Execute one global round. : param session_config: The session config. diff --git a/fedn/network/controller/controlbase.py b/fedn/network/controller/controlbase.py index d667e01c4..141848b78 100644 --- a/fedn/network/controller/controlbase.py +++ b/fedn/network/controller/controlbase.py @@ -7,6 +7,7 @@ from fedn.common.log_config import logger from fedn.network.api.network import Network from fedn.network.combiner.interfaces import CombinerUnavailableError +from fedn.network.combiner.roundhandler import RoundConfig from fedn.network.state import ReducerState from fedn.network.storage.s3.repository import Repository @@ -113,14 +114,12 @@ def idle(self): return False def get_model_info(self): - """:return: - """ + """:return:""" return self.statestore.get_model_trail() # TODO: remove use statestore.get_events() instead def get_events(self): - """:return: - """ + """:return:""" return self.statestore.get_events() def get_latest_round_id(self): @@ -135,8 +134,7 @@ def get_latest_round(self): return round def get_compute_package_name(self): - """:return: - """ + """:return:""" definition = self.statestore.get_compute_package() if definition: try: @@ -163,7 +161,7 @@ def get_compute_package(self, compute_package=""): else: return None - def create_session(self, config, status="Initialized"): + def create_session(self, config: RoundConfig, status: str = "Initialized") -> None: """Initialize a new session in backend db.""" if "session_id" not in config.keys(): session_id = uuid.uuid4() @@ -209,7 +207,7 @@ def set_round_status(self, round_id, status): """ self.statestore.set_round_status(round_id, status) - def set_round_config(self, round_id, round_config): + def set_round_config(self, round_id, round_config: RoundConfig): """Upate round in backend db. :param round_id: The round unique identifier @@ -223,7 +221,7 @@ def request_model_updates(self, combiners): """Ask Combiner server to produce a model update. :param combiners: A list of combiners - :type combiners: tuple (combiner, comboner_round_config) + :type combiners: tuple (combiner, combiner_round_config) """ cl = [] for combiner, combiner_round_config in combiners: @@ -273,22 +271,23 @@ def get_participating_combiners(self, combiner_round_config): self._handle_unavailable_combiner(combiner) continue - is_participating = self.evaluate_round_participation_policy(combiner_round_config, nr_active_clients) + clients_required = int(combiner_round_config["clients_required"]) + is_participating = self.evaluate_round_participation_policy(clients_required, nr_active_clients) if is_participating: combiners.append((combiner, combiner_round_config)) return combiners - def evaluate_round_participation_policy(self, compute_plan, nr_active_clients): + def evaluate_round_participation_policy(self, clients_required: int, nr_active_clients: int) -> bool: """Evaluate policy for combiner round-participation. A combiner participates if it is responsive and reports enough active clients to participate in the round. """ - if int(compute_plan["clients_required"]) <= nr_active_clients: + if clients_required <= nr_active_clients: return True else: return False - def evaluate_round_start_policy(self, combiners): + def evaluate_round_start_policy(self, combiners: list): """Check if the policy to start a round is met. :param combiners: A list of combiners diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index 6bf3be4ff..724077984 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -6,6 +6,7 @@ from google.protobuf.json_format import MessageToDict from fedn.common.log_config import logger +from fedn.network.combiner.roundhandler import RoundConfig from fedn.network.state import ReducerStateToString, StringToReducerState @@ -859,7 +860,7 @@ def create_round(self, round_data): # TODO: Add check if round_id already exists self.rounds.insert_one(round_data) - def set_session_config(self, id, config): + def set_session_config(self, id: str, config: RoundConfig) -> None: """Set the session configuration. :param id: The session id @@ -886,7 +887,7 @@ def set_round_combiner_data(self, data): """ self.rounds.update_one({"round_id": str(data["round_id"])}, {"$push": {"combiners": data}}, True) - def set_round_config(self, round_id, round_config): + def set_round_config(self, round_id, round_config: RoundConfig): """Set round configuration. :param round_id: The round unique identifier From 85cc99b04b989f8f9aaa616b5fb88302df3bef79 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 11 Jun 2024 12:02:32 +0200 Subject: [PATCH 10/22] Feature/SK-896 | Add dependabot.yml (#623 ) --- .github/dependabot.yml | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 .github/dependabot.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..2378d96de --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,34 @@ +version: 2 +updates: + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "weekly" + open-pull-requests-limit: 5 + commit-message: + prefix: "deps" + labels: + - "dependencies" + reviewers: + - "Wrede" + - "stefanhellander" + # To ignore a dependency, uncomment the next two lines and replace "example-package" with the name of the dependency + #ignore: + # - dependency-name: "example-package" + # versions: ["< 1.0.0"] + + - package-ecosystem: "poetry" + directory: "/" + schedule: + interval: "weekly" + open-pull-requests-limit: 5 + commit-message: + prefix: "deps" + labels: + - "dependencies" + reviewers: + - "Wrede" + - "stefanhellander" + #ignore: + # - dependency-name: "example-package" + # versions: ["< 1.0.0"] \ No newline at end of file From 479dce2895bb5e826f8ec7c3ff367b6d7b664f91 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 11 Jun 2024 10:04:40 +0000 Subject: [PATCH 11/22] fix --- .github/dependabot.yml | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 2378d96de..a7a774ff2 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -15,20 +15,4 @@ updates: # To ignore a dependency, uncomment the next two lines and replace "example-package" with the name of the dependency #ignore: # - dependency-name: "example-package" - # versions: ["< 1.0.0"] - - - package-ecosystem: "poetry" - directory: "/" - schedule: - interval: "weekly" - open-pull-requests-limit: 5 - commit-message: - prefix: "deps" - labels: - - "dependencies" - reviewers: - - "Wrede" - - "stefanhellander" - #ignore: - # - dependency-name: "example-package" # versions: ["< 1.0.0"] \ No newline at end of file From f532281569c21ad8bd8c8a176eef409e880eecd9 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 11 Jun 2024 14:09:58 +0200 Subject: [PATCH 12/22] Github/SK-897 | Add release-drafter and labeler (#630) --- .github/labeler.yml | 14 ++++++ .github/release-drafter.yml | 37 ++++++++++++++++ .github/workflows/branch-name-check.yaml | 2 +- .github/workflows/integration-tests.yaml | 6 +++ .github/workflows/labeler.yaml | 14 ++++++ .github/workflows/pr_size_labeler.yaml | 54 ++++++++++++++++++++++++ .github/workflows/release-drafter.yml | 41 ++++++++++++++++++ 7 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 .github/labeler.yml create mode 100644 .github/release-drafter.yml create mode 100644 .github/workflows/labeler.yaml create mode 100644 .github/workflows/pr_size_labeler.yaml create mode 100644 .github/workflows/release-drafter.yml diff --git a/.github/labeler.yml b/.github/labeler.yml new file mode 100644 index 000000000..fdea24b8e --- /dev/null +++ b/.github/labeler.yml @@ -0,0 +1,14 @@ +# Label pull requests based on file paths, titles, or branch names +docs: +- changed-files: + - any-glob-to-any-file: ['docs/**','**/*.md'] + +github: +- changed-files: + - any-glob-to-any-file: ['.github/**'] + +feature: +- head-branch: ['^feature', 'feature'] + +fix: +- head-branch: ['^bug', 'bug', '^fix', 'fix'] \ No newline at end of file diff --git a/.github/release-drafter.yml b/.github/release-drafter.yml new file mode 100644 index 000000000..66eedb62a --- /dev/null +++ b/.github/release-drafter.yml @@ -0,0 +1,37 @@ +name-template: ' Release v$RESOLVED_VERSION' +tag-template: 'v$RESOLVED_VERSION' +categories: + - title: 'Features' + labels: + - 'feature' + - 'enhancement' + - title: 'Fixes' + labels: + - 'fix' + - 'bugfix' + - 'bug' + - title: 'Other' + label: + - 'chore' + - 'refactor' +change-template: '- $TITLE @$AUTHOR (#$NUMBER)' +change-title-escapes: '\<*_&' # You can add # and @ to disable mentions, and add ` to disable code blocks. +exclude-labels: + - 'dependencies' + - 'docs' + - 'github' +version-resolver: + major: + labels: + - 'major' + minor: + labels: + - 'minor' + patch: + labels: + - 'patch' + default: patch +template: | + ## What’s Changed + + $CHANGES \ No newline at end of file diff --git a/.github/workflows/branch-name-check.yaml b/.github/workflows/branch-name-check.yaml index 23abed21a..67ff4d04c 100644 --- a/.github/workflows/branch-name-check.yaml +++ b/.github/workflows/branch-name-check.yaml @@ -7,7 +7,7 @@ on: - master env: - BRANCH_REGEX: '^((feature|hotfix|bugfix|bug|docs|refactor)\/.+)|(release\/v((([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?))$' + BRANCH_REGEX: '^((feature|github|hotfix|bugfix|fix|bug|docs|refactor)\/.+)|(release\/v((([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?))$' jobs: branch-name-check: diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 19aa20593..ed354a78b 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -2,11 +2,17 @@ name: "integration tests" on: push: + paths-ignore: + - 'docs/**' + - '.github/**' branches: - master - develop - "release/**" pull_request: + paths-ignore: + - 'docs/**' + - '.github/**' branches: - "**" diff --git a/.github/workflows/labeler.yaml b/.github/workflows/labeler.yaml new file mode 100644 index 000000000..ba5965967 --- /dev/null +++ b/.github/workflows/labeler.yaml @@ -0,0 +1,14 @@ +name: "Pull Request Labeler" +on: + pull_request: + types: [opened, synchronize, reopened] + +jobs: + labeler: + permissions: + contents: read + pull-requests: write + runs-on: ubuntu-latest + steps: + - uses: actions/labeler@v5 + \ No newline at end of file diff --git a/.github/workflows/pr_size_labeler.yaml b/.github/workflows/pr_size_labeler.yaml new file mode 100644 index 000000000..f614e7e23 --- /dev/null +++ b/.github/workflows/pr_size_labeler.yaml @@ -0,0 +1,54 @@ +name: "PR Size Labeler" + +on: + pull_request: + types: [opened, synchronize, reopened] + +jobs: + label: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Get PR Size + id: get_pr_size + run: | + CHANGED_FILES=$(jq '.pull_request.changed_files' $GITHUB_EVENT_PATH) + ADDITIONS=$(jq '.pull_request.additions' $GITHUB_EVENT_PATH) + DELETIONS=$(jq '.pull_request.deletions' $GITHUB_EVENT_PATH) + echo "CHANGED_FILES=$CHANGED_FILES" >> $GITHUB_ENV + echo "ADDITIONS=$ADDITIONS" >> $GITHUB_ENV + echo "DELETIONS=$DELETIONS" >> $GITHUB_ENV + + # OBS that we are cuurently not on a stable version, thus major is disabled for now + - name: Apply Labels Based on Size + if: ${{ github.event.pull_request.changed_files != '' }} + run: | + PATCH_THRESHOLD=10 + MINOR_THRESHOLD=500 + MAJOR_THRESHOLD=1000 + + TOTAL_CHANGES=$(($ADDITIONS + $DELETIONS)) + + echo "Total changes: $TOTAL_CHANGES" + + if [ "$TOTAL_CHANGES" -le "$PATCH_THRESHOLD" ]; then + LABEL="patch" + elif [ "$TOTAL_CHANGES" -le "$MINOR_THRESHOLD" ]; then + LABEL="minor" + else + LABEL="minor" + fi + + echo "Applying label: $LABEL" + + curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \ + -H "Accept: application/vnd.github.v3+json" \ + https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/labels \ + -d "{\"labels\":[\"$LABEL\"]}" + + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + ADDITIONS: ${{ env.ADDITIONS }} + DELETIONS: ${{ env.DELETIONS }} \ No newline at end of file diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml new file mode 100644 index 000000000..5f1474644 --- /dev/null +++ b/.github/workflows/release-drafter.yml @@ -0,0 +1,41 @@ +name: Release Drafter + +on: + push: + # branches to consider in the event; optional, defaults to all + branches: + - master + # pull_request event is required only for autolabeler + pull_request: + # Only following types are handled by the action, but one can default to all as well + types: [opened, reopened, synchronize] + # pull_request_target event is required for autolabeler to support PRs from forks + # pull_request_target: + # types: [opened, reopened, synchronize] + +permissions: + contents: read + +jobs: + update_release_draft: + permissions: + # write permission is required to create a github release + contents: write + # write permission is required for autolabeler + # otherwise, read permission is required at least + pull-requests: write + runs-on: ubuntu-latest + steps: + # (Optional) GitHub Enterprise requires GHE_HOST variable set + #- name: Set GHE_HOST + # run: | + # echo "GHE_HOST=${GITHUB_SERVER_URL##https:\/\/}" >> $GITHUB_ENV + + # Drafts your next Release notes as Pull Requests are merged into "master" + - uses: release-drafter/release-drafter@v6 + # (Optional) specify config name to use, relative to .github/. Default: release-drafter.yml + # with: + # config-name: my-config.yml + # disable-autolabeler: true + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file From bf459632f7ad2762ec3a560432a8a028ffe70e78 Mon Sep 17 00:00:00 2001 From: Jelle Date: Wed, 12 Jun 2024 17:40:20 +0200 Subject: [PATCH 13/22] Docs/Update faq.rst (#629) --- docs/faq.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/faq.rst b/docs/faq.rst index 223aa2e49..b40ab2f5b 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -52,7 +52,7 @@ see the section about model marshaling: Q: Can I start a client listening only to training requests or only on validation requests?: -------------------------------------------------------------------------------------------- -Yes! You can toggle which message streams a client subscibes to when starting the client. For example, to start a pure validation client: +Yes! You can toggle which message streams a client subscribes to when starting the client. For example, to start a pure validation client: .. code-block:: bash @@ -62,12 +62,12 @@ Yes! You can toggle which message streams a client subscibes to when starting th Q: How do you approach the question of output privacy? ---------------------------------------------------------------------------------- -We take security in (federated) machine learning seriously. Federated learning is a foundational technology that impoves input privacy +We take security in (federated) machine learning seriously. Federated learning is a foundational technology that improves input privacy in machine learning by allowing datasets to stay local and private, and not copied to a server. FEDn is designed to provide an industry grade -implementation of the core communication and aggregration layers of federated learning, as well as configurable modules for traceability, logging +implementation of the core communication and aggregation layers of federated learning, as well as configurable modules for traceability, logging etc, to allow the developer balance between privacy and auditability. With `FEDn Studio `__ we add functionality for user authentication, authorization, and federated client identity management. As such, The FEDn Framework provides -a comprehensive software suite for implemeting secure federated learning following industry best-practices. +a comprehensive software suite for implementing secure federated learning following industry best-practices. Going beyond input privacy, there are several additional considerations relating to output privacy and potential attacks on (federated) machine learning systems. For an introduction to the topic, see this blog post: @@ -85,4 +85,4 @@ with the Scaleout team. - `LEAKPRO: Leakage Profiling and Risk Oversight for Machine Learning Models `__ - `Validating a System Development Kit for edge federated learning `__ - `Trusted Execution Environments for Federated Learning: `__ -- `Robust IoT Security: Intrusion Detection Leveraging Contributions from Multiple Systems `__ \ No newline at end of file +- `Robust IoT Security: Intrusion Detection Leveraging Contributions from Multiple Systems `__ From a0ba7d880e6a752d71fdb8bc8badd6979a327030 Mon Sep 17 00:00:00 2001 From: Jelle Date: Wed, 12 Jun 2024 21:04:28 +0200 Subject: [PATCH 14/22] Update studio.rst (#633) --- docs/studio.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/studio.rst b/docs/studio.rst index 26e4615a0..e52a9fa6a 100644 --- a/docs/studio.rst +++ b/docs/studio.rst @@ -5,7 +5,7 @@ Studio FEDn Studio is a web-based tool for managing and monitoring federated learning experiments. It provides the FEDn network as a managed service, as well as a user-friendly interface for monitoring the progress of training and visualizing the results. FEDn Studio is available as a SaaS at `fedn.scaleoutsystems.com `_ . It is free for development, testing and research (one project per user, backend compute resources sized for dev/test). -Scaleout can also support users to scale up experiments and demonstrators on Studio, by granting custom resource quotas. Additonally, charts are available for self-managed deployment on-premise or in your cloud VPC (all major cloud providers). Contact the Scaleout team for more information. +Scaleout can also support users to scale up experiments and demonstrators on Studio, by granting custom resource quotas. Additionally, charts are available for self-managed deployment on-premise or in your cloud VPC (all major cloud providers). Contact the Scaleout team for more information. Getting started --------------- @@ -29,7 +29,7 @@ Once you have created a project, you can find it via the sidebar link Projects. 2. **Clients**: management of client configurations and a list of current clients. Observe that this feature does not deploy clients, instead it configures a client config that contains a unique token which is required to connect to the reducer and the combiner. 3. **Combiners**: a list of combiners. Observe number of active clients for each combiner. 4. **Sessions**: a list of sessions with related models. Configure and start a new session. Upload compute package and seed model, set number of rounds, timeout limit etc. -5. **Models**: a list of models generated across sessions, and dahsboards for visualizing training progress. +5. **Models**: a list of models generated across sessions, and dashboards for visualizing training progress. 6. **Events**: a log of events from the combiner and the clients of the federated network. 7. **Settings**: project settings, including the option to give access to other users and to delete the project. @@ -46,7 +46,7 @@ Please see :ref:`package-creation` for instructions on how to create a package a Upload files ------------ -In the Studio UI, navigate to the project you created and click on the "Sessions" tab. Click on the "New Session" button. Under the Compute package tab, select a name and upload the generated package file. Under the Seed model tab upload the generated seed file: +In the Studio UI, navigate to the project you created and click on the "Sessions" tab. Click on the "New Session" button. Under the "Compute package" tab, select a name and upload the generated package file. Under the "Seed model" tab, upload the generated seed file: .. image:: img/upload_package.png @@ -69,12 +69,12 @@ If the client is successfully connected, you should see the client listed in the Start a training session ------------------------ -In Studio click on the "Sessions" link, then the "New session" button in the upper right corner. Click the Start session tab and enter your desirable settings (or use default) and hit the "Start run" button. In the terminal where your are running your client you should now see some activity. When the round is completed you can see the results in the FEDn Studio UI on the "Models" page. +In Studio click on the "Sessions" link, then the "New session" button in the upper right corner. Click the "Start session" tab and enter your desirable settings (or use default) and hit the "Start run" button. In the terminal where your are running your client you should now see some activity. When the round is completed, you can see the results in the FEDn Studio UI on the "Models" page. Watch the training progress --------------------------- -Once a training session is started, you can monitor the progress of the training by navigating to "Sessions" and click on the "Open" button of the active session. The session page will list the models as soon as they are generated. To get more information about a particular model, navigate to the model page by clicking the model name. From the model page you can download the model wieghts and get validation metrics. +Once a training session is started, you can monitor the progress of the training by navigating to "Sessions" and click on the "Open" button of the active session. The session page will list the models as soon as they are generated. To get more information about a particular model, navigate to the model page by clicking the model name. From the model page you can download the model weights and get validation metrics. To get an overview of how the models have evolved over time, navigate to the "Models" tab in the sidebar. Here you can see a list of all models generated across sessions along with a graph showing some metrics of how the models are performing. @@ -86,5 +86,5 @@ Accessing the API ----------------- The FEDn Studio API is available at /api/v1/. The controller host can be found in the project dashboard. Further, to access the API you need an admin API token. -Nevigate to the "Settings" tab in the project and click on the "Generate token" button. Copy the token and use it to access the API. Please see :py:mod:`fedn.network.api` for how to pass the token to the APIClient. +Navigate to the "Settings" tab in the project and click on the "Generate token" button. Copy the token and use it to access the API. Please see :py:mod:`fedn.network.api` for how to pass the token to the APIClient. From 59838979a265d49dce3f3e946bd34328b603125e Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 13 Jun 2024 10:58:08 +0200 Subject: [PATCH 15/22] =?UTF-8?q?Feature/SK-871=20|=C2=A0Enable=20updating?= =?UTF-8?q?=20models=20via=20the=20API=20(#632)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * patch and put endpoints added to enabling updates * Address - Information exposure through an exception, scan --- fedn/network/api/v1/model_routes.py | 180 +++++++++++++++--- .../storage/statestore/stores/model_store.py | 20 +- .../storage/statestore/stores/store.py | 12 +- 3 files changed, 179 insertions(+), 33 deletions(-) diff --git a/fedn/network/api/v1/model_routes.py b/fedn/network/api/v1/model_routes.py index 4de0822d2..5b2ebf925 100644 --- a/fedn/network/api/v1/model_routes.py +++ b/fedn/network/api/v1/model_routes.py @@ -4,8 +4,8 @@ from flask import Blueprint, jsonify, request, send_file from fedn.network.api.auth import jwt_auth_required -from fedn.network.api.v1.shared import api_version, get_limit, get_post_data_to_kwargs, get_reverse, get_typed_list_headers, mdb from fedn.network.api.shared import modelstorage_config +from fedn.network.api.v1.shared import api_version, get_limit, get_post_data_to_kwargs, get_reverse, get_typed_list_headers, mdb from fedn.network.storage.s3.base import RepositoryBase from fedn.network.storage.s3.miniorepository import MINIORepository from fedn.network.storage.statestore.stores.model_store import ModelStore @@ -117,8 +117,8 @@ def get_models(): response = {"count": models["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/list", methods=["POST"]) @@ -202,8 +202,8 @@ def list_models(): response = {"count": models["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["GET"]) @@ -250,8 +250,8 @@ def get_models_count(): count = model_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["POST"]) @@ -302,8 +302,8 @@ def models_count(): count = model_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["GET"]) @@ -346,10 +346,132 @@ def get_model(id: str): response = model return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/", methods=["PATCH"]) +@jwt_auth_required(role="admin") +def patch_model(id: str): + """Patch model + Updates a model based on the provided id. Only the fields that are present in the request will be updated. + --- + tags: + - Models + parameters: + - name: id + in: path + required: true + type: string + description: The id or model property of the model + - name: model + in: body + required: true + type: object + description: The model data to update + responses: + 200: + description: The updated model + schema: + $ref: '#/definitions/Model' + 404: + description: The model was not found + schema: + type: object + properties: + message: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + model = model_store.get(id, use_typing=False) + + data = request.get_json() + _id = model["id"] + + # Update the model with the new data + # Only update the fields that are present in the request + for key, value in data.items(): + if key in ["_id", "model"]: + continue + model[key] = value + + success = model_store.update(_id, model) + + if success: + response = model + return jsonify(response), 200 + + return jsonify({"message": "Failed to update model"}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 + + +@bp.route("/", methods=["PUT"]) +@jwt_auth_required(role="admin") +def put_model(id: str): + """Patch model + Updates a model based on the provided id. All fields will be updated with the new data. + --- + tags: + - Models + parameters: + - name: id + in: path + required: true + type: string + description: The id or model property of the model + - name: model + in: body + required: true + type: object + description: The model data to update + responses: + 200: + description: The updated model + schema: + $ref: '#/definitions/Model' + 404: + description: The model was not found + schema: + type: object + properties: + message: + type: string + 500: + description: An error occurred + schema: + type: object + properties: + message: + type: string + """ + try: + model = model_store.get(id, use_typing=False) + data = request.get_json() + _id = model["id"] + + success = model_store.update(_id, data) + + if success: + response = model + return jsonify(response), 200 + + return jsonify({"message": "Failed to update model"}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("//descendants", methods=["GET"]) @@ -400,10 +522,10 @@ def get_descendants(id: str): response = descendants return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("//ancestors", methods=["GET"]) @@ -469,10 +591,10 @@ def get_ancestors(id: str): response = ancestors return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("//download", methods=["GET"]) @@ -517,10 +639,10 @@ def download(id: str): return send_file(file, as_attachment=True, download_name=model_id) else: return jsonify({"message": "No model storage configured"}), 500 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("//parameters", methods=["GET"]) @@ -581,7 +703,7 @@ def get_parameters(id: str): return jsonify(array=weights), 200 else: return jsonify({"message": "No model storage configured"}), 500 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/storage/statestore/stores/model_store.py b/fedn/network/storage/statestore/stores/model_store.py index 5fff639f3..775eccd83 100644 --- a/fedn/network/storage/statestore/stores/model_store.py +++ b/fedn/network/storage/statestore/stores/model_store.py @@ -57,8 +57,24 @@ def get(self, id: str, use_typing: bool = False) -> Model: return Model.from_dict(document) if use_typing else from_document(document) - def update(self, id: str, item: Model) -> bool: - raise NotImplementedError("Update not implemented for ModelStore") + def _validate(self, item: Model) -> Tuple[bool, str]: + if "model" not in item or not item["model"]: + return False, "Model is required" + + return True, "" + + def _complement(self, item: Model) -> Model: + if "key" not in item or item["key"] is None: + item["key"] = "models" + + def update(self, id: str, item: Model) -> Tuple[bool, Any]: + valid, message = self._validate(item) + if not valid: + return False, message + + self._complement(item) + + return super().update(id, item) def add(self, item: Model)-> Tuple[bool, Any]: raise NotImplementedError("Add not implemented for ModelStore") diff --git a/fedn/network/storage/statestore/stores/store.py b/fedn/network/storage/statestore/stores/store.py index f76ad3e12..f1175c9f7 100644 --- a/fedn/network/storage/statestore/stores/store.py +++ b/fedn/network/storage/statestore/stores/store.py @@ -30,8 +30,16 @@ def get(self, id: str, use_typing: bool = False) -> T: return from_document(document) if not use_typing else document - def update(self, id: str, item: T) -> bool: - pass + def update(self, id: str, item: T) -> Tuple[bool, Any]: + try: + result = self.database[self.collection].update_one({"_id": ObjectId(id)}, {"$set": item}) + if result.modified_count == 1: + document = self.database[self.collection].find_one({"_id": ObjectId(id)}) + return True, from_document(document) + else: + return False, "Entity not found" + except Exception as e: + return False, str(e) def add(self, item: T) -> Tuple[bool, Any]: try: From 5e7a5a43d0de5e875c278349fff6ddfb54f59808 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 14 Jun 2024 10:53:11 +0200 Subject: [PATCH 16/22] Feature/SK-898 | Add presigned put url to inference workflow (#631) --- examples/mnist-pytorch/client/fedn.yaml | 4 +- examples/mnist-pytorch/client/predict.py | 37 ++++++++++++ fedn/network/clients/client.py | 76 ++++++++++++++++++++++-- fedn/network/clients/state.py | 1 + fedn/network/combiner/combiner.py | 66 ++++++++++---------- fedn/network/storage/s3/repository.py | 34 +++++++++++ 6 files changed, 178 insertions(+), 40 deletions(-) create mode 100644 examples/mnist-pytorch/client/predict.py diff --git a/examples/mnist-pytorch/client/fedn.yaml b/examples/mnist-pytorch/client/fedn.yaml index b05504102..30873488b 100644 --- a/examples/mnist-pytorch/client/fedn.yaml +++ b/examples/mnist-pytorch/client/fedn.yaml @@ -7,4 +7,6 @@ entry_points: train: command: python train.py validate: - command: python validate.py \ No newline at end of file + command: python validate.py + predict: + command: python predict.py \ No newline at end of file diff --git a/examples/mnist-pytorch/client/predict.py b/examples/mnist-pytorch/client/predict.py new file mode 100644 index 000000000..aaf9f0f50 --- /dev/null +++ b/examples/mnist-pytorch/client/predict.py @@ -0,0 +1,37 @@ +import os +import sys + +import torch +from data import load_data +from model import load_parameters + +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.abspath(dir_path)) + + +def predict(in_model_path, out_artifact_path, data_path=None): + """Validate model. + + :param in_model_path: The path to the input model. + :type in_model_path: str + :param out_artifact_path: The path to save the predict output to. + :type out_artifact_path: str + :param data_path: The path to the data file. + :type data_path: str + """ + # Load data + x_test, y_test = load_data(data_path, is_train=False) + + # Load model + model = load_parameters(in_model_path) + model.eval() + + # Predict + with torch.no_grad(): + y_pred = model(x_test) + # Save prediction to file/artifact, the artifact will be uploaded to the object store by the client + torch.save(y_pred, out_artifact_path) + + +if __name__ == "__main__": + predict(sys.argv[1], sys.argv[2]) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index c0f1b0baa..6f5edd332 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -14,6 +14,7 @@ from shutil import copytree import grpc +import requests from cryptography.hazmat.primitives.serialization import Encoding from google.protobuf.json_format import MessageToJson from OpenSSL import SSL @@ -22,13 +23,11 @@ import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc from fedn.common.config import FEDN_AUTH_SCHEME, FEDN_PACKAGE_EXTRACT_DIR -from fedn.common.log_config import (logger, set_log_level_from_string, - set_log_stream) +from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream from fedn.network.clients.connect import ConnectorClient, Status from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString -from fedn.network.combiner.modelservice import (get_tmp_path, - upload_request_generator) +from fedn.network.combiner.modelservice import get_tmp_path, upload_request_generator from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers.helpers import get_helper @@ -438,12 +437,18 @@ def _listen_to_task_stream(self): request=request, sesssion_id=request.session_id, ) - logger.info("Received model update request of type {} for model_id {}".format(request.type, request.model_id)) + logger.info("Received task request of type {} for model_id {}".format(request.type, request.model_id)) if request.type == fedn.StatusType.MODEL_UPDATE and self.config["trainer"]: self.inbox.put(("train", request)) elif request.type == fedn.StatusType.MODEL_VALIDATION and self.config["validator"]: self.inbox.put(("validate", request)) + elif request.type == fedn.StatusType.INFERENCE and self.config["validator"]: + logger.info("Received inference request for model_id {}".format(request.model_id)) + presigned_url = json.loads(request.data) + presigned_url = presigned_url["presigned_url"] + logger.info("Inference presigned URL: {}".format(presigned_url)) + self.inbox.put(("infer", request)) else: logger.error("Unknown request type: {}".format(request.type)) @@ -586,6 +591,51 @@ def _process_validation_request(self, model_id: str, is_inference: bool, session self.state = ClientState.idle return validation + def _process_inference_request(self, model_id: str, session_id: str, presigned_url: str): + """Process an inference request. + + :param model_id: The model id of the model to be used for inference. + :type model_id: str + :param session_id: The id of the current session. + :type session_id: str + :param presigned_url: The presigned URL for the data to be used for inference. + :type presigned_url: str + :return: None + """ + self.send_status(f"Processing inference request for model_id {model_id}", sesssion_id=session_id) + try: + model = self.get_model_from_combiner(str(model_id)) + if model is None: + logger.error("Could not retrieve model from combiner. Aborting inference request.") + return + inpath = self.helper.get_tmp_path() + + with open(inpath, "wb") as fh: + fh.write(model.getbuffer()) + + outpath = get_tmp_path() + self.dispatcher.run_cmd(f"predict {inpath} {outpath}") + + # Upload the inference result to the presigned URL + with open(outpath, "rb") as fh: + response = requests.put(presigned_url, data=fh.read()) + + os.unlink(inpath) + os.unlink(outpath) + + if response.status_code != 200: + logger.warning("Inference upload failed with status code {}".format(response.status_code)) + self.state = ClientState.idle + return + + except Exception as e: + logger.warning("Inference failed with exception {}".format(e)) + self.state = ClientState.idle + return + + self.state = ClientState.idle + return + def process_request(self): """Process training and validation tasks.""" while True: @@ -682,6 +732,22 @@ def process_request(self): self.state = ClientState.idle self.inbox.task_done() + elif task_type == "infer": + self.state = ClientState.inferencing + try: + presigned_url = json.loads(request.data) + except json.JSONDecodeError as e: + logger.error(f"Failed to decode inference request data: {e}") + self.state = ClientState.idle + continue + + if "presigned_url" not in presigned_url: + logger.error("Inference request missing presigned_url.") + self.state = ClientState.idle + continue + presigned_url = presigned_url["presigned_url"] + _ = self._process_inference_request(request.model_id, request.session_id, presigned_url) + self.state = ClientState.idle except queue.Empty: pass except grpc.RpcError as e: diff --git a/fedn/network/clients/state.py b/fedn/network/clients/state.py index a349f846e..d7f82a769 100644 --- a/fedn/network/clients/state.py +++ b/fedn/network/clients/state.py @@ -7,6 +7,7 @@ class ClientState(Enum): idle = 1 training = 2 validating = 3 + inferencing = 4 def ClientStateToString(state): diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 70755ac6b..48e62466c 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -169,12 +169,12 @@ def request_model_update(self, session_id, model_id, config, clients=[]): :type clients: list """ - request, clients = self._send_request_type(fedn.StatusType.MODEL_UPDATE, session_id, model_id, config, clients) + clients = self._send_request_type(fedn.StatusType.MODEL_UPDATE, session_id, model_id, config, clients) if len(clients) < 20: - logger.info("Sent model update request for model {} to clients {}".format(request.model_id, clients)) + logger.info("Sent model update request for model {} to clients {}".format(model_id, clients)) else: - logger.info("Sent model update request for model {} to {} clients".format(request.model_id, len(clients))) + logger.info("Sent model update request for model {} to {} clients".format(model_id, len(clients))) def request_model_validation(self, session_id, model_id, clients=[]): """Ask clients to validate the current global model. @@ -187,12 +187,12 @@ def request_model_validation(self, session_id, model_id, clients=[]): :type clients: list """ - request, clients = self._send_request_type(fedn.StatusType.MODEL_VALIDATION, session_id, model_id, clients) + clients = self._send_request_type(fedn.StatusType.MODEL_VALIDATION, session_id, model_id, clients) if len(clients) < 20: - logger.info("Sent model validation request for model {} to clients {}".format(request.model_id, clients)) + logger.info("Sent model validation request for model {} to clients {}".format(model_id, clients)) else: - logger.info("Sent model validation request for model {} to {} clients".format(request.model_id, len(clients))) + logger.info("Sent model validation request for model {} to {} clients".format(model_id, len(clients))) def request_model_inference(self, session_id: str, model_id: str, clients: list = []) -> None: """Ask clients to perform inference on the model. @@ -205,12 +205,12 @@ def request_model_inference(self, session_id: str, model_id: str, clients: list :type clients: list """ - request, clients = self._send_request_type(fedn.StatusType.INFERENCE, session_id, model_id, clients) + clients = self._send_request_type(fedn.StatusType.INFERENCE, session_id, model_id, clients) if len(clients) < 20: - logger.info("Sent model inference request for model {} to clients {}".format(request.model_id, clients)) + logger.info("Sent model inference request for model {} to clients {}".format(model_id, clients)) else: - logger.info("Sent model inference request for model {} to {} clients".format(request.model_id, len(clients))) + logger.info("Sent model inference request for model {} to {} clients".format(model_id, len(clients))) def _send_request_type(self, request_type, session_id, model_id, config=None, clients=[]): """Send a request of a specific type to clients. @@ -223,40 +223,38 @@ def _send_request_type(self, request_type, session_id, model_id, config=None, cl :type config: dict :param clients: the clients to send the request to :type clients: list - :return: the request and the clients - :rtype: tuple + :return: the clients + :rtype: list """ - request = fedn.TaskRequest() - request.model_id = model_id - request.correlation_id = str(uuid.uuid4()) - request.timestamp = str(datetime.now()) - request.type = request_type - request.session_id = session_id - - request.sender.name = self.id - request.sender.role = fedn.COMBINER - - if request_type == fedn.StatusType.MODEL_UPDATE: - request.data = json.dumps(config) - if len(clients) == 0: + if len(clients) == 0: + if request_type == fedn.StatusType.MODEL_UPDATE: clients = self.get_active_trainers() - elif request_type == fedn.StatusType.MODEL_VALIDATION: - if len(clients) == 0: + elif request_type == fedn.StatusType.MODEL_VALIDATION: clients = self.get_active_validators() - elif request_type == fedn.StatusType.INFERENCE: - request.data = json.dumps(config) - if len(clients) == 0: + elif request_type == fedn.StatusType.INFERENCE: # TODO: add inference clients type clients = self.get_active_validators() - - # TODO: if inference, request.data should be user-defined data/parameters - for client in clients: + request = fedn.TaskRequest() + request.model_id = model_id + request.correlation_id = str(uuid.uuid4()) + request.timestamp = str(datetime.now()) + request.type = request_type + request.session_id = session_id + + request.sender.name = self.id + request.sender.role = fedn.COMBINER request.receiver.name = client request.receiver.role = fedn.WORKER + # Set the request data, not used in validation + if request_type == fedn.StatusType.INFERENCE: + presigned_url = self.repository.presigned_put_url(self.repository.inference_bucket, f"{client}/{session_id}") + # TODO: in inference, request.data should also contain user-defined data/parameters + request.data = json.dumps({"presigned_url": presigned_url}) + elif request_type == fedn.StatusType.MODEL_UPDATE: + request.data = json.dumps(config) self._put_request_to_client_queue(request, fedn.Queue.TASK_QUEUE) - - return request, clients + return clients def get_active_trainers(self): """Get a list of active trainers. diff --git a/fedn/network/storage/s3/repository.py b/fedn/network/storage/s3/repository.py index c1704e5ca..2a5ee3449 100644 --- a/fedn/network/storage/s3/repository.py +++ b/fedn/network/storage/s3/repository.py @@ -1,3 +1,4 @@ +import datetime import uuid from fedn.common.log_config import logger @@ -10,12 +11,17 @@ class Repository: def __init__(self, config): self.model_bucket = config["storage_bucket"] self.context_bucket = config["context_bucket"] + try: + self.inference_bucket = config["inference_bucket"] + except KeyError: + self.inference_bucket = "fedn-inference" # TODO: Make a plug-in solution self.client = MINIORepository(config) self.client.create_bucket(self.context_bucket) self.client.create_bucket(self.model_bucket) + self.client.create_bucket(self.inference_bucket) def get_model(self, model_id): """Retrieve a model with id model_id. @@ -104,3 +110,31 @@ def delete_compute_package(self, compute_package): except Exception: logger.error("Failed to delete compute_package from repository.") raise + + def presigned_put_url(self, bucket: str, object_name: str, expires: datetime.timedelta = datetime.timedelta(hours=1)): + """Generate a presigned URL for an upload object request. + + :param bucket: The bucket name + :type bucket: str + :param object_name: The object name + :type object_name: str + :param expires: The time the URL is valid + :type expires: datetime.timedelta + :return: The URL + :rtype: str + """ + return self.client.client.presigned_put_object(bucket, object_name, expires) + + def presigned_get_url(self, bucket: str, object_name: str, expires: datetime.timedelta = datetime.timedelta(hours=1)) -> str: + """Generate a presigned URL for a download object request. + + :param bucket: The bucket name + :type bucket: str + :param object_name: The object name + :type object_name: str + :param expires: The time the URL is valid + :type expires: datetime.timedelta + :return: The URL + :rtype: str + """ + return self.client.client.presigned_get_object(bucket, object_name, expires) From b6f1404d910fc812ceb5d49cb29332eaeeb529ad Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 14 Jun 2024 11:20:08 +0000 Subject: [PATCH 17/22] bump version --- docs/conf.py | 19 +++++-------------- pyproject.toml | 2 +- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 1fe8d9929..fcd9a0ea9 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -12,7 +12,7 @@ author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = "0.9.6" +release = "0.10.0" # Add any Sphinx extension module names here, as strings extensions = [ @@ -24,7 +24,7 @@ "sphinx.ext.ifconfig", "sphinx.ext.viewcode", "sphinx_rtd_theme", - "sphinx_code_tabs" + "sphinx_code_tabs", ] # The master toctree document. @@ -71,15 +71,12 @@ # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', - # The font size ('10pt', '11pt' or '12pt'). # # 'pointsize': '10pt', - # Additional stuff for the LaTeX preamble. # # 'preamble': '', - # Latex figure (float) alignment # # 'figure_align': 'htbp', @@ -89,24 +86,18 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, "fedn.tex", "FEDn Documentation", - "Scaleout Systems AB", "manual"), + (master_doc, "fedn.tex", "FEDn Documentation", "Scaleout Systems AB", "manual"), ] # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [ - (master_doc, "fedn", "FEDn Documentation", - [author], 1) -] +man_pages = [(master_doc, "fedn", "FEDn Documentation", [author], 1)] # Grouping the document tree into Texinfo files. List of tuples # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, "fedn", "FEDn Documentation", - author, "fedn", "One line description of project.", - "Miscellaneous"), + (master_doc, "fedn", "FEDn Documentation", author, "fedn", "One line description of project.", "Miscellaneous"), ] # Bibliographic Dublin Core info. diff --git a/pyproject.toml b/pyproject.toml index c11605cb6..edce3abc6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.9.6" +version = "0.10.0" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst" From c2929202dbec968c73d519f4f82806df58d6f04d Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 17 Jun 2024 17:44:40 +0200 Subject: [PATCH 18/22] =?UTF-8?q?Fix/SK-894=C2=A0|=C2=A0Information=20expo?= =?UTF-8?q?sure=20through=20an=20exception=20=20(#637)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fedn/network/api/interface.py | 2 +- fedn/network/api/server.py | 27 +++++++++++------- fedn/network/api/v1/client_routes.py | 24 ++++++++-------- fedn/network/api/v1/combiner_routes.py | 24 ++++++++-------- fedn/network/api/v1/package_routes.py | 32 +++++++++++----------- fedn/network/api/v1/round_routes.py | 24 ++++++++-------- fedn/network/api/v1/session_routes.py | 35 ++++++++++++------------ fedn/network/api/v1/status_routes.py | 24 ++++++++-------- fedn/network/api/v1/validation_routes.py | 24 ++++++++-------- 9 files changed, 112 insertions(+), 104 deletions(-) diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 5cd465085..7fb6ae8ca 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -659,7 +659,7 @@ def set_initial_model(self, file): self.control.commit(file.filename, model) except Exception as e: logger.debug(e) - return jsonify({"success": False, "message": e}) + return jsonify({"success": False, "message": "Failed to add initial model."}) return jsonify({"success": True, "message": "Initial model added successfully."}) diff --git a/fedn/network/api/server.py b/fedn/network/api/server.py index 8f046ee80..e13cfdf5a 100644 --- a/fedn/network/api/server.py +++ b/fedn/network/api/server.py @@ -5,9 +5,8 @@ from fedn.common.config import get_controller_config from fedn.network.api.auth import jwt_auth_required from fedn.network.api.interface import API +from fedn.network.api.shared import control, statestore from fedn.network.api.v1 import _routes -from fedn.network.api.shared import statestore, control - custom_url_prefix = os.environ.get("FEDN_CUSTOM_URL_PREFIX", False) api = API(statestore, control) @@ -569,8 +568,10 @@ def add_combiner(): remote_addr = request.remote_addr try: response = api.add_combiner(**json_data, remote_addr=remote_addr) - except TypeError as e: - return jsonify({"success": False, "message": str(e)}), 400 + except TypeError: + return jsonify({"success": False, "message": "Invalid data provided"}), 400 + except Exception: + return jsonify({"success": False, "message": "An unexpected error occurred"}), 500 return response @@ -589,8 +590,10 @@ def add_client(): remote_addr = request.remote_addr try: response = api.add_client(**json_data, remote_addr=remote_addr) - except TypeError as e: - return jsonify({"success": False, "message": str(e)}), 400 + except TypeError: + return jsonify({"success": False, "message": "Invalid data provided"}), 400 + except Exception: + return jsonify({"success": False, "message": "An unexpected error occurred"}), 500 return response @@ -612,8 +615,10 @@ def list_combiners_data(): try: response = api.list_combiners_data(combiners) - except TypeError as e: - return jsonify({"success": False, "message": str(e)}), 400 + except TypeError: + return jsonify({"success": False, "message": "Invalid data provided"}), 400 + except Exception: + return jsonify({"success": False, "message": "An unexpected error occurred"}), 500 return response @@ -630,8 +635,10 @@ def get_plot_data(): try: feature = request.args.get("feature", None) response = api.get_plot_data(feature=feature) - except TypeError as e: - return jsonify({"success": False, "message": str(e)}), 400 + except TypeError: + return jsonify({"success": False, "message": "Invalid data provided"}), 400 + except Exception: + return jsonify({"success": False, "message": "An unexpected error occurred"}), 500 return response diff --git a/fedn/network/api/v1/client_routes.py b/fedn/network/api/v1/client_routes.py index d5ccc58ee..8fa13febe 100644 --- a/fedn/network/api/v1/client_routes.py +++ b/fedn/network/api/v1/client_routes.py @@ -121,8 +121,8 @@ def get_clients(): response = {"count": clients["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/list", methods=["POST"]) @@ -206,8 +206,8 @@ def list_clients(): response = {"count": clients["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["GET"]) @@ -267,8 +267,8 @@ def get_clients_count(): count = client_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["POST"]) @@ -320,8 +320,8 @@ def clients_count(): count = client_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["GET"]) @@ -364,7 +364,7 @@ def get_client(id: str): response = client return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/combiner_routes.py b/fedn/network/api/v1/combiner_routes.py index 1f9360461..9210a7e30 100644 --- a/fedn/network/api/v1/combiner_routes.py +++ b/fedn/network/api/v1/combiner_routes.py @@ -113,8 +113,8 @@ def get_combiners(): response = {"count": combiners["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/list", methods=["POST"]) @@ -196,8 +196,8 @@ def list_combiners(): response = {"count": combiners["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["GET"]) @@ -243,8 +243,8 @@ def get_combiners_count(): count = combiner_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["POST"]) @@ -292,8 +292,8 @@ def combiners_count(): count = combiner_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["GET"]) @@ -335,7 +335,7 @@ def get_combiner(id: str): response = combiner return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/package_routes.py b/fedn/network/api/v1/package_routes.py index 65783f54b..7add1a220 100644 --- a/fedn/network/api/v1/package_routes.py +++ b/fedn/network/api/v1/package_routes.py @@ -125,8 +125,8 @@ def get_packages(): response = {"count": packages["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/list", methods=["POST"]) @@ -213,8 +213,8 @@ def list_packages(): response = {"count": packages["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["GET"]) @@ -274,8 +274,8 @@ def get_packages_count(): count = package_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["POST"]) @@ -336,8 +336,8 @@ def packages_count(): count = package_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["GET"]) @@ -381,10 +381,10 @@ def get_package(id: str): response = package.__dict__ if use_typing else package return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/active", methods=["GET"]) @@ -421,7 +421,7 @@ def get_active_package(): response = package.__dict__ if use_typing else package return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/round_routes.py b/fedn/network/api/v1/round_routes.py index 4c2eb0c44..14476a091 100644 --- a/fedn/network/api/v1/round_routes.py +++ b/fedn/network/api/v1/round_routes.py @@ -101,8 +101,8 @@ def get_rounds(): response = {"count": rounds["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/list", methods=["POST"]) @@ -180,8 +180,8 @@ def list_rounds(): response = {"count": rounds["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["GET"]) @@ -221,8 +221,8 @@ def get_rounds_count(): count = round_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["POST"]) @@ -266,8 +266,8 @@ def rounds_count(): count = round_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["GET"]) @@ -309,7 +309,7 @@ def get_round(id: str): response = round return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/session_routes.py b/fedn/network/api/v1/session_routes.py index 240cb8443..f3f81fac0 100644 --- a/fedn/network/api/v1/session_routes.py +++ b/fedn/network/api/v1/session_routes.py @@ -3,11 +3,12 @@ from flask import Blueprint, jsonify, request from fedn.network.api.auth import jwt_auth_required +from fedn.network.api.shared import control from fedn.network.api.v1.shared import api_version, get_post_data_to_kwargs, get_typed_list_headers, mdb from fedn.network.storage.statestore.stores.session_store import SessionStore from fedn.network.storage.statestore.stores.shared import EntityNotFound + from .model_routes import model_store -from fedn.network.api.shared import control bp = Blueprint("session", __name__, url_prefix=f"/api/{api_version}/sessions") @@ -97,8 +98,8 @@ def get_sessions(): response = {"count": sessions["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/list", methods=["POST"]) @@ -175,8 +176,8 @@ def list_sessions(): response = {"count": sessions["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["GET"]) @@ -216,8 +217,8 @@ def get_sessions_count(): count = session_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["POST"]) @@ -261,8 +262,8 @@ def sessions_count(): count = session_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["GET"]) @@ -304,10 +305,10 @@ def get_session(id: str): response = session return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["POST"]) @@ -349,8 +350,8 @@ def post(): status_code: int = 201 if successful else 400 return jsonify(response), status_code - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/start", methods=["POST"]) @@ -386,5 +387,5 @@ def start_session(): threading.Thread(target=control.start_session, args=(session_id, rounds)).start() return jsonify({"message": "Session started"}), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/status_routes.py b/fedn/network/api/v1/status_routes.py index b88772b01..cf3907bea 100644 --- a/fedn/network/api/v1/status_routes.py +++ b/fedn/network/api/v1/status_routes.py @@ -131,8 +131,8 @@ def get_statuses(): response = {"count": statuses["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/list", methods=["POST"]) @@ -226,8 +226,8 @@ def list_statuses(): response = {"count": statuses["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["GET"]) @@ -288,8 +288,8 @@ def get_statuses_count(): count = status_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["POST"]) @@ -350,8 +350,8 @@ def statuses_count(): count = status_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["GET"]) @@ -395,7 +395,7 @@ def get_status(id: str): response = status.__dict__ if use_typing else status return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 diff --git a/fedn/network/api/v1/validation_routes.py b/fedn/network/api/v1/validation_routes.py index 59767e3e8..665abbb4b 100644 --- a/fedn/network/api/v1/validation_routes.py +++ b/fedn/network/api/v1/validation_routes.py @@ -138,8 +138,8 @@ def get_validations(): response = {"count": validations["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/list", methods=["POST"]) @@ -236,8 +236,8 @@ def list_validations(): response = {"count": validations["count"], "result": result} return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["GET"]) @@ -302,8 +302,8 @@ def get_validations_count(): count = validation_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/count", methods=["POST"]) @@ -367,8 +367,8 @@ def validations_count(): count = validation_store.count(**kwargs) response = count return jsonify(response), 200 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 @bp.route("/", methods=["GET"]) @@ -412,7 +412,7 @@ def get_validation(id: str): response = validation.__dict__ if use_typing else validation return jsonify(response), 200 - except EntityNotFound as e: - return jsonify({"message": str(e)}), 404 - except Exception as e: - return jsonify({"message": str(e)}), 500 + except EntityNotFound: + return jsonify({"message": f"Entity with id: {id} not found"}), 404 + except Exception: + return jsonify({"message": "An unexpected error occurred"}), 500 From 7d56caf5ebf03c8c36bfa995b3ad5fcb19abae1c Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Mon, 17 Jun 2024 21:25:45 +0200 Subject: [PATCH 19/22] Fix/SK-905 | Don't allow v8.4.0 of tenacity #636 --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index edce3abc6..ab2ba1688 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,8 @@ dependencies = [ "grpcio-health-checking~=1.60.0", "pyyaml", "plotly", - "virtualenv" + "virtualenv", + "tenacity!=8.4.0", ] [project.urls] From 5d3950361d9a1a78f8aadaa77e519be0f8181d3f Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 18 Jun 2024 09:54:27 +0200 Subject: [PATCH 20/22] Fix/SK-904 | Uncontrolled data used in path expression (#635) --- fedn/network/api/interface.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 7fb6ae8ca..cddf7bc91 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -1,11 +1,11 @@ import base64 import copy -import os import threading import uuid from io import BytesIO from flask import jsonify, send_from_directory +from werkzeug.security import safe_join from werkzeug.utils import secure_filename from fedn.common.config import get_controller_config, get_network_config @@ -232,7 +232,7 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript file_name = file.filename storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") - file_path = os.path.join("/app/client/package/", storage_file_name) + file_path = safe_join("/app/client/package/", storage_file_name) file.save(file_path) self.control.set_compute_package(storage_file_name, file_path) @@ -377,7 +377,7 @@ def download_compute_package(self, name): try: data = self.control.get_compute_package(name) # TODO: make configurable, perhaps in config.py or package.py - file_path = os.path.join("/app/client/package/", name) + file_path = safe_join("/app/client/package/", name) with open(file_path, "wb") as fh: fh.write(data) # TODO: make configurable, perhaps in config.py or package.py @@ -399,7 +399,7 @@ def _create_checksum(self, name=None): name, message = self._get_compute_package_name() if name is None: return False, message, "" - file_path = os.path.join("/app/client/package/", name) # TODO: make configurable, perhaps in config.py or package.py + file_path = safe_join("/app/client/package/", name) # TODO: make configurable, perhaps in config.py or package.py try: sum = str(sha(file_path)) except FileNotFoundError: From c984eed937a69e957a884a232fa7f4e29a741d19 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 18 Jun 2024 10:26:54 +0200 Subject: [PATCH 21/22] Fix/SK-903 | Clear-text logging of sensitive information (#634) --- .github/workflows/branch-name-check.yaml | 2 +- fedn/network/api/interface.py | 2 -- fedn/network/grpc/server.py | 20 ++++++++++++-------- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/.github/workflows/branch-name-check.yaml b/.github/workflows/branch-name-check.yaml index 67ff4d04c..41f431cc1 100644 --- a/.github/workflows/branch-name-check.yaml +++ b/.github/workflows/branch-name-check.yaml @@ -7,7 +7,7 @@ on: - master env: - BRANCH_REGEX: '^((feature|github|hotfix|bugfix|fix|bug|docs|refactor)\/.+)|(release\/v((([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?))$' + BRANCH_REGEX: '^((feature|github|dependabot|hotfix|bugfix|fix|bug|docs|refactor)\/.+)|(release\/v((([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?))$' jobs: branch-name-check: diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index cddf7bc91..c50adf8e2 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -626,8 +626,6 @@ def add_client(self, client_id, preferred_combiner, remote_addr): "certificate": cert, "helper_type": self.control.statestore.get_helper(), } - logger.info(f"Sending payload: {payload}") - return jsonify(payload) def get_initial_model(self): diff --git a/fedn/network/grpc/server.py b/fedn/network/grpc/server.py index 4354a7aa5..a23691505 100644 --- a/fedn/network/grpc/server.py +++ b/fedn/network/grpc/server.py @@ -9,10 +9,9 @@ class Server: - """ Class for configuring and launching the gRPC server.""" + """Class for configuring and launching the gRPC server.""" def __init__(self, servicer, modelservicer, config): - set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) @@ -34,21 +33,26 @@ def __init__(self, servicer, modelservicer, config): health_pb2_grpc.add_HealthServicer_to_server(self.health_servicer, self.server) if config["secure"]: - logger.info(f'Creating secure gRPCS server using certificate: {config["certificate"]}') + logger.info("Creating secure gRPCS server using certificate") server_credentials = grpc.ssl_server_credentials( - ((config["key"], config["certificate"],),)) - self.server.add_secure_port( - "[::]:" + str(config["port"]), server_credentials) + ( + ( + config["key"], + config["certificate"], + ), + ) + ) + self.server.add_secure_port("[::]:" + str(config["port"]), server_credentials) else: logger.info("Creating gRPC server") self.server.add_insecure_port("[::]:" + str(config["port"])) def start(self): - """ Start the gRPC server.""" + """Start the gRPC server.""" logger.info("gRPC Server started") self.server.start() def stop(self): - """ Stop the gRPC server.""" + """Stop the gRPC server.""" logger.info("gRPC Server stopped") self.server.stop(0) From 36edfd3b57dea0662a6f4352d28f25d5874437a0 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 18 Jun 2024 08:29:58 +0000 Subject: [PATCH 22/22] bump version --- docs/conf.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index fcd9a0ea9..bebc3a80e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -12,7 +12,7 @@ author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = "0.10.0" +release = "0.11.0" # Add any Sphinx extension module names here, as strings extensions = [ diff --git a/pyproject.toml b/pyproject.toml index ab2ba1688..5773a38a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.10.0" +version = "0.11.0" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst"