From 2a207fe0f9972e3bc69cc48ffaf5149675499548 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Mon, 8 Jul 2024 13:05:52 +0000 Subject: [PATCH 01/15] init commit --- fedn/cli/combiner_cmd.py | 3 +- fedn/common/certificate/certificate.py | 40 +++--- fedn/network/api/interface.py | 77 ++--------- fedn/network/api/network.py | 14 +- fedn/network/api/server.py | 7 +- fedn/network/clients/client.py | 25 +--- fedn/network/clients/connect.py | 9 +- fedn/network/combiner/combiner.py | 130 +++++++++++------- fedn/network/combiner/interfaces.py | 7 - fedn/network/combiner/roundhandler.py | 5 +- fedn/network/combiner/shared.py | 13 ++ fedn/network/grpc/server.py | 17 ++- .../storage/statestore/mongostatestore.py | 7 +- 13 files changed, 163 insertions(+), 191 deletions(-) create mode 100644 fedn/network/combiner/shared.py diff --git a/fedn/cli/combiner_cmd.py b/fedn/cli/combiner_cmd.py index 02a797448..14d8c2d99 100644 --- a/fedn/cli/combiner_cmd.py +++ b/fedn/cli/combiner_cmd.py @@ -12,8 +12,7 @@ @main.group("combiner") @click.pass_context def combiner_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass diff --git a/fedn/common/certificate/certificate.py b/fedn/common/certificate/certificate.py index 3cb09016c..547175a20 100644 --- a/fedn/common/certificate/certificate.py +++ b/fedn/common/certificate/certificate.py @@ -9,24 +9,27 @@ class Certificate: - """Utility to generate unsigned certificates. - - """ + """Utility to generate unsigned certificates.""" CERT_NAME = "cert.pem" KEY_NAME = "key.pem" BITS = 2048 - def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", create_dirs=True): - try: - os.makedirs(cwd) - except OSError: - logger.info("Directory exists, will store all cert and keys here.") + def __init__(self, name=None, key_path="", cert_path="", create_dirs=False): + if create_dirs: + try: + cwd = os.getcwd() + os.makedirs(cwd) + except OSError: + logger.info("Directory exists, will store all cert and keys here.") + else: + logger.info("Successfully created the directory to store cert and keys in {}".format(cwd)) + + self.key_path = os.path.join(cwd, "key.pem") + self.cert_path = os.path.join(cwd, "cert.pem") else: - logger.info("Successfully created the directory to store cert and keys in {}".format(cwd)) - - self.key_path = os.path.join(cwd, key_name) - self.cert_path = os.path.join(cwd, cert_name) + self.key_path = key_path + self.cert_path = cert_path if name: self.name = name @@ -36,9 +39,7 @@ def __init__(self, cwd, name=None, key_name="key.pem", cert_name="cert.pem", cre def gen_keypair( self, ): - """Generate keypair. - - """ + """Generate keypair.""" key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, 2048) cert = crypto.X509() @@ -73,8 +74,7 @@ def set_keypair_raw(self, certificate, privatekey): certfile.write(crypto.dump_certificate(crypto.FILETYPE_PEM, certificate)) def get_keypair_raw(self): - """:return: - """ + """:return:""" with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() with open(self.cert_path, "rb") as certfile: @@ -82,16 +82,14 @@ def get_keypair_raw(self): return copy.deepcopy(cert_buf), copy.deepcopy(key_buf) def get_key(self): - """:return: - """ + """:return:""" with open(self.key_path, "rb") as keyfile: key_buf = keyfile.read() key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_buf) return key def get_cert(self): - """:return: - """ + """:return:""" with open(self.cert_path, "rb") as certfile: cert_buf = certfile.read() cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_buf) diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 9936e0bc0..f1815f39f 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -1,5 +1,4 @@ -import base64 -import copy +import os import threading import uuid from io import BytesIO @@ -10,7 +9,7 @@ from fedn.common.config import get_controller_config, get_network_config from fedn.common.log_config import logger -from fedn.network.combiner.interfaces import CombinerInterface, CombinerUnavailableError +from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.state import ReducerState, ReducerStateToString from fedn.utils.checksum import sha @@ -231,7 +230,7 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript file_name = file.filename storage_file_name = secure_filename(f"{str(uuid.uuid4())}.{extension}") - file_path = safe_join("/app/client/package/", storage_file_name) + file_path = safe_join(os.getcwd(), storage_file_name) file.save(file_path) self.control.set_compute_package(storage_file_name, file_path) @@ -247,7 +246,8 @@ def set_compute_package(self, file, helper_type: str, name: str = None, descript ), 400, ) - + # Delete the file after it has been saved + os.remove(file_path) return jsonify({"success": True, "message": "Compute package set."}) def _get_compute_package_name(self): @@ -371,19 +371,21 @@ def download_compute_package(self, name): mutex = threading.Lock() mutex.acquire() # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory("/app/client/package/", name, as_attachment=True) + return send_from_directory(os.getcwd(), name, as_attachment=True) except Exception: try: data = self.control.get_compute_package(name) # TODO: make configurable, perhaps in config.py or package.py - file_path = safe_join("/app/client/package/", name) + file_path = safe_join(os.getcwd(), name) with open(file_path, "wb") as fh: fh.write(data) # TODO: make configurable, perhaps in config.py or package.py - return send_from_directory("/app/client/package/", name, as_attachment=True) + return send_from_directory(os.getcwd(), name, as_attachment=True) except Exception: raise finally: + # Delete the file after it has been saved + os.remove(file_path) mutex.release() def _create_checksum(self, name=None): @@ -398,7 +400,7 @@ def _create_checksum(self, name=None): name, message = self._get_compute_package_name() if name is None: return False, message, "" - file_path = safe_join("/app/client/package/", name) # TODO: make configurable, perhaps in config.py or package.py + file_path = safe_join(os.getcwd(), name) # TODO: make configurable, perhaps in config.py or package.py try: sum = str(sha(file_path)) except FileNotFoundError: @@ -502,53 +504,10 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por :return: Config of the combiner as a json response. :rtype: :class:`flask.Response` """ - # TODO: Any more required check for config? Formerly based on status: "retry" - if not self.control.idle(): - return jsonify( - { - "success": False, - "status": "retry", - "message": "Conroller is not in idle state, try again later. ", - } - ) - # Check if combiner already exists - combiner = self.control.network.get_combiner(combiner_id) - if not combiner: - if secure_grpc == "True": - certificate, key = self.certificate_manager.get_or_create(address).get_keypair_raw() - _ = base64.b64encode(certificate) - _ = base64.b64encode(key) - - else: - certificate = None - key = None - - combiner_interface = CombinerInterface( - parent=self._to_dict(), - name=combiner_id, - address=address, - fqdn=fqdn, - port=port, - certificate=copy.deepcopy(certificate), - key=copy.deepcopy(key), - ip=remote_addr, - ) - - self.control.network.add_combiner(combiner_interface) - - # Check combiner now exists - combiner = self.control.network.get_combiner(combiner_id) - if not combiner: - return jsonify({"success": False, "message": "Combiner not added."}) - payload = { - "success": True, - "message": "Combiner added successfully.", - "status": "added", - "storage": self.statestore.get_storage_backend(), - "statestore": self.statestore.get_config(), - "certificate": combiner.get_certificate(), - "key": combiner.get_key(), + "success": False, + "message": "Adding combiner via REST API is obsolete. Include statestore and object store config in combiner config.", + "status": "abort", } return jsonify(payload) @@ -609,13 +568,6 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): # Add client to network self.control.network.add_client(client_config) - # Setup response containing information about the combiner for assinging the client - if combiner.certificate: - cert_b64 = base64.b64encode(combiner.certificate) - cert = str(cert_b64).split("'")[1] - else: - cert = None - payload = { "status": "assigned", "host": combiner.address, @@ -623,7 +575,6 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): "package": "remote", # TODO: Make this configurable "ip": combiner.ip, "port": combiner.port, - "certificate": cert, "helper_type": self.control.statestore.get_helper(), } return jsonify(payload) diff --git a/fedn/network/api/network.py b/fedn/network/api/network.py index 5e2f2ef91..c8716ad09 100644 --- a/fedn/network/api/network.py +++ b/fedn/network/api/network.py @@ -1,4 +1,4 @@ -import base64 +import os from fedn.common.log_config import logger from fedn.network.combiner.interfaces import CombinerInterface @@ -47,14 +47,14 @@ def get_combiners(self): data = self.statestore.get_combiners() combiners = [] for c in data["result"]: - if c["certificate"]: - cert = base64.b64decode(c["certificate"]) - key = base64.b64decode(c["key"]) + name = c["name"].upper() + if os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}"): + cert_path = os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}") + with open(cert_path, "rb") as f: + cert = f.read() else: cert = None - key = None - - combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, key=key, ip=c["ip"])) + combiners.append(CombinerInterface(c["parent"], c["name"], c["address"], c["fqdn"], c["port"], certificate=cert, ip=c["ip"])) return combiners diff --git a/fedn/network/api/server.py b/fedn/network/api/server.py index c9e54ff87..820ac93ac 100644 --- a/fedn/network/api/server.py +++ b/fedn/network/api/server.py @@ -625,8 +625,13 @@ def list_combiners_data(): if custom_url_prefix: app.add_url_rule(f"{custom_url_prefix}/list_combiners_data", view_func=list_combiners_data, methods=["POST"]) -if __name__ == "__main__": + +def start(): config = get_controller_config() port = config["port"] debug = config["debug"] app.run(debug=debug, port=port, host="0.0.0.0") + + +if __name__ == "__main__": + start() diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index e594d7b6d..fe1c7ef90 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -198,13 +198,7 @@ def connect(self, combiner_config): port = 443 logger.info(f"Initiating connection to combiner host at: {host}:{port}") - if combiner_config["certificate"]: - logger.info("Utilizing CA certificate for GRPC channel authentication.") - secure = True - cert = base64.b64decode(combiner_config["certificate"]) # .decode('utf-8') - credentials = grpc.ssl_channel_credentials(root_certificates=cert) - channel = grpc.secure_channel("{}:{}".format(host, str(port)), credentials) - elif os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): + if os.getenv("FEDN_GRPC_ROOT_CERT_PATH"): secure = True logger.info("Using root certificate from environment variable for GRPC channel.") with open(os.environ["FEDN_GRPC_ROOT_CERT_PATH"], "rb") as f: @@ -236,8 +230,6 @@ def connect(self, combiner_config): logger.info("Successfully established {} connection to {}:{}".format("secure" if secure else "insecure", host, port)) - logger.info("Using {} compute package.".format(combiner_config["package"])) - self._connected = True def disconnect(self): @@ -292,9 +284,8 @@ def _initialize_dispatcher(self, config): :type config: dict :return: """ + pr = PackageRuntime(self.run_path) if config["remote_compute_context"]: - pr = PackageRuntime(self.run_path) - retval = None tries = 10 @@ -333,18 +324,8 @@ def _initialize_dispatcher(self, config): logger.error(f"Caught exception: {type(e).__name__}") else: - # TODO: Deprecate - dispatch_config = { - "entry_points": { - "predict": {"command": "python3 predict.py"}, - "train": {"command": "python3 train.py"}, - "validate": {"command": "python3 validate.py"}, - } - } from_path = os.path.join(os.getcwd(), "client") - - copytree(from_path, self.run_path) - self.dispatcher = Dispatcher(dispatch_config, self.run_path) + self.dispatcher = pr.dispatcher(from_path) # Get or create python environment activate_cmd = self.dispatcher._get_or_create_python_env() if activate_cmd: diff --git a/fedn/network/clients/connect.py b/fedn/network/clients/connect.py index 59aaead35..1a24a5c53 100644 --- a/fedn/network/clients/connect.py +++ b/fedn/network/clients/connect.py @@ -110,17 +110,10 @@ def assign(self): if "message" in retval.json(): reason = retval.json()["message"] else: - reason = "Reducer was not ready. Try again later." + reason = "Controller was not ready. Try again later." return Status.TryAgain, reason - reducer_package = retval.json()["package"] - if reducer_package != self.package: - reason = "Unmatched config of compute package between client and reducer.\n" + "Reducer uses {} package and client uses {}.".format( - reducer_package, self.package - ) - return Status.UnMatchedConfig, reason - return Status.Assigned, retval.json() return Status.Unassigned, None diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 7f7b93548..af86b4a2e 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -1,24 +1,21 @@ -import base64 import json import queue import re import signal -import sys import threading import time import uuid from datetime import datetime, timedelta from enum import Enum +from typing import TypedDict import fedn.network.grpc.fedn_pb2 as fedn import fedn.network.grpc.fedn_pb2_grpc as rpc +from fedn.common.certificate.certificate import Certificate from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream -from fedn.network.combiner.connect import ConnectorCombiner, Status -from fedn.network.combiner.modelservice import ModelService from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler -from fedn.network.grpc.server import Server -from fedn.network.storage.s3.repository import Repository -from fedn.network.storage.statestore.mongostatestore import MongoStateStore +from fedn.network.combiner.shared import repository, statestore +from fedn.network.grpc.server import Server, ServerConfig VALID_NAME_REGEX = "^[a-zA-Z0-9_-]*$" @@ -50,6 +47,28 @@ def role_to_proto_role(role): return fedn.OTHER +class CombinerConfig(TypedDict): + """Configuration for the combiner.""" + + discover_host: str + discover_port: int + token: str + host: str + port: int + ip: str + parent: str + fqdn: str + name: str + secure: bool + verify: bool + cert_path: str + key_path: str + max_clients: int + network_id: str + logfile: str + verbosity: str + + class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, rpc.ControlServicer): """Combiner gRPC server. @@ -75,51 +94,54 @@ def __init__(self, config): self.max_clients = config["max_clients"] # Connector to announce combiner to discover service (reducer) - announce_client = ConnectorCombiner( - host=config["discover_host"], - port=config["discover_port"], - myhost=config["host"], - fqdn=config["fqdn"], - myport=config["port"], - token=config["token"], - name=config["name"], - secure=config["secure"], - verify=config["verify"], - ) + # announce_client = ConnectorCombiner( + # host=config["discover_host"], + # port=config["discover_port"], + # myhost=config["host"], + # fqdn=config["fqdn"], + # myport=config["port"], + # token=config["token"], + # name=config["name"], + # secure=config["secure"], + # verify=config["verify"], + # ) + + # while True: + # # Announce combiner to discover service + # status, response = announce_client.announce() + # if status == Status.TryAgain: + # logger.info(response) + # time.sleep(5) + # elif status == Status.Assigned: + # announce_config = response + # logger.info("COMBINER {0}: Announced successfully".format(self.id)) + # break + # elif status == Status.UnAuthorized: + # logger.info(response) + # logger.info("Status.UnAuthorized") + # sys.exit("Exiting: Unauthorized") + # elif status == Status.UnMatchedConfig: + # logger.info(response) + # logger.info("Status.UnMatchedConfig") + # sys.exit("Exiting: Missing config") - while True: - # Announce combiner to discover service - status, response = announce_client.announce() - if status == Status.TryAgain: - logger.info(response) - time.sleep(5) - elif status == Status.Assigned: - announce_config = response - logger.info("COMBINER {0}: Announced successfully".format(self.id)) - break - elif status == Status.UnAuthorized: - logger.info(response) - logger.info("Status.UnAuthorized") - sys.exit("Exiting: Unauthorized") - elif status == Status.UnMatchedConfig: - logger.info(response) - logger.info("Status.UnMatchedConfig") - sys.exit("Exiting: Missing config") - - cert = announce_config["certificate"] - key = announce_config["key"] - - if announce_config["certificate"]: - cert = base64.b64decode(announce_config["certificate"]) # .decode('utf-8') - key = base64.b64decode(announce_config["key"]) # .decode('utf-8') + # Set up model repository + self.repository = repository - # Set up gRPC server configuration - grpc_config = {"port": config["port"], "secure": config["secure"], "certificate": cert, "key": key} + self.statestore = statestore - # Set up model repository - self.repository = Repository(announce_config["storage"]["storage_config"]) + from fedn.network.combiner.interfaces import CombinerInterface - self.statestore = MongoStateStore(announce_config["statestore"]["network_id"], announce_config["statestore"]["mongo_config"]) + # Add combiner to statestore + interface_config = { + "port": config["port"], + "fqdn": config["fqdn"], + "name": config["name"], + "address": config["host"], + "parent": "localhost", + "ip": "", + } + self.statestore.set_combiner(interface_config) # Fetch all clients previously connected to the combiner # If a client and a combiner goes down at the same time, @@ -129,13 +151,19 @@ def __init__(self, config): for client in previous_clients: self.statestore.set_client({"name": client["name"], "status": "offline", "client_id": client["client_id"]}) - self.modelservice = ModelService() + # Set up gRPC server configuration + if config["secure"]: + cert = Certificate(key_path=config["key_path"], cert_path=config["cert_path"]) + certificate, key = cert.get_keypair_raw() + grpc_server_config = ServerConfig(port=config["port"], secure=True, key=key, certificate=certificate) + else: + grpc_server_config = ServerConfig(port=config["port"], secure=False) # Create gRPC server - self.server = Server(self, self.modelservice, grpc_config) + self.server = Server(self, grpc_server_config) # Set up round controller - self.round_handler = RoundHandler(self.repository, self, self.modelservice) + self.round_handler = RoundHandler(self) # Start thread for round controller threading.Thread(target=self.round_handler.run, daemon=True).start() diff --git a/fedn/network/combiner/interfaces.py b/fedn/network/combiner/interfaces.py index 935b75442..20da29d23 100644 --- a/fedn/network/combiner/interfaces.py +++ b/fedn/network/combiner/interfaces.py @@ -125,13 +125,6 @@ def to_dict(self): "key": None, "config": self.config, } - - if self.certificate: - cert_b64 = base64.b64encode(self.certificate) - key_b64 = base64.b64encode(self.key) - data["certificate"] = str(cert_b64).split("'")[1] - data["key"] = str(key_b64).split("'")[1] - return data def to_json(self): diff --git a/fedn/network/combiner/roundhandler.py b/fedn/network/combiner/roundhandler.py index ef9029de9..8cad774b0 100644 --- a/fedn/network/combiner/roundhandler.py +++ b/fedn/network/combiner/roundhandler.py @@ -9,6 +9,7 @@ from fedn.common.log_config import logger from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator from fedn.network.combiner.modelservice import load_model_from_BytesIO, serialize_model_to_BytesIO +from fedn.network.combiner.shared import modelservice, repository from fedn.utils.helpers.helpers import get_helper from fedn.utils.parameters import Parameters @@ -84,10 +85,10 @@ class RoundHandler: :type modelservice: class: `fedn.network.combiner.modelservice.ModelService` """ - def __init__(self, storage, server, modelservice): + def __init__(self, server): """Initialize the RoundHandler.""" self.round_configs = queue.Queue() - self.storage = storage + self.storage = repository self.server = server self.modelservice = modelservice diff --git a/fedn/network/combiner/shared.py b/fedn/network/combiner/shared.py new file mode 100644 index 000000000..5e5ee114c --- /dev/null +++ b/fedn/network/combiner/shared.py @@ -0,0 +1,13 @@ +from fedn.common.config import get_modelstorage_config, get_network_config, get_statestore_config +from fedn.network.combiner.modelservice import ModelService +from fedn.network.storage.s3.repository import Repository +from fedn.network.storage.statestore.mongostatestore import MongoStateStore + +statestore_config = get_statestore_config() +modelstorage_config = get_modelstorage_config() +network_id = get_network_config() + +statestore = MongoStateStore(network_id, statestore_config["mongo_config"]) +repository = Repository(modelstorage_config["storage_config"]) + +modelservice = ModelService() diff --git a/fedn/network/grpc/server.py b/fedn/network/grpc/server.py index a23691505..edd2fd6d5 100644 --- a/fedn/network/grpc/server.py +++ b/fedn/network/grpc/server.py @@ -1,17 +1,28 @@ from concurrent import futures +from typing import TypedDict import grpc from grpc_health.v1 import health, health_pb2_grpc import fedn.network.grpc.fedn_pb2_grpc as rpc from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream +from fedn.network.combiner.shared import modelservice from fedn.network.grpc.auth import JWTInterceptor +class ServerConfig(TypedDict): + port: int + secure: bool + key: str + certificate: str + logfile: str + verbosity: str + + class Server: """Class for configuring and launching the gRPC server.""" - def __init__(self, servicer, modelservicer, config): + def __init__(self, servicer, config: ServerConfig): set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) @@ -25,8 +36,8 @@ def __init__(self, servicer, modelservicer, config): rpc.add_ConnectorServicer_to_server(servicer, self.server) if isinstance(servicer, rpc.ReducerServicer): rpc.add_ReducerServicer_to_server(servicer, self.server) - if isinstance(modelservicer, rpc.ModelServiceServicer): - rpc.add_ModelServiceServicer_to_server(modelservicer, self.server) + if isinstance(modelservice, rpc.ModelServiceServicer): + rpc.add_ModelServiceServicer_to_server(modelservice, self.server) if isinstance(servicer, rpc.CombinerServicer): rpc.add_ControlServicer_to_server(servicer, self.server) diff --git a/fedn/network/storage/statestore/mongostatestore.py b/fedn/network/storage/statestore/mongostatestore.py index 7262e5554..1a7111fe3 100644 --- a/fedn/network/storage/statestore/mongostatestore.py +++ b/fedn/network/storage/statestore/mongostatestore.py @@ -6,7 +6,6 @@ from google.protobuf.json_format import MessageToDict from fedn.common.log_config import logger -from fedn.network.combiner.roundhandler import RoundConfig from fedn.network.state import ReducerStateToString, StringToReducerState @@ -872,7 +871,7 @@ def create_round(self, round_data): # TODO: Add check if round_id already exists self.rounds.insert_one(round_data) - def set_session_config(self, id: str, config: RoundConfig) -> None: + def set_session_config(self, id: str, config) -> None: """Set the session configuration. :param id: The session id @@ -883,7 +882,7 @@ def set_session_config(self, id: str, config: RoundConfig) -> None: self.sessions.update_one({"session_id": str(id)}, {"$push": {"session_config": config}}, True) # Added to accomodate new session config structure - def set_session_config_v2(self, id: str, config: RoundConfig) -> None: + def set_session_config_v2(self, id: str, config) -> None: """Set the session configuration. :param id: The session id @@ -910,7 +909,7 @@ def set_round_combiner_data(self, data): """ self.rounds.update_one({"round_id": str(data["round_id"])}, {"$push": {"combiners": data}}, True) - def set_round_config(self, round_id, round_config: RoundConfig): + def set_round_config(self, round_id, round_config): """Set round configuration. :param round_id: The round unique identifier From 79d415e54423b372f037dfe34e546b968d53ab94 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 9 Jul 2024 07:14:44 +0000 Subject: [PATCH 02/15] general vs specfic cert handling --- fedn/network/api/network.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/fedn/network/api/network.py b/fedn/network/api/network.py index c8716ad09..542761f49 100644 --- a/fedn/network/api/network.py +++ b/fedn/network/api/network.py @@ -48,7 +48,12 @@ def get_combiners(self): combiners = [] for c in data["result"]: name = c["name"].upper() - if os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}"): + # General certificate handling, same for all combiners. + if os.environ.get("FEDN_GRPC_CERT_PATH"): + with open(os.environ.get("FEDN_GRPC_CERT_PATH"), "rb") as f: + cert = f.read() + # Specific certificate handling for each combiner. + elif os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}"): cert_path = os.environ.get(f"FEDN_GRPC_CERT_PATH_{name}") with open(cert_path, "rb") as f: cert = f.read() From 86da8d3adb258d277404dd3af82f9505807f1640 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Wed, 10 Jul 2024 09:33:23 +0000 Subject: [PATCH 03/15] remove comments --- fedn/network/combiner/combiner.py | 34 ------------------------------- 1 file changed, 34 deletions(-) diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index af86b4a2e..37c4c3264 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -93,45 +93,11 @@ def __init__(self, config): self.role = Role.COMBINER self.max_clients = config["max_clients"] - # Connector to announce combiner to discover service (reducer) - # announce_client = ConnectorCombiner( - # host=config["discover_host"], - # port=config["discover_port"], - # myhost=config["host"], - # fqdn=config["fqdn"], - # myport=config["port"], - # token=config["token"], - # name=config["name"], - # secure=config["secure"], - # verify=config["verify"], - # ) - - # while True: - # # Announce combiner to discover service - # status, response = announce_client.announce() - # if status == Status.TryAgain: - # logger.info(response) - # time.sleep(5) - # elif status == Status.Assigned: - # announce_config = response - # logger.info("COMBINER {0}: Announced successfully".format(self.id)) - # break - # elif status == Status.UnAuthorized: - # logger.info(response) - # logger.info("Status.UnAuthorized") - # sys.exit("Exiting: Unauthorized") - # elif status == Status.UnMatchedConfig: - # logger.info(response) - # logger.info("Status.UnMatchedConfig") - # sys.exit("Exiting: Missing config") - # Set up model repository self.repository = repository self.statestore = statestore - from fedn.network.combiner.interfaces import CombinerInterface - # Add combiner to statestore interface_config = { "port": config["port"], From 6467e4f37b04a88aa9281da14b8ec81e477fa54f Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Wed, 10 Jul 2024 09:36:01 +0000 Subject: [PATCH 04/15] unpin torch --- examples/mnist-pytorch/client/python_env.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/mnist-pytorch/client/python_env.yaml b/examples/mnist-pytorch/client/python_env.yaml index afdea926f..f43d2353d 100644 --- a/examples/mnist-pytorch/client/python_env.yaml +++ b/examples/mnist-pytorch/client/python_env.yaml @@ -4,6 +4,6 @@ build_dependencies: - setuptools - wheel dependencies: - - torch==2.3.1 - - torchvision==0.18.1 + - torch + - torchvision - fedn From cd84fdde298989446e8c3d47e54f3d57e1fa47cd Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Wed, 10 Jul 2024 09:36:30 +0000 Subject: [PATCH 05/15] add configs for native servers --- config/settings-client.yaml.local.template | 3 ++ config/settings-combiner.yaml.local.template | 31 +++++++++++++++++++ .../settings-controller.yaml.local.template | 24 ++++++++++++++ 3 files changed, 58 insertions(+) create mode 100644 config/settings-client.yaml.local.template create mode 100644 config/settings-combiner.yaml.local.template create mode 100644 config/settings-controller.yaml.local.template diff --git a/config/settings-client.yaml.local.template b/config/settings-client.yaml.local.template new file mode 100644 index 000000000..e48e779af --- /dev/null +++ b/config/settings-client.yaml.local.template @@ -0,0 +1,3 @@ +network_id: fedn-network +discover_host: localhost +discover_port: 8092 diff --git a/config/settings-combiner.yaml.local.template b/config/settings-combiner.yaml.local.template new file mode 100644 index 000000000..b49917389 --- /dev/null +++ b/config/settings-combiner.yaml.local.template @@ -0,0 +1,31 @@ +network_id: fedn-network + +name: combiner +host: localhost +address: localhost +port: 12080 +max_clients: 30 + +cert_path: tmp/server.crt +key_path: tmp/server.key + +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: localhost + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: localhost + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False + + diff --git a/config/settings-controller.yaml.local.template b/config/settings-controller.yaml.local.template new file mode 100644 index 000000000..a5266a38b --- /dev/null +++ b/config/settings-controller.yaml.local.template @@ -0,0 +1,24 @@ +network_id: fedn-network +controller: + host: localhost + port: 8092 + debug: True + +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: localhost + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: localhost + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False From c410a815de03fedfa17a471a04d74becff1c07a0 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 12 Jul 2024 11:17:21 +0000 Subject: [PATCH 06/15] unused imports --- fedn/network/clients/client.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index fe1c7ef90..6ea154605 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -1,4 +1,3 @@ -import base64 import io import json import os @@ -11,7 +10,6 @@ import uuid from datetime import datetime from io import BytesIO -from shutil import copytree import grpc import requests @@ -28,7 +26,6 @@ from fedn.network.clients.package import PackageRuntime from fedn.network.clients.state import ClientState, ClientStateToString from fedn.network.combiner.modelservice import get_tmp_path, upload_request_generator -from fedn.utils.dispatcher import Dispatcher from fedn.utils.helpers.helpers import get_helper CHUNK_SIZE = 1024 * 1024 From aa559375a5372f22015d8d4f8336796d08b9ce12 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 12 Jul 2024 11:47:01 +0000 Subject: [PATCH 07/15] add combiner config env var --- docker-compose.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index c3620e79d..26291748f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -87,6 +87,8 @@ services: environment: - PYTHONUNBUFFERED=0 - GET_HOSTS_FROM=dns + - STATESTORE_CONFIG=/app/config/settings-combiner.yaml + - MODELSTORAGE_CONFIG=/app/config/settings-combiner.yaml build: context: . args: From e4085724597c096810d790bbfe9225fb810ce7f3 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 12 Jul 2024 11:55:04 +0000 Subject: [PATCH 08/15] add statestore and storage config to combiner --- config/settings-combiner.yaml.template | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/config/settings-combiner.yaml.template b/config/settings-combiner.yaml.template index 8cef6643a..11911cc6f 100644 --- a/config/settings-combiner.yaml.template +++ b/config/settings-combiner.yaml.template @@ -7,4 +7,23 @@ host: combiner port: 12080 max_clients: 30 +statestore: + type: MongoDB + mongo_config: + username: fedn_admin + password: password + host: mongo + port: 6534 + +storage: + storage_type: S3 + storage_config: + storage_hostname: minio + storage_port: 9000 + storage_access_key: fedn_admin + storage_secret_key: password + storage_bucket: fedn-models + context_bucket: fedn-context + storage_secure_mode: False + From e95e2485b4e9207119752ed9e7b0e94baa126912 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Fri, 12 Jul 2024 13:26:30 +0000 Subject: [PATCH 09/15] add manifest --- MANIFEST.in | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 MANIFEST.in diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..b1504311c --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include README.rst +include fedn/common/settings-controller.yaml.template \ No newline at end of file From 61ce17eecca8ada909f918e5b63f97dc2b18bbfb Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 16 Jul 2024 12:33:15 +0000 Subject: [PATCH 10/15] floating import --- fedn/cli/combiner_cmd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fedn/cli/combiner_cmd.py b/fedn/cli/combiner_cmd.py index 14d8c2d99..3e7753e80 100644 --- a/fedn/cli/combiner_cmd.py +++ b/fedn/cli/combiner_cmd.py @@ -3,8 +3,6 @@ import click import requests -from fedn.network.combiner.combiner import Combiner - from .main import main from .shared import CONTROLLER_DEFAULTS, apply_config, get_api_url, get_token, print_response @@ -59,6 +57,8 @@ def start_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, se click.echo(f"\nCombiner configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + from fedn.network.combiner.combiner import Combiner + combiner = Combiner(config) combiner.run() From c77c6d4eac875a95f21cc3068490bc69fd3751e2 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 16 Jul 2024 12:37:55 +0000 Subject: [PATCH 11/15] floating import --- fedn/cli/run_cmd.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 0aa069046..705c74511 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -4,10 +4,10 @@ import click import yaml + from fedn.common.exceptions import InvalidClientConfig from fedn.common.log_config import logger from fedn.network.clients.client import Client -from fedn.network.combiner.combiner import Combiner from fedn.utils.dispatcher import Dispatcher, _read_yaml_file from .client_cmd import validate_client_config @@ -40,15 +40,16 @@ def check_helper_config_file(config): @main.group("run") @click.pass_context def run_cmd(ctx): - """:param ctx: - """ + """:param ctx:""" pass + + @run_cmd.command("validate") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") -@click.option("-i", "--input", required=True, help="Path to input model" ) -@click.option("-o", "--output", required=True,help="Path to write the output JSON containing validation metrics") +@click.option("-i", "--input", required=True, help="Path to input model") +@click.option("-o", "--output", required=True, help="Path to write the output JSON containing validation metrics") @click.pass_context -def validate_cmd(ctx, path,input,output): +def validate_cmd(ctx, path, input, output): """Execute 'validate' entrypoint in fedn.yaml. :param ctx: @@ -75,12 +76,14 @@ def validate_cmd(ctx, path,input,output): if dispatcher.python_env_path: logger.info(f"Removing virtualenv {dispatcher.python_env_path}") shutil.rmtree(dispatcher.python_env_path) + + @run_cmd.command("train") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") -@click.option("-i", "--input", required=True, help="Path to input model parameters" ) -@click.option("-o", "--output", required=True,help="Path to write the updated model parameters ") +@click.option("-i", "--input", required=True, help="Path to input model parameters") +@click.option("-o", "--output", required=True, help="Path to write the updated model parameters ") @click.pass_context -def train_cmd(ctx, path,input,output): +def train_cmd(ctx, path, input, output): """Execute 'train' entrypoint in fedn.yaml. :param ctx: @@ -107,6 +110,8 @@ def train_cmd(ctx, path,input,output): if dispatcher.python_env_path: logger.info(f"Removing virtualenv {dispatcher.python_env_path}") shutil.rmtree(dispatcher.python_env_path) + + @run_cmd.command("startup") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") @click.pass_context @@ -138,6 +143,7 @@ def startup_cmd(ctx, path): logger.info(f"Removing virtualenv {dispatcher.python_env_path}") shutil.rmtree(dispatcher.python_env_path) + @run_cmd.command("build") @click.option("-p", "--path", required=True, help="Path to package directory containing fedn.yaml") @click.pass_context @@ -320,5 +326,7 @@ def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, click.echo(f"\nCombiner configuration loaded from file: {init}") click.echo("Values set in file override defaults and command line arguments...\n") + from fedn.network.combiner.combiner import Combiner + combiner = Combiner(config) combiner.run() From 3d91b5d02f5fd0700d92ec2aa6a004a219272541 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 16 Jul 2024 12:52:45 +0000 Subject: [PATCH 12/15] exclude run and combiner cmd --- .github/workflows/code-checks.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/code-checks.yaml b/.github/workflows/code-checks.yaml index 8c48a3015..7098b0732 100644 --- a/.github/workflows/code-checks.yaml +++ b/.github/workflows/code-checks.yaml @@ -26,6 +26,8 @@ jobs: --exclude-dir='flower-client' --exclude='tests.py' --exclude='controller_cmd.py' + --exclude='combiner_cmd.py' + --exclude='run_cmd.py' --exclude='README.rst' '^[ \t]+(import|from) ' -I . From 7060069b0d44cf125a70a65be863c584aa9b36b5 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 3 Sep 2024 15:07:17 +0000 Subject: [PATCH 13/15] allow local package --- fedn/common/config.py | 2 ++ fedn/network/api/interface.py | 30 +++++++++++++++++++++--------- fedn/network/clients/connect.py | 2 +- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/fedn/common/config.py b/fedn/common/config.py index 23d873ff7..517e57d94 100644 --- a/fedn/common/config.py +++ b/fedn/common/config.py @@ -15,6 +15,8 @@ FEDN_AUTH_REFRESH_TOKEN = os.environ.get("FEDN_AUTH_REFRESH_TOKEN", False) FEDN_CUSTOM_URL_PREFIX = os.environ.get("FEDN_CUSTOM_URL_PREFIX", "") + +FEDN_ALLOW_LOCAL_PACKAGE = os.environ.get("FEDN_ALLOW_LOCAL_PACKAGE", False) FEDN_PACKAGE_EXTRACT_DIR = os.environ.get("FEDN_PACKAGE_EXTRACT_DIR", "package") diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index f1815f39f..d81af66bf 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -7,7 +7,7 @@ from werkzeug.security import safe_join from werkzeug.utils import secure_filename -from fedn.common.config import get_controller_config, get_network_config +from fedn.common.config import FEDN_ALLOW_LOCAL_PACKAGE, get_controller_config, get_network_config from fedn.common.log_config import logger from fedn.network.combiner.interfaces import CombinerUnavailableError from fedn.network.state import ReducerState, ReducerStateToString @@ -512,7 +512,7 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por return jsonify(payload) - def add_client(self, client_id, preferred_combiner, remote_addr, name): + def add_client(self, client_id, preferred_combiner, remote_addr, name, package="remote"): """Add a client to the network. :param client_id: The client id to add. @@ -522,19 +522,31 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): :return: A json response with combiner assignment config. :rtype: :class:`flask.Response` """ - # Check if package has been set - package_object = self.statestore.get_compute_package() - if package_object is None: + if package == "remote": + package_object = self.statestore.get_compute_package() + if package_object is None: + return ( + jsonify( + { + "success": False, + "status": "retry", + "message": "No compute package found. Set package in controller.", + } + ), + 203, + ) + elif package == "local" and FEDN_ALLOW_LOCAL_PACKAGE is False: return ( jsonify( { "success": False, - "status": "retry", - "message": "No compute package found. Set package in controller.", + "message": "Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.", } ), - 203, + 400, ) + elif package == "local" and FEDN_ALLOW_LOCAL_PACKAGE: + pass # Assign client to combiner if preferred_combiner: @@ -572,7 +584,7 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name): "status": "assigned", "host": combiner.address, "fqdn": combiner.fqdn, - "package": "remote", # TODO: Make this configurable + "package": package, "ip": combiner.ip, "port": combiner.port, "helper_type": self.control.statestore.get_helper(), diff --git a/fedn/network/clients/connect.py b/fedn/network/clients/connect.py index 1a24a5c53..bd7262936 100644 --- a/fedn/network/clients/connect.py +++ b/fedn/network/clients/connect.py @@ -74,7 +74,7 @@ def assign(self): """ try: retval = None - payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner} + payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner, "package": self.package} retval = requests.post( self.connect_string + FEDN_CUSTOM_URL_PREFIX + "/add_client", json=payload, From bb1ac8947f9bf23cd3306d20cea0d786ae3d09e9 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 3 Sep 2024 15:51:41 +0000 Subject: [PATCH 14/15] fix --- fedn/network/api/interface.py | 16 +++++++++++----- fedn/network/api/server.py | 6 ++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index d81af66bf..3ffaadecf 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -512,7 +512,7 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por return jsonify(payload) - def add_client(self, client_id, preferred_combiner, remote_addr, name, package="remote"): + def add_client(self, client_id, preferred_combiner, remote_addr, name, package): """Add a client to the network. :param client_id: The client id to add. @@ -522,6 +522,10 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name, package=" :return: A json response with combiner assignment config. :rtype: :class:`flask.Response` """ + local_package = FEDN_ALLOW_LOCAL_PACKAGE + if local_package: + local_package = True + if package == "remote": package_object = self.statestore.get_compute_package() if package_object is None: @@ -535,7 +539,9 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name, package=" ), 203, ) - elif package == "local" and FEDN_ALLOW_LOCAL_PACKAGE is False: + helper_type = self.control.statestore.get_helper() + elif package == "local" and local_package is False: + print("Local package not allowed. Set FEDN_ALLOW_LOCAL_PACKAGE=True in controller config.") return ( jsonify( { @@ -545,8 +551,8 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name, package=" ), 400, ) - elif package == "local" and FEDN_ALLOW_LOCAL_PACKAGE: - pass + elif package == "local" and local_package is True: + helper_type = "" # Assign client to combiner if preferred_combiner: @@ -587,7 +593,7 @@ def add_client(self, client_id, preferred_combiner, remote_addr, name, package=" "package": package, "ip": combiner.ip, "port": combiner.port, - "helper_type": self.control.statestore.get_helper(), + "helper_type": helper_type, } return jsonify(payload) diff --git a/fedn/network/api/server.py b/fedn/network/api/server.py index d56c3ab0b..d2b3c82b8 100644 --- a/fedn/network/api/server.py +++ b/fedn/network/api/server.py @@ -591,9 +591,11 @@ def add_client(): remote_addr = request.remote_addr try: response = api.add_client(**json_data, remote_addr=remote_addr) - except TypeError: + except TypeError as e: + print(e) return jsonify({"success": False, "message": "Invalid data provided"}), 400 - except Exception: + except Exception as e: + print(e) return jsonify({"success": False, "message": "An unexpected error occurred"}), 500 return response From ad5584c0d1883c8ecf6fe59478728c4a854f05a7 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Wed, 4 Sep 2024 14:14:54 +0000 Subject: [PATCH 15/15] fix --- fedn/network/clients/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index 6ea154605..8508291ff 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -248,7 +248,11 @@ def _initialize_helper(self, combiner_config): :return: """ if "helper_type" in combiner_config.keys(): - self.helper = get_helper(combiner_config["helper_type"]) + if not combiner_config["helper_type"]: + # Default to numpyhelper + self.helper = get_helper("numpyhelper") + else: + self.helper = get_helper(combiner_config["helper_type"]) def _subscribe_to_combiner(self, config): """Listen to combiner message stream and start all processing threads.