Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/SK-946 | Graceful failing if new container is not present #733

Merged
merged 27 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b8fb56a
wip hooks
viktorvaladi Jun 13, 2024
7449c66
working
viktorvaladi Jul 26, 2024
d5b41a7
Merge remote-tracking branch 'origin/master' into feature/SK-946
viktorvaladi Jul 26, 2024
fdf89fb
rebuild proto
viktorvaladi Jul 26, 2024
1f58028
linter fix
viktorvaladi Jul 26, 2024
7315127
Merge remote-tracking branch 'origin/master' into feature/SK-946
viktorvaladi Aug 28, 2024
76b5949
add custom aggregator and metadata
viktorvaladi Sep 2, 2024
af36e6f
linter fix
viktorvaladi Sep 2, 2024
756e8de
add proper error logging
viktorvaladi Sep 2, 2024
baf11ec
Merge remote-tracking branch 'origin/master' into feature/SK-946
viktorvaladi Sep 2, 2024
dd9646c
Refactor hooks and updatehandler
viktorvaladi Oct 8, 2024
4e85b23
update handler and refactor
viktorvaladi Oct 8, 2024
ed468cf
fix test
viktorvaladi Oct 8, 2024
3b5a144
fix test
viktorvaladi Oct 8, 2024
cfac8a9
cleanup
viktorvaladi Oct 8, 2024
68160c4
cleanup
viktorvaladi Oct 8, 2024
3d82e45
fix linting
viktorvaladi Oct 8, 2024
8e587e4
add server-functions to test
viktorvaladi Oct 8, 2024
0e3a487
cleanup
viktorvaladi Oct 8, 2024
083a92c
update README
viktorvaladi Oct 15, 2024
49b5b9f
bug fix and code improvements
viktorvaladi Oct 16, 2024
556c7c3
remove logs
viktorvaladi Oct 28, 2024
f7f839c
Merge remote-tracking branch 'origin/master' into feature/SK-946
viktorvaladi Oct 29, 2024
bdf5054
generate grpc
viktorvaladi Oct 29, 2024
c1af873
grpc improvements etc
viktorvaladi Oct 30, 2024
2df7920
fail gracefully
viktorvaladi Nov 1, 2024
73711ce
merge master
viktorvaladi Nov 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions fedn/network/combiner/hooks/hook_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ class CombinerHookInterface:

def __init__(self):
"""Initialize CombinerHookInterface client."""
self.hook_service_host = os.getenv("HOOK_SERVICE_HOST", "hook:12081")
self.channel = grpc.insecure_channel(
self.hook_service_host,
options=[
("grpc.keepalive_time_ms", 30000), # 30 seconds ping interval
("grpc.keepalive_timeout_ms", 5000), # 5 seconds timeout for a response
("grpc.keepalive_permit_without_calls", 1), # allow keepalives even with no active calls
("grpc.enable_retries", 1), # automatic retries
("grpc.initial_reconnect_backoff_ms", 1000), # initial delay before retrying
("grpc.max_reconnect_backoff_ms", 5000), # maximum delay before retrying
],
)
self.stub = rpc.FunctionServiceStub(self.channel)
try:
self.hook_service_host = os.getenv("HOOK_SERVICE_HOST", "hook:12081")
self.channel = grpc.insecure_channel(
self.hook_service_host,
options=[
("grpc.keepalive_time_ms", 30000), # 30 seconds ping interval
("grpc.keepalive_timeout_ms", 5000), # 5 seconds timeout for a response
("grpc.keepalive_permit_without_calls", 1), # allow keepalives even with no active calls
("grpc.enable_retries", 1), # automatic retries
("grpc.initial_reconnect_backoff_ms", 1000), # initial delay before retrying
("grpc.max_reconnect_backoff_ms", 5000), # maximum delay before retrying
],
)
self.stub = rpc.FunctionServiceStub(self.channel)
except Exception as e:
logger.warning(f"Failed to initialize connection to hooks container with error {e}")

def provided_functions(self, server_functions: str):
"""Communicates to hook container and asks which functions are available.
Expand All @@ -39,10 +42,14 @@ def provided_functions(self, server_functions: str):
:return: dictionary specifying which functions are implemented.
:rtype: dict
"""
request = fedn.ProvidedFunctionsRequest(function_code=server_functions)
try:
request = fedn.ProvidedFunctionsRequest(function_code=server_functions)

response = self.stub.HandleProvidedFunctions(request)
return response.available_functions
response = self.stub.HandleProvidedFunctions(request)
return response.available_functions
except Exception as e:
logger.warning(f"Was not able to communicate to hooks container due to: {e}")
return {}

def client_settings(self, global_model) -> dict:
"""Communicates to hook container to get a client config.
Expand Down
8 changes: 4 additions & 4 deletions fedn/network/combiner/roundhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def push_round_config(self, round_config: RoundConfig) -> str:
raise
return round_config["_job_id"]

def _training_round(self, config, clients, provided_functions):
def _training_round(self, config: dict, clients: list, provided_functions: dict):
"""Send model update requests to clients and aggregate results.

:param config: The round config object (passed to the client).
Expand All @@ -141,7 +141,7 @@ def _training_round(self, config, clients, provided_functions):
session_id = config["session_id"]
model_id = config["model_id"]

if provided_functions["client_settings"]:
if provided_functions.get("client_settings", False):
global_model_bytes = self.modelservice.temp_model_storage.get(model_id)
client_settings = self.hook_interface.client_settings(global_model_bytes)
config["client_settings"] = client_settings
Expand Down Expand Up @@ -172,7 +172,7 @@ def _training_round(self, config, clients, provided_functions):
parameters = Parameters(dict_parameters)
else:
parameters = None
if provided_functions["aggregate"]:
if provided_functions.get("aggregate", False):
previous_model_bytes = self.modelservice.temp_model_storage.get(model_id)
model, data = self.hook_interface.aggregate(previous_model_bytes, self.update_handler, helper, delete_models=delete_models)
else:
Expand Down Expand Up @@ -326,7 +326,7 @@ def execute_training_round(self, config):

provided_functions = self.hook_interface.provided_functions(self.server_functions)

if provided_functions["client_selection"]:
if provided_functions.get("client_selection", False):
clients = self.hook_interface.client_selection(clients=self.server.get_active_trainers())
else:
clients = self._assign_round_clients(self.server.max_clients)
Expand Down
Loading