Skip to content

Commit

Permalink
Feature/SK-751 | Clients should have unique IDs (#638)
Browse files Browse the repository at this point in the history
  • Loading branch information
niklastheman authored Jun 25, 2024
1 parent 27a0bf2 commit ab50645
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 199 deletions.
2 changes: 1 addition & 1 deletion fedn/genprot.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
echo "Generating protocol"
python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. fedn/network/grpc/*.proto
python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. network/grpc/*.proto
echo "DONE"
5 changes: 3 additions & 2 deletions fedn/network/api/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ def add_combiner(self, combiner_id, secure_grpc, address, remote_addr, fqdn, por

return jsonify(payload)

def add_client(self, client_id, preferred_combiner, remote_addr):
def add_client(self, client_id, preferred_combiner, remote_addr, name):
"""Add a client to the network.
:param client_id: The client id to add.
Expand Down Expand Up @@ -599,7 +599,8 @@ def add_client(self, client_id, preferred_combiner, remote_addr):
)

client_config = {
"name": client_id,
"client_id": client_id,
"name": name,
"combiner_preferred": preferred_combiner,
"combiner": combiner.name,
"ip": remote_addr,
Expand Down
4 changes: 2 additions & 2 deletions fedn/network/api/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ def add_client(self, client):
:type client: dict
:return: None
"""
if self.get_client(client["name"]):
if self.get_client(client["client_id"]):
return

logger.info("adding client {}".format(client["name"]))
logger.info("adding client {}".format(client["client_id"]))
self.statestore.set_client(client)

def get_client(self, name):
Expand Down
7 changes: 5 additions & 2 deletions fedn/network/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def __init__(self, config):
set_log_level_from_string(config.get("verbosity", "INFO"))
set_log_stream(config.get("logfile", None))

self.id = config["client_id"] or str(uuid.uuid4())

self.connector = ConnectorClient(
host=config["discover_host"],
port=config["discover_port"],
Expand All @@ -72,7 +74,7 @@ def __init__(self, config):
force_ssl=config["force_ssl"],
verify=config["verify"],
combiner=config["preferred_combiner"],
id=config["client_id"],
id=self.id,
)

# Validate client name
Expand Down Expand Up @@ -420,6 +422,7 @@ def _listen_to_task_stream(self):
r = fedn.ClientAvailableMessage()
r.sender.name = self.name
r.sender.role = fedn.WORKER
r.sender.client_id = self.id
# Add client to metadata
self._add_grpc_metadata("client", self.name)

Expand Down Expand Up @@ -762,7 +765,7 @@ def _send_heartbeat(self, update_frequency=2.0):
:rtype: None
"""
while True:
heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER))
heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER, client_id=self.id))
try:
self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata)
self._missed_heartbeat = 0
Expand Down
2 changes: 1 addition & 1 deletion fedn/network/clients/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def assign(self):
"""
try:
retval = None
payload = {"client_id": self.name, "preferred_combiner": self.preferred_combiner}
payload = {"name": self.name, "client_id": self.id, "preferred_combiner": self.preferred_combiner}
retval = requests.post(
self.connect_string + FEDN_CUSTOM_URL_PREFIX + "/add_client",
json=payload,
Expand Down
24 changes: 12 additions & 12 deletions fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __init__(self, config):
# Set the status to offline for previous clients.
previous_clients = self.statestore.clients.find({"combiner": config["name"]})
for client in previous_clients:
self.statestore.set_client({"name": client["name"], "status": "offline"})
self.statestore.set_client({"name": client["name"], "status": "offline", "client_id": client["client_id"]})

self.modelservice = ModelService()

Expand Down Expand Up @@ -244,7 +244,7 @@ def _send_request_type(self, request_type, session_id, model_id, config=None, cl

request.sender.name = self.id
request.sender.role = fedn.COMBINER
request.receiver.name = client
request.receiver.client_id = client
request.receiver.role = fedn.WORKER
# Set the request data, not used in validation
if request_type == fedn.StatusType.INFERENCE:
Expand Down Expand Up @@ -290,9 +290,9 @@ def __join_client(self, client):
:param client: the client to add
:type client: :class:`fedn.network.grpc.fedn_pb2.Client`
"""
if client.name not in self.clients.keys():
if client.client_id not in self.clients.keys():
# The status is set to offline by default, and will be updated once _list_active_clients is called.
self.clients[client.name] = {"lastseen": datetime.now(), "status": "offline"}
self.clients[client.client_id] = {"last_seen": datetime.now(), "status": "offline"}

def _subscribe_client_to_queue(self, client, queue_name):
"""Subscribe a client to the queue.
Expand All @@ -303,8 +303,8 @@ def _subscribe_client_to_queue(self, client, queue_name):
:type queue_name: str
"""
self.__join_client(client)
if queue_name not in self.clients[client.name].keys():
self.clients[client.name][queue_name] = queue.Queue()
if queue_name not in self.clients[client.client_id].keys():
self.clients[client.client_id][queue_name] = queue.Queue()

def __get_queue(self, client, queue_name):
"""Get the queue for a client.
Expand All @@ -319,7 +319,7 @@ def __get_queue(self, client, queue_name):
:raises KeyError: if the queue does not exist
"""
try:
return self.clients[client.name][queue_name]
return self.clients[client.client_id][queue_name]
except KeyError:
raise

Expand Down Expand Up @@ -354,7 +354,7 @@ def _list_active_clients(self, channel):
for client in self._list_subscribed_clients(channel):
status = self.clients[client]["status"]
now = datetime.now()
then = self.clients[client]["lastseen"]
then = self.clients[client]["last_seen"]
if (now - then) < timedelta(seconds=10):
clients["active_clients"].append(client)
# If client has changed status, update statestore
Expand Down Expand Up @@ -600,7 +600,7 @@ def SendHeartbeat(self, heartbeat: fedn.Heartbeat, context):
# Update the clients dict with the last seen timestamp.
client = heartbeat.sender
self.__join_client(client)
self.clients[client.name]["lastseen"] = datetime.now()
self.clients[client.client_id]["last_seen"] = datetime.now()

response = fedn.Response()
response.sender.name = heartbeat.sender.name
Expand Down Expand Up @@ -636,15 +636,15 @@ def TaskStream(self, response, context):
self._send_status(status)

# Set client status to online
self.clients[client.name]["status"] = "online"
self.statestore.set_client({"name": client.name, "status": "online"})
self.clients[client.client_id]["status"] = "online"
self.statestore.set_client({"name": client.name, "status": "online", "client_id": client.client_id, "last_seen": datetime.now()})

# Keep track of the time context has been active
start_time = time.time()
while context.is_active():
# Check if the context has been active for more than 10 seconds
if time.time() - start_time > 10:
self.clients[client.name]["lastseen"] = datetime.now()
self.clients[client.client_id]["last_seen"] = datetime.now()
# Reset the start time
start_time = time.time()
try:
Expand Down
1 change: 1 addition & 0 deletions fedn/network/grpc/fedn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ enum Role {
message Client {
Role role = 1;
string name = 2;
string client_id = 3;
}

message ReassignRequest {
Expand Down
Loading

0 comments on commit ab50645

Please sign in to comment.