diff --git a/nebula/addons/__init__.py b/nebula/addons/__init__.py index e69de29b..48244e83 100755 --- a/nebula/addons/__init__.py +++ b/nebula/addons/__init__.py @@ -0,0 +1,25 @@ +""" +This package consists of several modules that handle different aspects of the network simulation: + +1. `env.py`: + - Manages the environment configuration and settings. + - It initializes the system environment, loads configuration parameters, and ensures correct operation of other components based on the simulation's settings. + +2. `functions.py`: + - Contains utility functions that are used across different parts of the simulation. + - It provides helper methods for common operations like data processing, mathematical calculations, and other reusable functionalities. + +3. `mobility.py`: + - Models and simulates the mobility of nodes within the network. + - It handles dynamic aspects of the simulation, such as node movement and position updates, based on mobility models and the simulation's configuration. + +4. `reporter.py`: + - Responsible for collecting and reporting data during the simulation. + - It tracks various system metrics, including node status and network performance, and periodically sends updates to a controller or dashboard for analysis and monitoring. + +5. `topologymanager.py`: + - Manages the topology of the network. + - It handles the creation and maintenance of the network's structure (e.g., nodes and their connections), including generating different types of topologies like ring, random, or fully connected based on simulation parameters. + +Each of these modules plays a critical role in simulating a network environment, enabling real-time tracking, topology management, mobility simulation, and efficient reporting of results. 
+""" diff --git a/nebula/addons/env.py b/nebula/addons/env.py index 1c51cc36..cf272a18 100755 --- a/nebula/addons/env.py +++ b/nebula/addons/env.py @@ -10,10 +10,29 @@ def check_version(): - # Check version of NEBULA (__version__ is defined in __init__.py) and compare with __version__ in https://raw.githubusercontent.com/CyberDataLab/nebula/main/nebula/__init__.py + """ + Checks the current version of NEBULA and compares it with the latest version available in the repository. + + This function retrieves the latest NEBULA version from the specified GitHub repository and compares + it with the version defined in the local NEBULA package. If the versions do not match, it logs a message + prompting the user to update to the latest version. + + Returns: + None + + Raises: + SystemExit: If the version check fails or an exception occurs during the request. + + Notes: + - The version information is expected to be defined in the `__init__.py` file of the NEBULA package + using the `__version__` variable. + - If the latest version is not the same as the local version, the program will exit after logging + the necessary information. + - An exception during the request will be logged, and the program will also exit. 
+ """ logging.info("Checking NEBULA version...") try: - r = requests.get("https://raw.githubusercontent.com/CyberDataLab/nebula/main/nebula/__init__.py") + r = requests.get("https://raw.githubusercontent.com/CyberDataLab/nebula/main/nebula/__init__.py", timeout=5) if r.status_code == 200: version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', r.text, re.MULTILINE).group(1) if version != __version__: @@ -26,12 +45,30 @@ def check_version(): sys.exit(0) else: logging.info(f"Your NEBULA version is {__version__} and it is the latest version.") - except Exception as e: - logging.exception(f"Error while checking NEBULA version: {e}") + except Exception: + logging.exception("Error while checking NEBULA version") sys.exit(0) def check_environment(): + """ + Logs the current environment configuration for the NEBULA platform. + + This function gathers and logs information about the operating system, hardware, Python version, + PyTorch version (if installed), CPU configuration, and GPU configuration (if applicable). It provides + insights into the system's capabilities and current usage statistics. + + Returns: + None + + Notes: + - The function logs the NEBULA platform version using the `__version__` variable. + - It checks the system's CPU load, available memory, and detailed GPU statistics using the `pynvml` + library if running on Windows or Linux. + - If any of the libraries required for gathering information (like `torch`, `psutil`, or `pynvml`) + are not installed, appropriate log messages will be generated indicating the absence of that information. + - If any unexpected error occurs during execution, it will be logged as an exception. 
+ """ logging.info(f"NEBULA Platform version: {__version__}") # check_version() @@ -46,7 +83,7 @@ def check_environment(): logging.info("PyTorch version: " + torch.__version__) except ImportError: logging.info("PyTorch is not installed properly") - except Exception: + except Exception: # noqa: S110 pass logging.info("======== CPU Configuration ========") @@ -62,7 +99,7 @@ def check_environment(): ) except ImportError: logging.info("No CPU information available") - except Exception: + except Exception: # noqa: S110 pass if sys.platform == "win32" or sys.platform == "linux": @@ -93,7 +130,7 @@ def check_environment(): logging.info(f"GPU{i} fan speed: {gpu_fan_speed}") except ImportError: logging.info("pynvml module not found, GPU information unavailable") - except Exception: + except Exception: # noqa: S110 pass else: logging.info("GPU information unavailable") diff --git a/nebula/addons/functions.py b/nebula/addons/functions.py index d0e43eb3..cb4ff097 100755 --- a/nebula/addons/functions.py +++ b/nebula/addons/functions.py @@ -2,14 +2,36 @@ def print_msg_box(msg, indent=1, width=None, title=None, logger_name=None): - """Print message-box with optional title.""" - if logger_name: - logger = logging.getLogger(logger_name) - else: - logger = logging.getLogger() + """ + Prints a formatted message box to the logger with an optional title. + + This function creates a visually appealing message box format for logging messages. + It allows for indentation, custom width, and inclusion of a title. If the message is + multiline, each line will be included in the box. + + Args: + msg (str): The message to be displayed inside the box. Must be a string. + indent (int, optional): The number of spaces to indent the message box. Default is 1. + width (int, optional): The width of the message box. If not provided, it will be calculated + based on the longest line of the message and the title (if provided). + title (str, optional): An optional title for the message box. 
Must be a string if provided. + logger_name (str, optional): The name of the logger to use. If not provided, the root logger + will be used. + + Raises: + TypeError: If `msg` or `title` is not a string. + + Returns: + None + + Notes: + - The message box is bordered with decorative characters to enhance visibility in the logs. + - If the `width` is not provided, it will automatically adjust to fit the content. + """ + logger = logging.getLogger(logger_name) if logger_name else logging.getLogger() if not isinstance(msg, str): - raise TypeError("msg parameter must be a string") + raise TypeError("msg parameter must be a string") # noqa: TRY003 lines = msg.split("\n") space = " " * indent @@ -20,7 +42,7 @@ def print_msg_box(msg, indent=1, width=None, title=None, logger_name=None): box = f"\n╔{'═' * (width + indent * 2)}╗\n" # upper_border if title: if not isinstance(title, str): - raise TypeError("title parameter must be a string") + raise TypeError("title parameter must be a string") # noqa: TRY003 box += f"║{space}{title:<{width}}{space}║\n" # title box += f"║{space}{'-' * len(title):<{width}}{space}║\n" # underscore box += "".join([f"║{space}{line:<{width}}{space}║\n" for line in lines]) diff --git a/nebula/addons/mobility.py b/nebula/addons/mobility.py index 89dbb2dd..de3d922e 100755 --- a/nebula/addons/mobility.py +++ b/nebula/addons/mobility.py @@ -13,6 +13,40 @@ class Mobility: def __init__(self, config, cm: "CommunicationsManager"): + """ + Initializes the mobility module with specified configuration and communication manager. + + This method sets up the mobility parameters required for the module, including grace time, + geographical change interval, mobility type, and other network conditions based on distance. + It also logs the initialized settings for the mobility system. + + Args: + config (Config): Configuration object containing mobility parameters and settings. 
+ cm (CommunicationsManager): An instance of the CommunicationsManager class used for handling + communication-related tasks within the mobility module. + + Attributes: + grace_time (float): Time allocated for mobility processes to stabilize. + period (float): Interval at which geographic changes are made. + mobility (bool): Flag indicating whether mobility is enabled. + mobility_type (str): Type of mobility strategy to be used (e.g., random, nearest). + radius_federation (float): Radius for federation in meters. + scheme_mobility (str): Scheme to be used for managing mobility. + round_frequency (int): Number of rounds after which mobility changes are applied. + max_distance_with_direct_connections (float): Maximum distance for direct connections in meters. + max_movement_random_strategy (float): Maximum movement distance for the random strategy in meters. + max_movement_nearest_strategy (float): Maximum movement distance for the nearest strategy in meters. + max_initiate_approximation (float): Maximum distance for initiating approximation calculations. + network_conditions (dict): A dictionary containing network conditions (bandwidth and delay) + based on distance. + current_network_conditions (dict): A dictionary mapping addresses to their current network conditions. + + Logs: + Mobility information upon initialization to provide insights into the current setup. + + Raises: + KeyError: If the expected mobility configuration keys are not found in the provided config. + """ logging.info("Starting mobility module...") self.config = config self.cm = cm @@ -43,12 +77,59 @@ def __init__(self, config, cm: "CommunicationsManager"): @property def round(self): + """ + Gets the current round number from the Communications Manager. + + This property retrieves the current round number that is being managed by the + CommunicationsManager instance associated with this module. 
It provides an + interface to access the ongoing round of the communication process without + directly exposing the underlying method in the CommunicationsManager. + + Returns: + int: The current round number managed by the CommunicationsManager. + """ return self.cm.get_round() async def start(self): - asyncio.create_task(self.run_mobility()) + """ + Initiates the mobility process by starting the associated task. + + This method creates and schedules an asynchronous task to run the + `run_mobility` coroutine, which handles the mobility operations + for the module. It allows the mobility operations to run concurrently + without blocking the execution of other tasks. + + Returns: + asyncio.Task: An asyncio Task object representing the scheduled + `run_mobility` operation. + """ + task = asyncio.create_task(self.run_mobility()) + return task async def run_mobility(self): + """ + Executes the mobility operations in a continuous loop. + + This coroutine manages the mobility behavior of the module. It first + checks whether mobility is enabled. If mobility is not enabled, the + function returns immediately. + + If mobility is enabled, the function will wait for the specified + grace time before entering an infinite loop where it performs the + following operations: + + 1. Changes the geographical location by calling the `change_geo_location` method. + 2. Adjusts connections based on the current distance by calling + the `change_connections_based_on_distance` method. + 3. Sleeps for a specified period (`self.period`) before repeating the operations. + + This allows for periodic updates to the module's geographical location + and network connections as per the defined mobility strategy. + + Raises: + Exception: May raise exceptions if `change_geo_location` or + `change_connections_based_on_distance` encounters errors. 
+ """ if not self.mobility: return await asyncio.sleep(self.grace_time) @@ -58,11 +139,31 @@ async def run_mobility(self): await asyncio.sleep(self.period) async def change_geo_location_random_strategy(self, latitude, longitude): + """ + Changes the geographical location of the entity using a random strategy. + + This coroutine modifies the current geographical location by randomly + selecting a new position within a specified radius around the given + latitude and longitude. The new location is determined using polar + coordinates, where a random distance (radius) and angle are calculated. + + Args: + latitude (float): The current latitude of the entity. + longitude (float): The current longitude of the entity. + + Raises: + Exception: May raise exceptions if the `set_geo_location` method encounters errors. + + Notes: + - The maximum movement distance is determined by `self.max_movement_random_strategy`. + - The calculated radius is converted from meters to degrees based on an approximate + conversion factor (1 degree is approximately 111 kilometers). + """ logging.info("📍 Changing geo location randomly") # radius_in_degrees = self.radius_federation / 111000 max_radius_in_degrees = self.max_movement_random_strategy / 111000 - radius = random.uniform(0, max_radius_in_degrees) - angle = random.uniform(0, 2 * math.pi) + radius = random.uniform(0, max_radius_in_degrees) # noqa: S311 + angle = random.uniform(0, 2 * math.pi) # noqa: S311 latitude += radius * math.cos(angle) longitude += radius * math.sin(angle) await self.set_geo_location(latitude, longitude) @@ -70,6 +171,30 @@ async def change_geo_location_random_strategy(self, latitude, longitude): async def change_geo_location_nearest_neighbor_strategy( self, distance, latitude, longitude, neighbor_latitude, neighbor_longitude ): + """ + Changes the geographical location of the entity towards the nearest neighbor. 
+ + This coroutine updates the current geographical location by calculating the direction + and distance to the nearest neighbor's coordinates. The movement towards the neighbor + is scaled based on the distance and the maximum movement allowed. + + Args: + distance (float): The distance to the nearest neighbor. + latitude (float): The current latitude of the entity. + longitude (float): The current longitude of the entity. + neighbor_latitude (float): The latitude of the nearest neighbor. + neighbor_longitude (float): The longitude of the nearest neighbor. + + Raises: + Exception: May raise exceptions if the `set_geo_location` method encounters errors. + + Notes: + - The movement is scaled based on the maximum allowed distance defined by + `self.max_movement_nearest_strategy`. + - The angle to the neighbor is calculated using the arctangent of the difference in + coordinates to determine the direction of movement. + - The conversion from meters to degrees is based on approximate geographical conversion factors. + """ logging.info("📍 Changing geo location towards the nearest neighbor") scale_factor = min(1, self.max_movement_nearest_strategy / distance) # Calcular el ángulo hacia el vecino @@ -88,6 +213,26 @@ async def change_geo_location_nearest_neighbor_strategy( await self.set_geo_location(new_latitude, new_longitude) async def set_geo_location(self, latitude, longitude): + """ + Sets the geographical location of the entity to the specified latitude and longitude. + + This coroutine updates the latitude and longitude values in the configuration. If the + provided coordinates are out of bounds (latitude must be between -90 and 90, and + longitude must be between -180 and 180), the previous location is retained. + + Args: + latitude (float): The new latitude to set. + longitude (float): The new longitude to set. + + Raises: + None: This function does not raise any exceptions but retains the previous coordinates + if the new ones are invalid. 
+ + Notes: + - The new location is logged for tracking purposes. + - The coordinates are expected to be in decimal degrees format. + """ + if latitude < -90 or latitude > 90 or longitude < -180 or longitude > 180: # If the new location is out of bounds, we keep the old location latitude = self.config.participant["mobility_args"]["latitude"] @@ -98,6 +243,27 @@ async def set_geo_location(self, latitude, longitude): logging.info(f"📍 New geo location: {latitude}, {longitude}") async def change_geo_location(self): + """ + Changes the geographical location of the entity based on the current mobility strategy. + + This coroutine checks the mobility type and decides whether to move towards the nearest neighbor + or change the geo location randomly. It uses the communications manager to obtain the current + connections and their distances. + + If the number of undirected connections is greater than directed connections, the method will + attempt to find the nearest neighbor and move towards it if the distance exceeds a certain threshold. + Otherwise, it will randomly change the geo location. + + Args: + None: This function does not take any arguments. + + Raises: + Exception: If the neighbor's location or distance cannot be found. + + Notes: + - The method expects the mobility type to be either "topology" or "both". + - It logs actions taken during the execution for tracking and debugging purposes. + """ if self.mobility and (self.mobility_type == "topology" or self.mobility_type == "both"): random.seed(time.time() + self.config.participant["device_args"]["idx"]) latitude = float(self.config.participant["mobility_args"]["latitude"]) @@ -137,6 +303,28 @@ async def change_geo_location(self): return async def change_connections_based_on_distance(self): + """ + Changes the connections of the entity based on the distance to neighboring nodes. 
+ + This coroutine evaluates the current connections in the topology and adjusts their status to + either direct or undirected based on their distance from the entity. If a neighboring node is + within a certain distance, it is marked as a direct connection; otherwise, it is marked as + undirected. + + Additionally, it updates the network conditions for each connection based on the distance, + ensuring that the current state is reflected accurately. + + Args: + None: This function does not take any arguments. + + Raises: + KeyError: If a connection address is not found during the process. + Exception: For any other errors that may occur while changing connections. + + Notes: + - The method expects the mobility type to be either "topology" or "both". + - It logs the distance evaluations and changes made for tracking and debugging purposes. + """ if self.mobility and (self.mobility_type == "topology" or self.mobility_type == "both"): try: # logging.info(f"📍 Checking connections based on distance") @@ -192,15 +380,36 @@ async def change_connections_based_on_distance(self): reordering="0%", ) self.current_network_conditions[addr] = conditions - except KeyError as e: + except KeyError: # Except when self.cm.connections[addr] is not found (disconnected during the process) - logging.exception(f"📍 Connection {addr} not found: {e}") + logging.exception(f"📍 Connection {addr} not found") return - except Exception as e: - logging.exception(f"📍 Error changing connections based on distance: {e}") + except Exception: + logging.exception("📍 Error changing connections based on distance") return async def change_connections(self): + """ + Changes the connections of the entity based on the specified mobility scheme. + + This coroutine evaluates the current and potential connections at specified intervals (based + on the round frequency) and makes adjustments according to the mobility scheme in use. 
If + the mobility type is appropriate and the current round is a multiple of the round frequency, + it will proceed to change connections. + + Args: + None: This function does not take any arguments. + + Raises: + None: This function does not raise exceptions, but it logs errors related to connection counts + and unsupported mobility schemes. + + Notes: + - The function currently supports a "random" mobility scheme, where it randomly selects + a current connection to disconnect and a potential connection to connect. + - If there are insufficient connections available, an error will be logged. + - All actions and decisions made by the function are logged for tracking purposes. + """ if ( self.mobility and (self.mobility_type == "topology" or self.mobility_type == "both") @@ -217,8 +426,8 @@ async def change_connections(self): return if self.scheme_mobility == "random": - random_neighbor = random.choice(current_connections) - random_potential_neighbor = random.choice(potential_connections) + random_neighbor = random.choice(current_connections) # noqa: S311 + random_potential_neighbor = random.choice(potential_connections) # noqa: S311 logging.info(f"📍 Selected node(s) to disconnect: {random_neighbor}") logging.info(f"📍 Selected node(s) to connect: {random_potential_neighbor}") await self.cm.disconnect(random_neighbor, mutual_disconnection=True) diff --git a/nebula/addons/reporter.py b/nebula/addons/reporter.py index 4c3c22be..95e40c61 100755 --- a/nebula/addons/reporter.py +++ b/nebula/addons/reporter.py @@ -15,6 +15,38 @@ class Reporter: def __init__(self, config, trainer, cm: "CommunicationsManager"): + """ + Initializes the reporter module for sending periodic updates to a dashboard controller. + + This initializer sets up the configuration parameters required to report metrics and statistics + about the network, participant, and trainer. 
It connects to a specified URL endpoint where + these metrics will be logged, and it initializes values used for tracking network traffic. + + Args: + config (dict): The configuration dictionary containing all setup parameters. + trainer (Trainer): The trainer object responsible for managing training sessions. + cm (CommunicationsManager): The communications manager handling network connections + and interactions. + + Attributes: + frequency (int): The frequency at which the reporter sends updates. + grace_time (int): Grace period before starting the reporting. + data_queue (Queue): An asyncio queue for managing data to be reported. + url (str): The endpoint URL for reporting updates. + counter (int): Counter for tracking the number of reports sent. + first_net_metrics (bool): Flag indicating if this is the first collection of network metrics. + prev_bytes_sent (int), prev_bytes_recv (int), prev_packets_sent (int), prev_packets_recv (int): + Metrics for tracking network data sent and received. + acc_bytes_sent (int), acc_bytes_recv (int), acc_packets_sent (int), acc_packets_recv (int): + Accumulators for network traffic. + + Raises: + None + + Notes: + - Logs the start of the reporter module. + - Initializes both current and accumulated metrics for traffic monitoring. + """ logging.info("Starting reporter module") self.config = config self.trainer = trainer @@ -37,18 +69,65 @@ def __init__(self, config, trainer, cm: "CommunicationsManager"): self.acc_packets_recv = 0 async def enqueue_data(self, name, value): + """ + Asynchronously enqueues data for reporting. + + This function adds a named data value pair to the data queue, which will later be processed + and sent to the designated reporting endpoint. The queue enables handling of reporting tasks + independently of other processes. + + Args: + name (str): The name or identifier for the data item. + value (Any): The value of the data item to be reported. 
+ + Returns: + None + + Notes: + - This function is asynchronous to allow non-blocking data enqueueing. + - Uses asyncio's queue to manage data, ensuring concurrency. + """ await self.data_queue.put((name, value)) async def start(self): + """ + Starts the reporter module after a grace period. + + This asynchronous function initiates the reporting process following a designated grace period. + It creates a background task to run the reporting loop, allowing data to be reported at defined intervals. + + Returns: + asyncio.Task: The task for the reporter loop, which handles the data reporting asynchronously. + + Notes: + - The grace period allows for a delay before the first reporting cycle. + - The reporter loop runs in the background, ensuring continuous data updates. + """ await asyncio.sleep(self.grace_time) - asyncio.create_task(self.run_reporter()) + task = asyncio.create_task(self.run_reporter()) + return task async def run_reporter(self): + """ + Runs the continuous reporting loop. + + This asynchronous function performs periodic reporting tasks such as reporting resource usage, + data queue contents, and, optionally, status updates to the controller. The loop runs indefinitely, + updating the counter with each cycle to track the frequency of specific tasks. + + Key Actions: + - Regularly reports the resource status. + - Reloads the configuration file every 50 cycles to reflect any updates. + + Notes: + - Status reporting to the controller is skipped when the configured controller is `nebula-test`. + - The reporting frequency is determined by the 'report_frequency' setting in the config file. 
+ """ while True: # NOTE: currently disabled - # if self.config.participant["scenario_args"]["controller"] != "nebula-test": - # await self.__report_status_to_controller() - # await self.__report_data_queue() + if self.config.participant["scenario_args"]["controller"] != "nebula-test": + await self.__report_status_to_controller() + await self.__report_data_queue() await self.__report_resources() self.counter += 1 if self.counter % 50 == 0: @@ -57,6 +136,28 @@ async def run_reporter(self): await asyncio.sleep(self.frequency) async def report_scenario_finished(self): + """ + Reports the scenario completion status to the controller. + + This asynchronous function notifies the scenario controller that the participant has finished + its tasks. It sends a POST request to the designated controller URL, including the participant's + ID in the JSON payload. + + URL Construction: + - The URL is dynamically built using the controller address and scenario name + from the configuration settings. + + Request Payload: + - idx (int): The unique identifier for this participant, read from the configuration and sent in the request data. + + Returns: + - bool: True if the report was successful (status 200), False otherwise. + + Error Handling: + - Logs an error if the response status is not 200, indicating that the controller + might be temporarily overloaded. + - Logs exceptions if the connection attempt to the controller fails. 
+ """ url = f"http://{self.config.participant['scenario_args']['controller']}/nebula/dashboard/{self.config.participant['scenario_args']['name']}/node/done" data = json.dumps({"idx": self.config.participant["device_args"]["idx"]}) headers = { @@ -64,60 +165,123 @@ async def report_scenario_finished(self): "User-Agent": f"NEBULA Participant {self.config.participant['device_args']['idx']}", } try: - async with aiohttp.ClientSession() as session: - async with session.post(url, data=data, headers=headers) as response: - if response.status != 200: - logging.error( - f"Error received from controller: {response.status} (probably there is overhead in the controller, trying again in the next round)" - ) - text = await response.text() - logging.debug(text) - else: - logging.info( - f"Participant {self.config.participant['device_args']['idx']} reported scenario finished" - ) - return True - except aiohttp.ClientError as e: - logging.exception(f"Error connecting to the controller at {url}: {e}") + async with aiohttp.ClientSession() as session, session.post(url, data=data, headers=headers) as response: + if response.status != 200: + logging.error( + f"Error received from controller: {response.status} (probably there is overhead in the controller, trying again in the next round)" + ) + text = await response.text() + logging.debug(text) + else: + logging.info( + f"Participant {self.config.participant['device_args']['idx']} reported scenario finished" + ) + return True + except aiohttp.ClientError: + logging.exception(f"Error connecting to the controller at {url}") return False async def __report_data_queue(self): + """ + Processes and reports queued data entries. + + This asynchronous function iterates over the data queue, retrieving each name-value pair + and sending it to the trainer's logging mechanism. Once logged, each item is marked as done. + + Functionality: + - Retrieves and logs all entries in the data queue until it is empty. 
+ - Assumes that `log_data` can handle asynchronous execution for optimal performance. + + Queue Items: + - name (str): The identifier for the data entry (e.g., metric name). + - value (Any): The value of the data entry to be logged. + + Returns: + - None + + Notes: + - Each processed item is marked as done in the queue. + """ + while not self.data_queue.empty(): name, value = await self.data_queue.get() await self.trainer.logger.log_data({name: value}) # Assuming log_data can be made async self.data_queue.task_done() async def __report_status_to_controller(self): + """ + Sends the participant's status to the controller. + + This asynchronous function transmits the current participant configuration to the controller's + URL endpoint. It handles both client and general exceptions to ensure robust communication + with the controller, retrying in case of errors. + + Functionality: + - Initiates a session to post participant data to the controller. + - Logs the response status, indicating issues when status is non-200. + - Retries after a short delay in case of connection errors or unhandled exceptions. + + Parameters: + - None (uses internal `self.config.participant` data to build the payload). + + Returns: + - None + + Notes: + - Uses the participant index to specify the User-Agent in headers. + - Delays for 5 seconds upon general exceptions to avoid rapid retry loops. 
+ """ try: - async with aiohttp.ClientSession() as session: - async with session.post( + async with ( + aiohttp.ClientSession() as session, + session.post( self.url, data=json.dumps(self.config.participant), headers={ "Content-Type": "application/json", "User-Agent": f"NEBULA Participant {self.config.participant['device_args']['idx']}", }, - ) as response: - if response.status != 200: - logging.error( - f"Error received from controller: {response.status} (probably there is overhead in the controller, trying again in the next round)" - ) - text = await response.text() - logging.debug(text) - except aiohttp.ClientError as e: - logging.exception(f"Error connecting to the controller at {self.url}: {e}") - except Exception as e: - logging.exception(f"Error sending status to controller, will try again in a few seconds: {e}") + ) as response, + ): + if response.status != 200: + logging.error( + f"Error received from controller: {response.status} (probably there is overhead in the controller, trying again in the next round)" + ) + text = await response.text() + logging.debug(text) + except aiohttp.ClientError: + logging.exception(f"Error connecting to the controller at {self.url}") + except Exception: + logging.exception("Error sending status to controller, will try again in a few seconds") await asyncio.sleep(5) async def __report_resources(self): + """ + Reports system resource usage metrics. + + This asynchronous function gathers and logs CPU usage data for the participant's device, + and attempts to retrieve the CPU temperature (Linux systems only). Additionally, it measures + CPU usage specifically for the current process. + + Functionality: + - Gathers total CPU usage (percentage) and attempts to retrieve CPU temperature. + - Uses `psutil` for non-blocking access to system data on Linux. + - Records CPU usage of the current process for finer monitoring. + + Parameters: + - None + + Notes: + - On non-Linux platforms, CPU temperature will default to 0. 
+ - Uses `asyncio.to_thread` to call CPU and sensor readings without blocking the event loop. + """ cpu_percent = psutil.cpu_percent() cpu_temp = 0 try: if sys.platform == "linux": sensors = await asyncio.to_thread(psutil.sensors_temperatures) cpu_temp = sensors.get("coretemp")[0].current if sensors.get("coretemp") else 0 - except Exception: + except Exception: # noqa: S110 pass pid = os.getpid() @@ -211,5 +375,5 @@ async def __report_resources(self): f"GPU/GPU{i} fan speed": gpu_fan_speed, } self.trainer.logger.log_data(gpu_info) - except Exception: + except Exception: # noqa: S110 pass diff --git a/nebula/addons/topologymanager.py b/nebula/addons/topologymanager.py index 50cf3322..5100d4a4 100755 --- a/nebula/addons/topologymanager.py +++ b/nebula/addons/topologymanager.py @@ -3,15 +3,14 @@ import matplotlib import matplotlib.pyplot as plt - -matplotlib.use("Agg") -plt.switch_backend("Agg") - import networkx as nx import numpy as np from nebula.core.role import Role +matplotlib.use("Agg") +plt.switch_backend("Agg") + class TopologyManager: def __init__( @@ -22,6 +21,32 @@ def __init__( undirected_neighbor_num=5, topology=None, ): + """ + Initializes a network topology for the scenario. + + This constructor sets up a network topology with a given number of nodes, neighbors, and other parameters. + It includes options to specify whether the topology should be symmetric and the number of undirected neighbors for each node. + It also checks for constraints on the number of neighbors and the structure of the network. + + Parameters: + - scenario_name (str, optional): Name of the scenario. + - n_nodes (int): Number of nodes in the network (default 5). + - b_symmetric (bool): Whether the topology is symmetric (default True). + - undirected_neighbor_num (int): Number of undirected neighbors for each node (default 5). + - topology (list, optional): Predefined topology, a list of nodes and connections (default None). 
+ + Raises: + - ValueError: If `undirected_neighbor_num` is less than 2. + + Attributes: + - scenario_name (str): Name of the scenario. + - n_nodes (int): Number of nodes in the network. + - b_symmetric (bool): Whether the topology is symmetric. + - undirected_neighbor_num (int): Number of undirected neighbors. + - topology (list): Topology of the network. + - nodes (np.ndarray): Array of nodes initialized with zeroes. + - b_fully_connected (bool): Flag indicating if the topology is fully connected. + """ self.scenario_name = scenario_name if topology is None: topology = [] @@ -34,12 +59,25 @@ def __init__( self.b_fully_connected = False if self.undirected_neighbor_num < 2: - raise ValueError("undirected_neighbor_num must be greater than 2") + raise ValueError("undirected_neighbor_num must be greater than 2") # noqa: TRY003 # If the number of neighbors is larger than the number of nodes, then the topology is fully connected if self.undirected_neighbor_num >= self.n_nodes - 1 and self.b_symmetric: self.b_fully_connected = True def __getstate__(self): + """ + Serializes the object state for saving. + + This method defines which attributes of the class should be serialized when the object is pickled (saved to a file). + It returns a dictionary containing the attributes that need to be preserved. + + Returns: + dict: A dictionary containing the relevant attributes of the object for serialization. + - scenario_name (str): Name of the scenario. + - n_nodes (int): Number of nodes in the network. + - topology (list): Topology of the network. + - nodes (np.ndarray): Array of nodes in the network. + """ # Return the attributes of the class that should be serialized return { "scenario_name": self.scenario_name, @@ -49,13 +87,87 @@ def __getstate__(self): } def __setstate__(self, state): + """ + Restores the object state from the serialized data. 
+ + This method is called during deserialization (unpickling) to restore the object's state + by setting the attributes using the provided state dictionary. + + Args: + state (dict): A dictionary containing the serialized data, including: + - scenario_name (str): Name of the scenario. + - n_nodes (int): Number of nodes in the network. + - topology (list): Topology of the network. + - nodes (np.ndarray): Array of nodes in the network. + """ # Set the attributes of the class from the serialized state self.scenario_name = state["scenario_name"] self.n_nodes = state["n_nodes"] self.topology = state["topology"] self.nodes = state["nodes"] + def get_node_color(self, role): + """ + Returns the color associated with a given role. + + The method maps roles to specific colors for visualization or representation purposes. + + Args: + role (Role): The role for which the color is to be determined. + + Returns: + str: The color associated with the given role. Defaults to "red" if the role is not recognized. + """ + role_colors = { + Role.AGGREGATOR: "orange", + Role.SERVER: "green", + Role.TRAINER: "#6182bd", + Role.PROXY: "purple", + } + return role_colors.get(role, "red") + + def add_legend(self, roles): + """ + Adds a legend to the plot for different roles, associating each role with a color. + + The method iterates through the provided roles and assigns the corresponding color to each one. + The colors are predefined in the legend_map, which associates each role with a specific color. + + Args: + roles (iterable): A collection of roles for which the legend should be displayed. + + Returns: + None: The function modifies the plot directly by adding the legend. 
+ """ + legend_map = { + Role.AGGREGATOR: "orange", + Role.SERVER: "green", + Role.TRAINER: "#6182bd", + Role.PROXY: "purple", + Role.IDLE: "red", + } + for role, color in legend_map.items(): + if role in roles: + plt.scatter([], [], c=color, label=role) + plt.legend() + def draw_graph(self, plot=False, path=None): + """ + Draws the network graph based on the topology and saves it as an image. + + This method generates a visualization of the network's topology using NetworkX and Matplotlib. + It assigns colors to the nodes based on their role, draws the network's nodes and edges, + adds labels to the nodes, and includes a legend for clarity. + The resulting plot is saved as an image file. + + Args: + plot (bool, optional): Whether to display the plot. Default is False. + path (str, optional): The file path where the image will be saved. If None, the image is saved + to a default location based on the scenario name. + + Returns: + None: The method saves the plot as an image at the specified path. 
+ """ g = nx.from_numpy_array(self.topology) # pos = nx.layout.spectral_layout(g) # pos = nx.spring_layout(g, pos=pos, iterations=50) @@ -68,42 +180,24 @@ def draw_graph(self, plot=False, path=None): # ax.axis('off') labels = {} color_map = [] - server = False for k in range(self.n_nodes): - if str(self.nodes[k][2]) == Role.AGGREGATOR: - color_map.append("orange") - elif str(self.nodes[k][2]) == Role.SERVER: - server = True - color_map.append("green") - elif str(self.nodes[k][2]) == Role.TRAINER: - color_map.append("#6182bd") - elif str(self.nodes[k][2]) == Role.PROXY: - color_map.append("purple") - else: - color_map.append("red") + role = str(self.nodes[k][2]) + color_map.append(self.get_node_color(role)) labels[k] = f"P{k}\n" + str(self.nodes[k][0]) + ":" + str(self.nodes[k][1]) + # nx.draw_networkx_nodes(g, pos_shadow, node_color='k', alpha=0.5) nx.draw_networkx_nodes(g, pos, node_color=color_map, linewidths=2) nx.draw_networkx_labels(g, pos, labels, font_size=10, font_weight="bold") nx.draw_networkx_edges(g, pos, width=2) # plt.margins(0.0) - roles = [str(i[2]) for i in self.nodes] - if Role.AGGREGATOR in roles: - plt.scatter([], [], c="orange", label="Aggregator") - if Role.SERVER in roles: - plt.scatter([], [], c="green", label="Server") - if Role.TRAINER in roles: - plt.scatter([], [], c="#6182bd", label="Trainer") - if Role.PROXY in roles: - plt.scatter([], [], c="purple", label="Proxy") - if Role.IDLE in roles: - plt.scatter([], [], c="red", label="Idle") + + self.add_legend([str(node[2]) for node in self.nodes]) + # plt.scatter([], [], c="green", label='Central Server') # plt.scatter([], [], c="orange", label='Aggregator') # plt.scatter([], [], c="#6182bd", label='Trainer') # plt.scatter([], [], c="purple", label='Proxy') # plt.scatter([], [], c="red", label='Idle') - plt.legend() # import sys # if path is None: # if not os.path.exists(f"{sys.path[0]}/logs/{self.scenario_name}"): @@ -115,6 +209,19 @@ def draw_graph(self, plot=False, path=None): 
plt.close() def generate_topology(self): + """ + Generates the network topology based on the configured settings. + + This method generates the network topology for the given scenario. It checks whether the topology + should be fully connected, symmetric, or asymmetric and then generates the network accordingly. + + - If the topology is fully connected, all nodes will be directly connected to each other. + - If the topology is symmetric, neighbors will be chosen symmetrically between nodes. + - If the topology is asymmetric, neighbors will be picked randomly without symmetry. + + Returns: + None: The method modifies the internal topology of the network. + """ if self.b_fully_connected: self.__fully_connected() return @@ -125,18 +232,73 @@ def generate_topology(self): self.__randomly_pick_neighbors_asymmetric() def generate_server_topology(self): + """ + Generates a server topology where the first node (usually the server) is connected to all other nodes. + + This method initializes a topology matrix where the first node (typically the server) is connected to + every other node in the network. The first row and the first column of the matrix are set to 1, representing + connections to and from the server. The diagonal is set to 0 to indicate that no node is connected to itself. + + Returns: + None: The method modifies the internal `self.topology` matrix. + """ self.topology = np.zeros((self.n_nodes, self.n_nodes), dtype=np.float32) self.topology[0, :] = 1 self.topology[:, 0] = 1 np.fill_diagonal(self.topology, 0) def generate_ring_topology(self, increase_convergence=False): + """ + Generates a ring topology for the network. + + In a ring topology, each node is connected to two other nodes in a circular fashion, forming a closed loop. + This method uses a private method to generate the topology, with an optional parameter to control whether + the convergence speed of the network should be increased. 
+ + Args: + increase_convergence (bool): Optional flag to increase the convergence speed in the topology. + Defaults to False. + + Returns: + None: The method modifies the internal `self.topology` matrix to reflect the generated ring topology. + """ self.__ring_topology(increase_convergence=increase_convergence) def generate_custom_topology(self, topology): + """ + Sets the network topology to a custom topology provided by the user. + + This method allows for the manual configuration of the network topology by directly assigning + the `topology` argument to the internal `self.topology` attribute. + + Args: + topology (numpy.ndarray): A 2D array representing the custom network topology. + The array should have dimensions (n_nodes, n_nodes) where `n_nodes` + is the number of nodes in the network. + + Returns: + None: The method modifies the internal `self.topology` to the provided custom topology. + """ self.topology = topology def get_matrix_adjacency_from_neighbors(self, neighbors): + """ + Generates an adjacency matrix from a list of neighbors. + + This method constructs an adjacency matrix for the network based on the provided list of neighbors + for each node. A 1 in the matrix at position (i, j) indicates that node i is a neighbor of node j, + while a 0 indicates no connection. + + Args: + neighbors (list of lists): A list of lists where each sublist contains the indices of the neighbors + for the corresponding node. The length of the outer list should be equal + to the number of nodes in the network (`self.n_nodes`). + + Returns: + numpy.ndarray: A 2D adjacency matrix of shape (n_nodes, n_nodes), where n_nodes is the total number + of nodes in the network. The matrix contains 1s where there is a connection and 0s + where there is no connection. 
+ """ matrix_adjacency = np.zeros((self.n_nodes, self.n_nodes), dtype=np.float32) for i in range(self.n_nodes): for j in range(self.n_nodes): @@ -145,40 +307,118 @@ def get_matrix_adjacency_from_neighbors(self, neighbors): return matrix_adjacency def get_topology(self): + """ + Returns the network topology. + + This method retrieves the current topology of the network. The behavior of the method depends on whether + the network is symmetric or asymmetric. For both cases in this implementation, it simply returns the + `self.topology`. + + Returns: + numpy.ndarray: The current topology of the network as a 2D numpy array. The topology represents the + connectivity between nodes, where a value of 1 indicates a connection and 0 indicates + no connection between the nodes. + """ if self.b_symmetric: return self.topology else: return self.topology def get_nodes(self): + """ + Returns the nodes in the network. + + This method retrieves the current list of nodes in the network. Each node is represented by an array of + three values (such as coordinates or identifiers) in the `self.nodes` attribute. + + Returns: + numpy.ndarray: A 2D numpy array representing the nodes in the network. Each row represents a node, + and the columns may represent different properties (e.g., position, identifier, etc.). + """ return self.nodes @staticmethod def get_coordinates(random_geo=True): + """ + Generates random geographical coordinates within predefined bounds for either Spain or Switzerland. + + The method returns a random geographical coordinate (latitude, longitude). The bounds for random coordinates are + defined for two regions: Spain and Switzerland. The region is chosen randomly, and then the latitude and longitude + are selected within the corresponding bounds. + + Parameters: + random_geo (bool): If set to True, the method generates random coordinates within the predefined bounds + for Spain or Switzerland. 
If set to False, this method could be modified to return fixed + coordinates. + + Returns: + tuple: A tuple containing the latitude and longitude of the generated point. + """ if random_geo: - if random.randint(0, 1) == 0: - # España - bounds = (36.0, 43.0, -9.0, 3.3) # min_lat, max_lat, min_lon, max_lon - else: - # Suiza - bounds = (45.8, 47.8, 5.9, 10.5) # min_lat, max_lat, min_lon, max_lon + # España min_lat, max_lat, min_lon, max_lon Suiza min_lat, max_lat, min_lon, max_lon + bounds = (36.0, 43.0, -9.0, 3.3) if random.randint(0, 1) == 0 else (45.8, 47.8, 5.9, 10.5) # noqa: S311 min_latitude, max_latitude, min_longitude, max_longitude = bounds - latitude = random.uniform(min_latitude, max_latitude) - longitude = random.uniform(min_longitude, max_longitude) + latitude = random.uniform(min_latitude, max_latitude) # noqa: S311 + longitude = random.uniform(min_longitude, max_longitude) # noqa: S311 return latitude, longitude def add_nodes(self, nodes): + """ + Sets the nodes of the topology. + + This method updates the `nodes` attribute with the given list or array of nodes. + + Parameters: + nodes (array-like): The new set of nodes to be assigned to the topology. It should be in a format compatible + with the existing `nodes` structure, typically an array or list. + + Returns: + None + """ self.nodes = nodes def update_nodes(self, config_participants): + """ + Updates the nodes of the topology based on the provided configuration. + + This method assigns a new set of nodes to the `nodes` attribute, typically based on the configuration of the participants. + + Parameters: + config_participants (array-like): A new set of nodes, usually derived from the participants' configuration, to be assigned to the topology. + + Returns: + None + """ self.nodes = config_participants def get_node(self, node_idx): + """ + Retrieves the node information based on the given index. + + This method returns the details of a specific node from the `nodes` attribute using its index. 
+ + Parameters: + node_idx (int): The index of the node to retrieve from the `nodes` list. + + Returns: + numpy.ndarray: A tuple or array containing the node's information at the given index. + """ return self.nodes[node_idx] def get_neighbors_string(self, node_idx): + """ + Retrieves the neighbors of a given node as a string representation. + + This method checks the `topology` attribute to find the neighbors of the node at the specified index (`node_idx`). It then returns a string that lists the coordinates of each neighbor. + + Parameters: + node_idx (int): The index of the node for which neighbors are to be retrieved. + + Returns: + str: A space-separated string of neighbors' coordinates in the format "latitude:longitude". + """ # logging.info(f"Topology: {self.topology}") # logging.info(f"Nodes: {self.nodes}") neighbors_data = [] @@ -192,6 +432,17 @@ def get_neighbors_string(self, node_idx): return neighbors_data_string def __ring_topology(self, increase_convergence=False): + """ + Generates a ring topology for the nodes. + + This method creates a ring topology for the network using the Watts-Strogatz model. Each node is connected to two neighbors, forming a ring. Optionally, additional random connections are added to increase convergence, making the network more connected. + + Parameters: + increase_convergence (bool): If set to True, random connections will be added between nodes to increase the network's connectivity. + + Returns: + None: The `topology` attribute of the class is updated with the generated ring topology. 
+ """ topology_ring = np.array( nx.to_numpy_matrix(nx.watts_strogatz_graph(self.n_nodes, 2, 0)), dtype=np.float32, @@ -201,15 +452,25 @@ def __ring_topology(self, increase_convergence=False): # Create random links between nodes in topology_ring for i in range(self.n_nodes): for j in range(self.n_nodes): - if topology_ring[i][j] == 0: - if random.random() < 0.1: - topology_ring[i][j] = 1 - topology_ring[j][i] = 1 + if topology_ring[i][j] == 0 and random.random() < 0.1: # noqa: S311 + topology_ring[i][j] = 1 + topology_ring[j][i] = 1 np.fill_diagonal(topology_ring, 0) self.topology = topology_ring def __randomly_pick_neighbors_symmetric(self): + """ + Generates a symmetric random topology by combining a ring topology with additional random links. + + This method first creates a ring topology using the Watts-Strogatz model, where each node is connected to two neighbors. Then, it randomly adds links to each node (up to the specified number of neighbors) to form a symmetric topology. The result is a topology where each node has a fixed number of undirected neighbors, and the connections are symmetric between nodes. + + Parameters: + None + + Returns: + None: The `topology` attribute of the class is updated with the generated symmetric topology. + """ # First generate a ring topology topology_ring = np.array( nx.to_numpy_matrix(nx.watts_strogatz_graph(self.n_nodes, 2, 0)), @@ -238,6 +499,17 @@ def __randomly_pick_neighbors_symmetric(self): self.topology = topology_symmetric def __randomly_pick_neighbors_asymmetric(self): + """ + Generates an asymmetric random topology by combining a ring topology with additional random links and random deletions. + + This method first creates a ring topology using the Watts-Strogatz model, where each node is connected to two neighbors. Then, it randomly adds links to each node to create a topology with a specified number of undirected neighbors. After that, it randomly deletes some of the links to introduce asymmetry. 
The result is a topology where nodes have a varying number of directed and undirected links, and the structure is asymmetric. + + Parameters: + None + + Returns: + None: The `topology` attribute of the class is updated with the generated asymmetric topology. + """ # randomly add some links for each node (symmetric) k = self.undirected_neighbor_num topology_random_link = np.array( @@ -284,6 +556,17 @@ def __randomly_pick_neighbors_asymmetric(self): self.topology = topology_ring def __fully_connected(self): + """ + Generates a fully connected topology where each node is connected to every other node. + + This method creates a fully connected network by generating a Watts-Strogatz graph with the number of nodes set to `n_nodes` and the number of neighbors set to `n_nodes - 1`. The resulting graph is then converted into a numpy matrix and all missing links (i.e., non-ones in the adjacency matrix) are set to 1 to ensure complete connectivity. The diagonal elements are filled with zeros to avoid self-loops. + + Parameters: + None + + Returns: + None: The `topology` attribute of the class is updated with the generated fully connected topology. + """ topology_fully_connected = np.array( nx.to_numpy_matrix(nx.watts_strogatz_graph(self.n_nodes, self.n_nodes - 1, 0)), dtype=np.float32,