diff --git a/tests/parallel/test_parallel.py b/tests/parallel/test_parallel.py index 22c4b40823..d16603f719 100644 --- a/tests/parallel/test_parallel.py +++ b/tests/parallel/test_parallel.py @@ -1,96 +1,120 @@ -""" -The purpose of this test module is to run the test in a suite parallely - -Syntax: - - tests: - -test: - name: Parallel run - module: test_parallel.py - parallel: - - test: - ... - - test: - ... - desc: Running tests parallely - -Requirement parameters - ceph_nodes: The list of node participating in the RHCS environment. - config: The test configuration - parallel: Consist of test which needs to be executed parallely - -Entry Point: - def run(**kwargs): -""" - import importlib +import logging import os +from multiprocessing import Manager from time import sleep from ceph.parallel import parallel from utility.log import Log +from utility.utils import magna_url log = Log(__name__) +LOG_FORMAT = "%(asctime)s (%(name)s) - %(module)s:%(lineno)d - %(funcName)s - [%(levelname)s] - %(message)s" def run(**kwargs): - results = {} - parallel_tests = kwargs["parallel"] - - with parallel() as p: - for test in parallel_tests: - p.spawn(execute, test, kwargs, results) - sleep(1) - - test_rc = 0 - for key, value in results.items(): - log.info(f"{key} test result is {'PASS' if value == 0 else 'FAILED'}") - if value != 0: - test_rc = value - - return test_rc - - -def execute(test, args, results: dict): + # Use a Manager dictionary for shared state across processes + with Manager() as manager: + results = manager.dict() + parallel_tests = kwargs["parallel"] + max_time = kwargs.get("config", {}).get("max_time", None) + wait_till_complete = kwargs.get("config", {}).get("wait_till_complete", True) + cancel_pending = kwargs.get("config", {}).get("cancel_pending", False) + + with parallel( + thread_pool=False, + timeout=max_time, + shutdown_wait=wait_till_complete, + shutdown_cancel_pending=cancel_pending, + ) as p: + for test in parallel_tests: + p.spawn(execute, test, kwargs, results) + sleep(1) + + # Convert results to a regular dictionary to inspect + results = dict(results) + log.info(f"Final test results: {results}") + + test_rc = 0 + for key, value in results.items(): + log.info(f"{key} test result is {'PASS' if value == 0 else 'FAILED'}") + if value != 0: + test_rc = value + + return test_rc + + +def execute(test, args, results): """ - Executes the test under parallel in module named 'Parallel run' parallely. - - It involves the following steps - - Importing of test module based on test - - Running the test module + Executes the test under parallel in module named 'Parallel run' parallely. Args: test: The test module which needs to be executed - cluster: Ceph cluster participating in the test. - config: The key/value pairs passed by the tester. - results: results in dictionary - - Returns: - int: non-zero on failure, zero on pass + args: Arguments passed to the test + results: Shared dictionary to store test results """ - test = test.get("test") - if "clusters" in test: - log.info("Its a multisite setup") - cluster = test["clusters"].get( - "ceph-rgw1", test["clusters"][list(test["clusters"].keys())[0]] - ) - config = cluster["config"] - else: - config = test.get("config", dict()) - config.update(args["config"]) - file_name = test.get("module") - mod_file_name = os.path.splitext(file_name)[0] - test_mod = importlib.import_module(mod_file_name) - testcase_name = test.get("name", "Test Case Unknown") - - rc = test_mod.run( - ceph_cluster=args["ceph_cluster"], - ceph_nodes=args["ceph_nodes"], - config=config, - test_data=args["test_data"], - ceph_cluster_dict=args["ceph_cluster_dict"], - clients=args["clients"], + test_name = test.get("name", "unknown_test") + module_name = os.path.splitext(test.get("module"))[0] + run_dir = args["run_config"]["log_dir"] + url_base = ( + magna_url + run_dir.split("/")[-1] + if "/ceph/cephci-jenkins" in run_dir + else run_dir + ) + log_url = f"{url_base}/{test_name}.log" + log_file = os.path.join(run_dir, f"{test_name}.log") + log.info(f"Log File location for test {test_name} is {log_url}") + # Configure logger for this test + test_logger = Log(module_name) + for handler in logging.getLogger().handlers[:]: + logging.getLogger().removeHandler(handler) + logging.basicConfig( + handlers=[logging.FileHandler(log_file)], level=logging.INFO, format=LOG_FORMAT ) + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + console_handler.setFormatter(logging.Formatter(test_logger.log_format)) + if not any( + isinstance(h, logging.StreamHandler) for h in test_logger.logger.handlers + ): + test_logger.logger.addHandler(console_handler) + + test_logger.info(f"Starting test: {test_name}") + try: + # Import and execute the test module + file_name = test.get("module") + mod_file_name = os.path.splitext(file_name)[0] + test_mod = importlib.import_module(mod_file_name) + run_config = { + "log_dir": run_dir, + "run_id": args["run_config"]["run_id"], + } + rc = test_mod.run( + ceph_cluster=args.get("ceph_cluster"), + ceph_nodes=args.get("ceph_nodes"), + config=args.get("config"), + parallel=args.get("parallel"), + test_data=args.get("test_data"), + ceph_cluster_dict=args.get("ceph_cluster_dict"), + clients=args.get("clients"), + run_config=run_config, + ) - file_string = f"{testcase_name}" - results.update({file_string: rc}) + results[test_name] = rc + test_logger.info( + f"Test {test_name} completed with result: {'PASS' if rc == 0 else 'FAILED'}" + ) + except KeyError as e: + log.error(f"Missing required argument: {e}") + raise + except Exception as e: + log.error(f"An error occurred while running the test module: {e}") + raise + except Exception as e: + test_logger.error(f"Test {test_name} failed with error: {e}") + results[test_name] = 1 + finally: + # Reset root logger configuration to avoid conflicts + for handler in logging.getLogger().handlers[:]: + logging.getLogger().removeHandler(handler) diff --git a/utility/log.py b/utility/log.py index 902c304ce2..a750df3096 100644 --- a/utility/log.py +++ b/utility/log.py @@ -1,66 +1,55 @@ -# -*- coding: utf-8 -*- -""" -Implements CephCI logging interface. - -In this module, we implement a singleton LOG object that can be used by all components -of CephCI. It supports the below logging methods - - - error - - warning - - info - - debug - -Along with the above, it provides support for pushing events to - - - local file - - logstash server - -Initial Log format will be 'datetime - level - message' -later updating log format with 'datetime - level -filename:line_number - message' -""" -import inspect import logging import logging.handlers import os -from copy import deepcopy -from typing import Any, Dict +from typing import Dict from .config import TestMetaData -LOG_FORMAT = "%(asctime)s (%(name)s) [%(levelname)s] - %(message)s" +LOG_FORMAT = "%(asctime)s (%(name)s) - %(module)s:%(lineno)d - %(funcName)s - [%(levelname)s] - %(message)s" + magna_server = "http://magna002.ceph.redhat.com" magna_url = f"{magna_server}/cephci-jenkins/" -class LoggerInitializationException: +class LoggerInitializationException(Exception): + """Exception raised for logger initialization errors.""" + pass -class Log: +class Log(logging.Logger): """CephCI Logger object to help streamline logging.""" def __init__(self, name=None) -> None: - """Initializes the logging mechanism based on the inputs provided.""" - self._logger = logging.getLogger("cephci") + """ + Initializes the logging mechanism. + Args: + name (str): Logger name (module name or other identifier). + """ + super().__init__(name) logging.basicConfig(format=LOG_FORMAT, level=logging.INFO) + # self._logger = logging.getLogger(name) + self._logger = logging.getLogger("cephci") + # Set logger name if name: - self._logger.name = f"cephci.{name}" + self.name = f"cephci.{name}" - self._log_level = self._logger.getEffectiveLevel() + # Additional attributes + self._log_level = self.getEffectiveLevel() self._log_dir = None - self._log_errors = [] self.log_format = LOG_FORMAT + self._log_errors = [] + self.info = self._logger.info + self.debug = self._logger.debug + self.warning = self._logger.warning + self.error = self._logger.error + self.exception = self._logger.exception @property def rp_logger(self): return self.config.get("rp_logger") - @property - def logger(self) -> logging.Logger: - """Return the logger.""" - return self._logger - @property def log_dir(self) -> str: """Return the absolute path to the logging folder.""" @@ -71,6 +60,11 @@ def log_level(self) -> int: """Return the logging level.""" return self._log_level + @property + def logger(self) -> logging.Logger: + """Return the logger.""" + return self._logger + @property def config(self) -> Dict: """Return the CephCI run configuration.""" @@ -94,113 +88,17 @@ def metadata(self) -> Dict: } ) - def _log(self, level: str, message: Any, *args, **kwargs) -> None: - """ - Log the given message using the provided level along with the metadata. - updating LOG_FORMAT with filename:line_number - message - ex: 2022-11-15 11:37:00,346 - DEBUG - cephci.utility.log.py:227 - Completed log configuration - - *Args: - level (str): Log level - message (Any): The message that needs to be logged - **kwargs: - metadata (dict): Extra information to be appended to logstash - - Returns: - None. - """ - log = { - "info": self._logger.info, - "debug": self._logger.debug, - "warning": self._logger.warning, - "error": self._logger.error, - "exception": self._logger.exception, - } - extra = deepcopy(self.metadata) - extra.update(kwargs.get("metadata", {})) - calling_frame = inspect.stack()[2].frame - trace = inspect.getframeinfo(calling_frame) - file_path = trace.filename.split("/") - files = file_path if len(file_path) == 1 else file_path[5:] - extra.update({"LINENUM": trace.lineno, "FILENAME": ".".join(files)}) - log[level]( - f"cephci.{extra['FILENAME']}:{extra['LINENUM']} - {message}", - *args, - extra=extra, - **kwargs, - ) - - def info(self, message: Any, *args, **kwargs) -> None: - """Log with info level the provided message and extra data. - - Args: - message (Any): The message to be logged. - args (Any): Dynamic list of supported arguments. - kwargs (Any): Dynamic list of supported keyword arguments. - - Returns: - None + def log_error(self, message: str) -> None: """ - self._log("info", message, *args, **kwargs) - - def debug(self, message: Any, *args, **kwargs) -> None: - """Log with debug level the provided message and extra data. + Logs an error and appends it to the internal error tracker. Args: - message (str): The message to be logged. - args (Any): Dynamic list of supported arguments. - kwargs (Any): Dynamic list of supported keyword arguments. - - Returns: - None + message (str): The error message to log and track. """ - - self._log("debug", message, *args, **kwargs) - - def warning(self, message: Any, *args, **kwargs) -> None: - """Log with warning level the provided message and extra data. - - Args: - message (Any): The message to be logged. - args (Any): Dynamic list of supported arguments. - kwargs (Any): Dynamic list of supported keyword arguments. - - Returns: - None - """ - self._log("warning", message, *args, **kwargs) - - def error(self, message: Any, *args, **kwargs) -> None: - """Log with error level the provided message and extra data. - - Args: - message (Any): The message to be logged. - args (Any): Dynamic list of supported arguments. - kwargs (Any): Dynamic list of supported keyword arguments. - - Returns: - None - """ - if self.rp_logger: - self.rp_logger.log(message=message, level="ERROR") - - self._log("error", message, *args, **kwargs) self._log_errors.append(message) + self.error(message) - def exception(self, message: Any, *args, **kwargs) -> None: - """Log the given message under exception log level. - - Args: - message (Any): Message or record to be emitted. - args (Any): Dynamic list of supported arguments. - kwargs (Any): Dynamic list of supported keyword arguments. - Returns: - None - """ - kwargs["exc_info"] = kwargs.get("exc_info", True) - self._log("exception", message, *args, **kwargs) - - def configure_logger(self, test_name, run_dir, disable_console_log): + def configure_logger(self, test_name, run_dir, disable_console_log, **kwargs): """ Configures a new FileHandler for the root logger. Args: @@ -218,7 +116,7 @@ def configure_logger(self, test_name, run_dir, disable_console_log): log_format = logging.Formatter(self.log_format) full_log_name = f"{test_name}.log" test_logfile = os.path.join(run_dir, full_log_name) - self.info(f"Test logfile: {test_logfile}") + self._logger.info(f"Test logfile: {test_logfile}") if disable_console_log: self._logger.propagate = False _handler = logging.FileHandler(test_logfile) @@ -242,18 +140,16 @@ def configure_logger(self, test_name, run_dir, disable_console_log): else run_dir ) log_url = f"{url_base}/{full_log_name}" - self.debug("Completed log configuration") + self._logger.debug("Completed log configuration") return log_url def close_and_remove_filehandlers(self): """ - Close FileHandlers and then remove them from the loggers handlers list. - Returns: - None + Close FileHandlers and then remove them from the logger's handlers list. """ handlers = self._logger.handlers[:] - for h in handlers: - if isinstance(h, logging.FileHandler): - h.close() - self._logger.removeHandler(h) + for handler in handlers: + if isinstance(handler, logging.FileHandler): + handler.close() + self._logger.removeHandler(handler)