From 4a3404495246df7eca487cb55b976feda6009dcd Mon Sep 17 00:00:00 2001 From: Jingyi Zhu <44248750+jenny011@users.noreply.github.com> Date: Tue, 14 May 2024 23:34:59 +0200 Subject: [PATCH] Add DataDriftTrigger: supports one Evidently metric (#409) This is a clean version of PR#367. 1. Add DataDriftTrigger class to supervisor. Supports one configurable Evidently metric. Launches drift detection every N new data points. Data used in detection are the data trained in the previous trigger and all the untriggered new data. 2. Update Trigger interface. `Trigger.inform()` returns a Generator instead of a List. 3. Add a generic ModelDownloader in supervisor. 4. Add example pipelines using DataDriftTrigger. 5. Add Evidently to pylint known third party. 6. Change ModelDownloader to embedding encoder utils. The downloader sets up and returns the model. The DataDriftTrigger owns the model. Future work: 1. Support multiple configurable Evidently metrics. #416 2. Support Alibi-Detect. #414 3. Support custom embedding encoder. #417 4. Support different windowing for detection data, e.g. compare with all previously trained data. #418 5. 
Common DataLoaderInfo #415 --- .../data_drift_trigger/arxiv_datadrift.yaml | 71 +++++ .../huffpost_datadrift.yaml | 73 +++++ .../yearbook_datadrift.yaml | 76 +++++ environment.yml | 1 + modyn/config/schema/pipeline.py | 2 - .../pipeline_executor/pipeline_executor.py | 32 +- .../supervisor/internal/triggers/__init__.py | 1 + .../internal/triggers/amounttrigger.py | 6 +- .../internal/triggers/datadrifttrigger.py | 272 +++++++++++++++++ .../embedding_encoder_utils/__init__.py | 12 + .../embedding_encoder.py | 54 ++++ .../embedding_encoder_downloader.py | 82 +++++ .../internal/triggers/timetrigger.py | 13 +- modyn/supervisor/internal/triggers/trigger.py | 28 +- .../triggers/trigger_datasets/__init__.py | 13 + .../trigger_datasets/dataloader_info.py | 31 ++ .../trigger_datasets/fixed_keys_dataset.py | 144 +++++++++ .../online_trigger_dataset.py | 63 ++++ modyn/supervisor/internal/triggers/utils.py | 142 +++++++++ .../test_pipeline_executor.py | 7 +- .../internal/triggers/test_amounttrigger.py | 30 +- .../triggers/test_datadrifttrigger.py | 211 +++++++++++++ .../internal/triggers/test_timetrigger.py | 24 +- .../test_fixed_keys_dataset.py | 288 ++++++++++++++++++ .../test_online_trigger_dataset.py | 84 +++++ 25 files changed, 1709 insertions(+), 51 deletions(-) create mode 100644 benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/arxiv_datadrift.yaml create mode 100644 benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/huffpost_datadrift.yaml create mode 100644 benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/yearbook_datadrift.yaml create mode 100644 modyn/supervisor/internal/triggers/datadrifttrigger.py create mode 100644 modyn/supervisor/internal/triggers/embedding_encoder_utils/__init__.py create mode 100644 modyn/supervisor/internal/triggers/embedding_encoder_utils/embedding_encoder.py create mode 100644 modyn/supervisor/internal/triggers/embedding_encoder_utils/embedding_encoder_downloader.py create mode 100644 
modyn/supervisor/internal/triggers/trigger_datasets/__init__.py create mode 100644 modyn/supervisor/internal/triggers/trigger_datasets/dataloader_info.py create mode 100644 modyn/supervisor/internal/triggers/trigger_datasets/fixed_keys_dataset.py create mode 100644 modyn/supervisor/internal/triggers/trigger_datasets/online_trigger_dataset.py create mode 100644 modyn/supervisor/internal/triggers/utils.py create mode 100644 modyn/tests/supervisor/internal/triggers/test_datadrifttrigger.py create mode 100644 modyn/tests/supervisor/internal/triggers/trigger_datasets/test_fixed_keys_dataset.py create mode 100644 modyn/tests/supervisor/internal/triggers/trigger_datasets/test_online_trigger_dataset.py diff --git a/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/arxiv_datadrift.yaml b/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/arxiv_datadrift.yaml new file mode 100644 index 000000000..c34b3ad6d --- /dev/null +++ b/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/arxiv_datadrift.yaml @@ -0,0 +1,71 @@ +pipeline: + name: ArXiv dataset Test Pipeline + description: Example pipeline + version: 1.0.0 +model: + id: ArticleNet + config: + num_classes: 172 +model_storage: + full_model_strategy: + name: "PyTorchFullModel" +training: + gpus: 1 + device: "cuda:0" + dataloader_workers: 2 + use_previous_model: True + initial_model: random + batch_size: 96 + optimizers: + - name: "default" + algorithm: "SGD" + source: "PyTorch" + param_groups: + - module: "model" + config: + lr: 0.00002 + momentum: 0.9 + weight_decay: 0.01 + optimization_criterion: + name: "CrossEntropyLoss" + checkpointing: + activated: False + selection_strategy: + name: NewDataStrategy + maximum_keys_in_memory: 10000 + config: + storage_backend: "database" + limit: -1 + reset_after_trigger: True + seed: 42 + epochs_per_trigger: 1 +data: + dataset_id: arxiv_train + bytes_parser_function: | + def bytes_parser_function(data: bytes) -> str: + return str(data, 
"utf8") + tokenizer: DistilBertTokenizerTransform + +trigger: + id: DataDriftTrigger + trigger_config: + data_points_for_detection: 100000 + sample_size: 5000 + +evaluation: + device: "cuda:0" + result_writers: ["json"] + datasets: + - dataset_id: arxiv_test + bytes_parser_function: | + def bytes_parser_function(data: bytes) -> str: + return str(data, "utf8") + tokenizer: DistilBertTokenizerTransform + batch_size: 96 + dataloader_workers: 2 + metrics: + - name: "Accuracy" + evaluation_transformer_function: | + import torch + def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor: + return torch.argmax(model_output, dim=-1) \ No newline at end of file diff --git a/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/huffpost_datadrift.yaml b/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/huffpost_datadrift.yaml new file mode 100644 index 000000000..96d269723 --- /dev/null +++ b/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/huffpost_datadrift.yaml @@ -0,0 +1,73 @@ +pipeline: + name: Huffpost dataset Test Pipeline + description: Example pipeline + version: 1.0.0 +model: + id: ArticleNet + config: + num_classes: 55 +model_storage: + full_model_strategy: + name: "PyTorchFullModel" +training: + gpus: 1 + device: "cuda:0" + dataloader_workers: 2 + use_previous_model: True + initial_model: random + batch_size: 64 + optimizers: + - name: "default" + algorithm: "SGD" + source: "PyTorch" + param_groups: + - module: "model" + config: + lr: 0.00002 + momentum: 0.9 + weight_decay: 0.01 + optimization_criterion: + name: "CrossEntropyLoss" + checkpointing: + activated: False + selection_strategy: + name: NewDataStrategy + maximum_keys_in_memory: 1000 + config: + storage_backend: "database" + limit: -1 + reset_after_trigger: True + seed: 42 + epochs_per_trigger: 1 +data: + dataset_id: huffpost_train + bytes_parser_function: | + def bytes_parser_function(data: bytes) -> str: + return str(data, "utf8") + 
tokenizer: DistilBertTokenizerTransform + +trigger: + id: DataDriftTrigger + trigger_config: + data_points_for_detection: 5000 + metric_name: mmd + metric_config: + bootstrap: False + +evaluation: + device: "cuda:0" + result_writers: ["json"] + datasets: + - dataset_id: huffpost_test + bytes_parser_function: | + def bytes_parser_function(data: bytes) -> str: + return str(data, "utf8") + tokenizer: DistilBertTokenizerTransform + batch_size: 64 + dataloader_workers: 2 + metrics: + - name: "Accuracy" + evaluation_transformer_function: | + import torch + def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor: + return torch.argmax(model_output, dim=-1) \ No newline at end of file diff --git a/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/yearbook_datadrift.yaml b/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/yearbook_datadrift.yaml new file mode 100644 index 000000000..b3aea4a33 --- /dev/null +++ b/benchmark/wildtime_benchmarks/example_pipelines/data_drift_trigger/yearbook_datadrift.yaml @@ -0,0 +1,76 @@ +pipeline: + name: Yearbook Test Pipeline + description: Example pipeline + version: 1.0.0 +model: + id: YearbookNet + config: + num_input_channels: 1 + num_classes: 2 +model_storage: + full_model_strategy: + name: "PyTorchFullModel" +training: + gpus: 1 + device: "cuda:0" + dataloader_workers: 2 + use_previous_model: True + initial_model: random + batch_size: 64 + optimizers: + - name: "default" + algorithm: "SGD" + source: "PyTorch" + param_groups: + - module: "model" + config: + lr: 0.001 + momentum: 0.9 + optimization_criterion: + name: "CrossEntropyLoss" + checkpointing: + activated: False + selection_strategy: + name: NewDataStrategy + maximum_keys_in_memory: 1000 + config: + storage_backend: "database" + limit: -1 + reset_after_trigger: True + seed: 42 + epochs_per_trigger: 1 +data: + dataset_id: yearbook_train + transformations: [] + bytes_parser_function: | + import torch + import numpy as np + 
def bytes_parser_function(data: bytes) -> torch.Tensor: + return torch.from_numpy(np.frombuffer(data, dtype=np.float32)).reshape(1, 32, 32) + +trigger: + id: DataDriftTrigger + trigger_config: + data_points_for_detection: 1000 + metric_name: model + metric_config: + threshold: 0.7 + +evaluation: + device: "cuda:0" + result_writers: ["json"] + datasets: + - dataset_id: yearbook_test + bytes_parser_function: | + import torch + import numpy as np + def bytes_parser_function(data: bytes) -> torch.Tensor: + return torch.from_numpy(np.frombuffer(data, dtype=np.float32)).reshape(1, 32, 32) + batch_size: 64 + dataloader_workers: 2 + metrics: + - name: "Accuracy" + evaluation_transformer_function: | + import torch + def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor: + return torch.argmax(model_output, dim=-1) \ No newline at end of file diff --git a/environment.yml b/environment.yml index b0d8d5358..a7091ca21 100644 --- a/environment.yml +++ b/environment.yml @@ -22,6 +22,7 @@ dependencies: - grpcio>=1.63 - protobuf==5.26.* - types-protobuf==5.26.* + - evidently - jsonschema - psycopg2 - sqlalchemy>=2.0 diff --git a/modyn/config/schema/pipeline.py b/modyn/config/schema/pipeline.py index 8f2dfff48..eaa161f4c 100644 --- a/modyn/config/schema/pipeline.py +++ b/modyn/config/schema/pipeline.py @@ -164,7 +164,6 @@ class _BaseSelectionStrategyConfig(BaseModel): class FreshnessSamplingStrategyConfig(_BaseSelectionStrategyConfig): - unused_data_ratio: float = Field( 0.0, description=( @@ -175,7 +174,6 @@ class FreshnessSamplingStrategyConfig(_BaseSelectionStrategyConfig): class NewDataSelectionStrategyConfig(_BaseSelectionStrategyConfig): - limit_reset: LimitResetStrategy = Field( description=( "Strategy to follow for respecting the limit in case of reset. Only used when reset_after_trigger == true." 
diff --git a/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py b/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py index 162eb2584..3f7d59cc0 100644 --- a/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py +++ b/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py @@ -6,7 +6,7 @@ import sys import traceback from time import sleep -from typing import Any, Optional +from typing import Any, Generator, Optional from modyn.common.benchmark import Stopwatch from modyn.supervisor.internal.evaluation_result_writer import LogResultWriter @@ -113,6 +113,9 @@ def _setup_trigger(self) -> None: trigger_module = dynamic_module_import("modyn.supervisor.internal.triggers") self.trigger: Trigger = getattr(trigger_module, trigger_id)(trigger_config) + self.trigger.init_trigger(self.pipeline_id, self.pipeline_config, self.modyn_config, self.eval_directory) + if self.previous_model_id is not None: + self.trigger.inform_previous_model(self.previous_model_id) assert self.trigger is not None, "Error during trigger initialization" @@ -177,15 +180,15 @@ def _handle_new_data(self, new_data: list[tuple[int, int, int]]) -> bool: def _handle_new_data_batch(self, batch: list[tuple[int, int, int]]) -> bool: self._sw.start("trigger_inform", overwrite=True) - triggering_indices = self.trigger.inform(batch) - num_triggers = len(triggering_indices) - self.pipeline_log["supervisor"]["num_triggers"] += len(triggering_indices) + triggering_indices: Generator[int, None, None] = self.trigger.inform(batch) + num_triggers = self._handle_triggers_within_batch(batch, triggering_indices) + + logger.info(f"There are {num_triggers} triggers in this batch.") + self.pipeline_log["supervisor"]["num_triggers"] += num_triggers self.pipeline_log["supervisor"]["trigger_batch_times"].append( {"batch_size": len(batch), "time": self._sw.stop("trigger_inform"), "num_triggers": num_triggers} ) - logger.info(f"There are {num_triggers} triggers in this batch.") - 
self._handle_triggers_within_batch(batch, triggering_indices) return num_triggers > 0 def _run_training(self, trigger_id: int) -> None: @@ -223,6 +226,7 @@ def _run_training(self, trigger_id: int) -> None: # We store the trained model for evaluation in any case. self._sw.start("store_trained_model", overwrite=True) model_id = self.grpc.store_trained_model(self.current_training_id) + self.trigger.inform_previous_model(model_id) self.pipeline_log["supervisor"]["triggers"][trigger_id]["store_trained_model_time"] = self._sw.stop() # Only if the pipeline actually wants to continue the training on it, we set previous model. @@ -270,12 +274,17 @@ def _get_trigger_timespan( return first_timestamp, last_timestamp - def _handle_triggers_within_batch(self, batch: list[tuple[int, int, int]], triggering_indices: list[int]) -> None: + def _handle_triggers_within_batch( + self, batch: list[tuple[int, int, int]], triggering_indices: Generator[int, None, None] + ) -> int: previous_trigger_idx = 0 logger.info("Handling triggers within batch.") self._update_pipeline_stage_and_enqueue_msg(PipelineStage.HANDLE_TRIGGERS_WITHIN_BATCH, MsgType.GENERAL) + triggering_idx_list = [] + for i, triggering_idx in enumerate(triggering_indices): + triggering_idx_list.append(triggering_idx) self._update_pipeline_stage_and_enqueue_msg(PipelineStage.INFORM_SELECTOR_AND_TRIGGER, MsgType.GENERAL) triggering_data = batch[previous_trigger_idx : triggering_idx + 1] previous_trigger_idx = triggering_idx + 1 @@ -294,6 +303,7 @@ def _handle_triggers_within_batch(self, batch: list[tuple[int, int, int]], trigg num_samples_in_trigger = self.grpc.get_number_of_samples(self.pipeline_id, trigger_id) if num_samples_in_trigger > 0: + self.trigger.inform_previous_trigger_and_data_points(trigger_id, num_samples_in_trigger) first_timestamp, last_timestamp = self._get_trigger_timespan(i == 0, triggering_data) self.pipeline_log["supervisor"]["triggers"][trigger_id]["first_timestamp"] = first_timestamp 
self.pipeline_log["supervisor"]["triggers"][trigger_id]["last_timestamp"] = last_timestamp @@ -309,13 +319,13 @@ def _handle_triggers_within_batch(self, batch: list[tuple[int, int, int]], trigg self.num_triggers = self.num_triggers + 1 if self.maximum_triggers is not None and self.num_triggers >= self.maximum_triggers: - break + return len(triggering_idx_list) # we have to inform the Selector about the remaining data in this batch. - if len(triggering_indices) == 0: + if len(triggering_idx_list) == 0: remaining_data = batch else: - remaining_data = batch[triggering_indices[-1] + 1 :] + remaining_data = batch[triggering_idx_list[-1] + 1 :] logger.info(f"There are {len(remaining_data)} data points remaining after the trigger.") if len(remaining_data) > 0: @@ -335,6 +345,8 @@ def _handle_triggers_within_batch(self, batch: list[tuple[int, int, int]], trigg else: self.remaining_data_range = None + return len(triggering_idx_list) + def _init_evaluation_writer(self, name: str, trigger_id: int) -> LogResultWriter: return self.supervisor_supported_eval_result_writers[name](self.pipeline_id, trigger_id, self.eval_directory) diff --git a/modyn/supervisor/internal/triggers/__init__.py b/modyn/supervisor/internal/triggers/__init__.py index ac411500e..7c296b5ed 100644 --- a/modyn/supervisor/internal/triggers/__init__.py +++ b/modyn/supervisor/internal/triggers/__init__.py @@ -5,6 +5,7 @@ import os from .amounttrigger import DataAmountTrigger # noqa: F401 +from .datadrifttrigger import DataDriftTrigger # noqa: F401 from .timetrigger import TimeTrigger # noqa: F401 from .trigger import Trigger # noqa: F401 diff --git a/modyn/supervisor/internal/triggers/amounttrigger.py b/modyn/supervisor/internal/triggers/amounttrigger.py index af2bc8d8e..76ea28b14 100644 --- a/modyn/supervisor/internal/triggers/amounttrigger.py +++ b/modyn/supervisor/internal/triggers/amounttrigger.py @@ -1,3 +1,5 @@ +from typing import Generator + from modyn.supervisor.internal.triggers.trigger import Trigger 
@@ -14,7 +16,7 @@ def __init__(self, trigger_config: dict): super().__init__(trigger_config) - def inform(self, new_data: list[tuple[int, int, int]]) -> list[int]: + def inform(self, new_data: list[tuple[int, int, int]]) -> Generator[int, None, None]: assert self.remaining_data_points < self.data_points_for_trigger, "Inconsistent remaining datapoints" first_idx = self.data_points_for_trigger - self.remaining_data_points - 1 @@ -22,4 +24,4 @@ def inform(self, new_data: list[tuple[int, int, int]]) -> list[int]: self.remaining_data_points = (self.remaining_data_points + len(new_data)) % self.data_points_for_trigger - return triggering_indices + yield from triggering_indices diff --git a/modyn/supervisor/internal/triggers/datadrifttrigger.py b/modyn/supervisor/internal/triggers/datadrifttrigger.py new file mode 100644 index 000000000..a6b375dd8 --- /dev/null +++ b/modyn/supervisor/internal/triggers/datadrifttrigger.py @@ -0,0 +1,272 @@ +import logging +import pathlib +from typing import Generator, Optional + +import pandas as pd +from evidently import ColumnMapping +from evidently.report import Report +from modyn.supervisor.internal.triggers.embedding_encoder_utils import EmbeddingEncoder, EmbeddingEncoderDownloader + +# pylint: disable-next=no-name-in-module +from modyn.supervisor.internal.triggers.trigger import Trigger +from modyn.supervisor.internal.triggers.trigger_datasets import DataLoaderInfo +from modyn.supervisor.internal.triggers.utils import ( + convert_tensor_to_df, + get_embeddings, + get_evidently_metrics, + prepare_trigger_dataloader_by_trigger, + prepare_trigger_dataloader_fixed_keys, +) + +logger = logging.getLogger(__name__) + + +class DataDriftTrigger(Trigger): + """Triggers when a certain number of data points have been used.""" + + def __init__(self, trigger_config: dict): + self.pipeline_id: Optional[int] = None + self.pipeline_config: Optional[dict] = None + self.modyn_config: Optional[dict] = None + self.base_dir: Optional[pathlib.Path] = None 
+ + self.previous_trigger_id: Optional[int] = None + self.previous_model_id: Optional[int] = None + self.previous_data_points: Optional[int] = None + self.model_updated: bool = False + + self.dataloader_info: Optional[DataLoaderInfo] = None + self.encoder_downloader: Optional[EmbeddingEncoderDownloader] = None + self.embedding_encoder: Optional[EmbeddingEncoder] = None + + self.detection_interval: int = 1000 + self.sample_size: Optional[int] = None + self.evidently_column_mapping_name = "data" + self.metrics: Optional[list] = None + + self.data_cache: list[tuple[int, int, int]] = [] + self.leftover_data_points = 0 + + if len(trigger_config) > 0: + self._parse_trigger_config(trigger_config) + + super().__init__(trigger_config) + + def _parse_trigger_config(self, trigger_config: dict) -> None: + if "data_points_for_detection" in trigger_config.keys(): + self.detection_interval = trigger_config["data_points_for_detection"] + assert self.detection_interval > 0, "data_points_for_detection needs to be at least 1" + + if "sample_size" in trigger_config.keys(): + self.sample_size = trigger_config["sample_size"] + assert self.sample_size is None or self.sample_size > 0, "sample_size needs to be at least 1" + + self.metrics = get_evidently_metrics(self.evidently_column_mapping_name, trigger_config) + + def _init_dataloader_info(self) -> None: + assert self.pipeline_id is not None + assert self.pipeline_config is not None + assert self.modyn_config is not None + + training_config = self.pipeline_config["training"] + data_config = self.pipeline_config["data"] + + if "num_prefetched_partitions" in training_config: + num_prefetched_partitions = training_config["num_prefetched_partitions"] + else: + if "prefetched_partitions" in training_config: + raise ValueError( + "Found `prefetched_partitions` instead of `num_prefetched_partitions`in training configuration." 
+ + " Please rename/remove that configuration" + ) + logger.warning("Number of prefetched partitions not explicitly given in training config - defaulting to 1.") + num_prefetched_partitions = 1 + + if "parallel_prefetch_requests" in training_config: + parallel_prefetch_requests = training_config["parallel_prefetch_requests"] + else: + logger.warning( + "Number of parallel prefetch requests not explicitly given in training config - defaulting to 1." + ) + parallel_prefetch_requests = 1 + + if "tokenizer" in data_config: + tokenizer = data_config["tokenizer"] + else: + tokenizer = None + + if "transformations" in data_config: + transform_list = data_config["transformations"] + else: + transform_list = [] + + self.dataloader_info = DataLoaderInfo( + self.pipeline_id, + dataset_id=data_config["dataset_id"], + num_dataloaders=training_config["dataloader_workers"], + batch_size=training_config["batch_size"], + bytes_parser=data_config["bytes_parser_function"], + transform_list=transform_list, + storage_address=f"{self.modyn_config['storage']['hostname']}:{self.modyn_config['storage']['port']}", + selector_address=f"{self.modyn_config['selector']['hostname']}:{self.modyn_config['selector']['port']}", + num_prefetched_partitions=num_prefetched_partitions, + parallel_prefetch_requests=parallel_prefetch_requests, + tokenizer=tokenizer, + ) + + def _init_encoder_downloader(self) -> None: + assert self.pipeline_id is not None + assert self.pipeline_config is not None + assert self.modyn_config is not None + assert self.base_dir is not None + + self.encoder_downloader = EmbeddingEncoderDownloader( + self.modyn_config, + self.pipeline_id, + self.base_dir, + f"{self.modyn_config['model_storage']['hostname']}:{self.modyn_config['model_storage']['port']}", + ) + + def init_trigger(self, pipeline_id: int, pipeline_config: dict, modyn_config: dict, base_dir: pathlib.Path) -> None: + self.pipeline_id = pipeline_id + self.pipeline_config = pipeline_config + self.modyn_config = 
modyn_config + self.base_dir = base_dir + + self._init_dataloader_info() + self._init_encoder_downloader() + + def run_detection(self, reference_embeddings_df: pd.DataFrame, current_embeddings_df: pd.DataFrame) -> bool: + assert self.dataloader_info is not None + + # Run Evidently detection + # ColumnMapping is {mapping name: column indices}, + # an Evidently way of identifying (sub)columns to use in the detection. + # e.g. {"even columns": [0,2,4]}. + column_mapping = ColumnMapping(embeddings={self.evidently_column_mapping_name: reference_embeddings_df.columns}) + + # https://docs.evidentlyai.com/user-guide/customization/embeddings-drift-parameters + report = Report(metrics=self.metrics) + report.run( + reference_data=reference_embeddings_df, current_data=current_embeddings_df, column_mapping=column_mapping + ) + result = report.as_dict() + result_print = [ + (x["result"]["drift_score"], x["result"]["method_name"], x["result"]["drift_detected"]) + for x in result["metrics"] + ] + logger.info( + f"[DataDriftDetector][Prev Trigger {self.previous_trigger_id}][Dataset {self.dataloader_info.dataset_id}]" + + f"[Result] {result_print}" + ) + + return result["metrics"][0]["result"]["drift_detected"] + + def detect_drift(self, idx_start: int, idx_end: int) -> bool: + """Compare current data against reference data. + current data: all untriggered samples in the sliding window in inform(). + reference data: the training samples of the previous trigger. + Get the dataloaders, download the embedding encoder model if necessary, + compute embeddings of current and reference data, then run detection on the embeddings. 
+ """ + assert self.previous_trigger_id is not None + assert self.previous_data_points is not None and self.previous_data_points > 0 + assert self.previous_model_id is not None + assert self.dataloader_info is not None + assert self.encoder_downloader is not None + assert self.pipeline_config is not None + + reference_dataloader = prepare_trigger_dataloader_by_trigger( + self.previous_trigger_id, + self.dataloader_info, + data_points_in_trigger=self.previous_data_points, + sample_size=self.sample_size, + ) + + current_keys, _, _ = zip(*self.data_cache[idx_start:idx_end]) # type: ignore + current_dataloader = prepare_trigger_dataloader_fixed_keys( + self.previous_trigger_id + 1, + self.dataloader_info, + current_keys, # type: ignore + sample_size=self.sample_size, + ) + + # Download previous model as embedding encoder + # TODO(417) Support custom model as embedding encoder + if self.model_updated: + self.embedding_encoder = self.encoder_downloader.setup_encoder( + self.previous_model_id, self.pipeline_config["training"]["device"] + ) + self.model_updated = False + + # Compute embeddings + assert self.embedding_encoder is not None + reference_embeddings = get_embeddings(self.embedding_encoder, reference_dataloader) + current_embeddings = get_embeddings(self.embedding_encoder, current_dataloader) + reference_embeddings_df = convert_tensor_to_df(reference_embeddings, "col_") + current_embeddings_df = convert_tensor_to_df(current_embeddings, "col_") + + drift_detected = self.run_detection(reference_embeddings_df, current_embeddings_df) + + return drift_detected + + def inform(self, new_data: list[tuple[int, int, int]]) -> Generator[int, None, None]: + """The DataDriftTrigger takes a batch of new data as input. It adds the new data to its data_cache. + Then, it iterates through the data_cache with a sliding window of current data for drift detection. + The sliding window is determined by two pointers: detection_idx_start and detection_idx_end. 
+ We fix the start pointer and advance the end pointer by detection_interval in every iteration. + In every iteration we run data drift detection on the sliding window of current data. + Note, if we have remaining untriggered data from the previous batch of new data, + we include all of them in the first drift detection. + The remaining untriggered data has been processed in the previous new data batch, + so there's no need to run detection only on remaining data in this batch. + If a retraining is triggered, all data in the sliding window becomes triggering data. Advance the start ptr. + After traversing the data_cache, we remove all the triggered data from the cache + and record the number of remaining untriggered samples. + Use Generator here because this data drift trigger + needs to wait for the previous trigger to finish and get the model. + """ + # add new data to data_cache + self.data_cache.extend(new_data) + + unvisited_data_points = len(self.data_cache) + untriggered_data_points = unvisited_data_points + # the sliding window of data points for detection + detection_idx_start = 0 + detection_idx_end = 0 + + while unvisited_data_points >= self.detection_interval: + unvisited_data_points -= self.detection_interval + detection_idx_end += self.detection_interval + if detection_idx_end <= self.leftover_data_points: + continue + + # trigger_id doesn't always start from 0 + if self.previous_trigger_id is None: + # if no previous trigger exists, always trigger retraining + triggered = True + else: + # if exist previous trigger, detect drift + triggered = self.detect_drift(detection_idx_start, detection_idx_end) + + if triggered: + trigger_data_points = detection_idx_end - detection_idx_start + # Index of the last sample of the trigger. Index is relative to the new_data list. 
+ trigger_idx = len(new_data) - (untriggered_data_points - trigger_data_points) - 1 + + # update bookkeeping and sliding window + untriggered_data_points -= trigger_data_points + detection_idx_start = detection_idx_end + yield trigger_idx + + # remove triggered data from cache + del self.data_cache[:detection_idx_start] + self.leftover_data_points = detection_idx_end - detection_idx_start + + def inform_previous_trigger_and_data_points(self, previous_trigger_id: int, data_points: int) -> None: + self.previous_trigger_id = previous_trigger_id + self.previous_data_points = data_points + + def inform_previous_model(self, previous_model_id: int) -> None: + self.previous_model_id = previous_model_id + self.model_updated = True diff --git a/modyn/supervisor/internal/triggers/embedding_encoder_utils/__init__.py b/modyn/supervisor/internal/triggers/embedding_encoder_utils/__init__.py new file mode 100644 index 000000000..982eaa1ca --- /dev/null +++ b/modyn/supervisor/internal/triggers/embedding_encoder_utils/__init__.py @@ -0,0 +1,12 @@ +"""Supervisor module. The supervisor initiates a pipeline and coordinates all components. 
+ +""" + +import os + +from .embedding_encoder import EmbeddingEncoder # noqa: F401 +from .embedding_encoder_downloader import EmbeddingEncoderDownloader # noqa: F401 + +files = os.listdir(os.path.dirname(__file__)) +files.remove("__init__.py") +__all__ = [f[:-3] for f in files if f.endswith(".py")] diff --git a/modyn/supervisor/internal/triggers/embedding_encoder_utils/embedding_encoder.py b/modyn/supervisor/internal/triggers/embedding_encoder_utils/embedding_encoder.py new file mode 100644 index 000000000..45b53c675 --- /dev/null +++ b/modyn/supervisor/internal/triggers/embedding_encoder_utils/embedding_encoder.py @@ -0,0 +1,54 @@ +import io +import json +import logging +import pathlib + +import torch +from modyn.models.coreset_methods_support import CoresetSupportingModule +from modyn.utils import dynamic_module_import + +logger = logging.getLogger(__name__) + + +class EmbeddingEncoder: + """The EmbeddingEncoder stores a model and its metadata. + DataDriftTrigger uses EmbeddingEncoder to run the model. + """ + + def __init__( + self, + model_id: int, + model_class_name: str, + model_config: str, + device: str, + amp: bool, + ): + self.model_id = model_id + self.device = device + self.device_type = "cuda" if "cuda" in self.device else "cpu" + self.amp = amp + + self.model_class_name = model_class_name + model_module = dynamic_module_import("modyn.models") + self.model_handler = getattr(model_module, model_class_name) + assert self.model_handler is not None + + self.model_configuration_dict = json.loads(model_config) + + self._model = self.model_handler(self.model_configuration_dict, device, amp) + assert self._model is not None + # The model must be able to record embeddings. 
+ assert isinstance(self._model.model, CoresetSupportingModule) + + def _load_state(self, path: pathlib.Path) -> None: + assert path.exists(), "Cannot load state from non-existing file" + + logger.info(f"Loading model state from {path}") + with open(path, "rb") as state_file: + checkpoint = torch.load(io.BytesIO(state_file.read()), map_location=torch.device("cpu")) + + assert "model" in checkpoint + self._model.model.load_state_dict(checkpoint["model"]) + + # delete trained model from disk + path.unlink() diff --git a/modyn/supervisor/internal/triggers/embedding_encoder_utils/embedding_encoder_downloader.py b/modyn/supervisor/internal/triggers/embedding_encoder_utils/embedding_encoder_downloader.py new file mode 100644 index 000000000..cde0c9167 --- /dev/null +++ b/modyn/supervisor/internal/triggers/embedding_encoder_utils/embedding_encoder_downloader.py @@ -0,0 +1,82 @@ +import logging +import pathlib +from typing import Optional + +import grpc +from modyn.common.ftp import download_trained_model +from modyn.metadata_database.metadata_database_connection import MetadataDatabaseConnection +from modyn.metadata_database.models import TrainedModel + +# pylint: disable-next=no-name-in-module +from modyn.model_storage.internal.grpc.generated.model_storage_pb2 import FetchModelRequest, FetchModelResponse +from modyn.model_storage.internal.grpc.generated.model_storage_pb2_grpc import ModelStorageStub +from modyn.supervisor.internal.triggers.embedding_encoder_utils import EmbeddingEncoder +from modyn.utils.utils import grpc_connection_established + +logger = logging.getLogger(__name__) + + +class EmbeddingEncoderDownloader: + """The embedding encoder downloader provides a simple interface setup_encoder() to the DataDriftTrigger. + Given a model_id and a device, it creates an EmbeddingEncoder, + downloads model parameters and loads model state. 
+ """ + + def __init__( + self, + modyn_config: dict, + pipeline_id: int, + base_dir: pathlib.Path, + model_storage_address: str, + ): + self.modyn_config = modyn_config + self.pipeline_id = pipeline_id + self.base_dir = base_dir + assert self.base_dir.exists(), f"Temporary Directory {self.base_dir} should have been created." + self._model_storage_stub = self.connect_to_model_storage(model_storage_address) + + @staticmethod + def connect_to_model_storage(model_storage_address: str) -> ModelStorageStub: + model_storage_channel = grpc.insecure_channel(model_storage_address) + assert model_storage_channel is not None + if not grpc_connection_established(model_storage_channel): + raise ConnectionError( + f"Could not establish gRPC connection to model storage at address {model_storage_address}." + ) + return ModelStorageStub(model_storage_channel) + + def configure(self, model_id: int, device: str) -> Optional[EmbeddingEncoder]: + with MetadataDatabaseConnection(self.modyn_config) as database: + trained_model: Optional[TrainedModel] = database.session.get(TrainedModel, model_id) + if not trained_model: + logger.error(f"Trained model {model_id} does not exist!") + return None + model_class_name, model_config, amp = database.get_model_configuration(trained_model.pipeline_id) + + embedding_encoder = EmbeddingEncoder(model_id, model_class_name, model_config, device, amp) + return embedding_encoder + + def download(self, model_id: int) -> pathlib.Path: + fetch_request = FetchModelRequest(model_id=model_id, load_metadata=False) + fetch_resp: FetchModelResponse = self._model_storage_stub.FetchModel(fetch_request) + + if not fetch_resp.success: + logger.error(f"Trained model {model_id} cannot be fetched from model storage. 
") + raise Exception("Failed to fetch trained model") # pylint: disable=broad-exception-raised + trained_model_path = download_trained_model( + logger=logger, + model_storage_config=self.modyn_config["model_storage"], + remote_path=pathlib.Path(fetch_resp.model_path), + checksum=fetch_resp.checksum, + identifier=self.pipeline_id, + base_directory=self.base_dir, + ) + assert trained_model_path is not None + return trained_model_path + + def setup_encoder(self, model_id: int, device: str) -> EmbeddingEncoder: + embedding_encoder = self.configure(model_id, device) + assert embedding_encoder is not None + trained_model_path = self.download(model_id) + embedding_encoder._load_state(trained_model_path) + return embedding_encoder diff --git a/modyn/supervisor/internal/triggers/timetrigger.py b/modyn/supervisor/internal/triggers/timetrigger.py index ed944164c..ee74f03d9 100644 --- a/modyn/supervisor/internal/triggers/timetrigger.py +++ b/modyn/supervisor/internal/triggers/timetrigger.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Generator, Optional from modyn.supervisor.internal.triggers.trigger import Trigger from modyn.utils import convert_timestr_to_seconds, validate_timestr @@ -25,19 +25,22 @@ def __init__(self, trigger_config: dict): super().__init__(trigger_config) - def inform(self, new_data: list[tuple[int, int, int]]) -> list[int]: + def inform(self, new_data: list[tuple[int, int, int]]) -> Generator[int, None, None]: if self.next_trigger_at is None: if len(new_data) > 0: self.next_trigger_at = new_data[0][1] + self.trigger_every_s # new_data is sorted else: - return [] + return max_timestamp = new_data[-1][1] # new_data is sorted triggering_indices = [] while self.next_trigger_at <= max_timestamp: # The next line gets the first item which has a timestamp larger or equal to the triggering timestamp - idx = next(idx for (idx, (_, timestamp, _)) in enumerate(new_data) if timestamp >= self.next_trigger_at) + try: + idx = next(idx for (idx, (_, 
timestamp, _)) in enumerate(new_data) if timestamp >= self.next_trigger_at) + except StopIteration: + break # This index `idx` describes the first item not belonging to the trigger. # Hence, the previous item causes a trigger. # If this is the first item, then we need to emit a trigger for index -1. @@ -49,4 +52,4 @@ def inform(self, new_data: list[tuple[int, int, int]]) -> list[int]: triggering_indices.append(idx - 1) self.next_trigger_at += self.trigger_every_s - return triggering_indices + yield from triggering_indices diff --git a/modyn/supervisor/internal/triggers/trigger.py b/modyn/supervisor/internal/triggers/trigger.py index 642fdf0cf..097e37d1b 100644 --- a/modyn/supervisor/internal/triggers/trigger.py +++ b/modyn/supervisor/internal/triggers/trigger.py @@ -1,16 +1,25 @@ +import pathlib from abc import ABC, abstractmethod +from typing import Generator class Trigger(ABC): def __init__(self, trigger_config: dict) -> None: assert trigger_config is not None, "trigger_config cannot be None." + # pylint: disable=unnecessary-pass + def init_trigger(self, pipeline_id: int, pipeline_config: dict, modyn_config: dict, base_dir: pathlib.Path) -> None: + """The supervisor initializes the concrete Trigger with Trigger-type-specific configurations + base_dir: the base directory to store Trigger outputs. A location at the supervisor. + """ + pass + @abstractmethod - def inform(self, new_data: list[tuple[int, int, int]]) -> list[int]: - """The supervisor informs the trigger about new data. - In case the concrete trigger implementation decides to trigger, we return a list of _indices into new_data_. + def inform(self, new_data: list[tuple[int, int, int]]) -> Generator[int, None, None]: + """The supervisor informs the Trigger about new data. + In case the concrete Trigger implementation decides to trigger, we return a list of _indices into new_data_. This list contains the indices of all data points that cause a trigger. 
- The list might be empty or only contain a single element, which concrete triggers need to respect. + The list might be empty or only contain a single element, which concrete Triggers need to respect. Parameters: new_data (list[tuple[str, int, int]]): List of new data (keys, timestamps, labels). Can be empty. @@ -18,3 +27,14 @@ def inform(self, new_data: list[tuple[int, int, int]]) -> list[int]: Returns: triggering_indices (list[int]): List of all indices that trigger training """ + + # pylint: disable=unnecessary-pass + def inform_previous_trigger_and_data_points(self, previous_trigger_id: int, data_points: int) -> None: + """The supervisor informs the Trigger about the previous trigger_id + and data points in the previous trigger.""" + pass + + # pylint: disable=unnecessary-pass + def inform_previous_model(self, previous_model_id: int) -> None: + """The supervisor informs the Trigger about the model_id of the previous trigger""" + pass diff --git a/modyn/supervisor/internal/triggers/trigger_datasets/__init__.py b/modyn/supervisor/internal/triggers/trigger_datasets/__init__.py new file mode 100644 index 000000000..87f98de46 --- /dev/null +++ b/modyn/supervisor/internal/triggers/trigger_datasets/__init__.py @@ -0,0 +1,13 @@ +"""Supervisor module. The supervisor initiates a pipeline and coordinates all components. 
+ +""" + +import os + +from .dataloader_info import DataLoaderInfo # noqa: F401 +from .fixed_keys_dataset import FixedKeysDataset # noqa: F401 +from .online_trigger_dataset import OnlineTriggerDataset # noqa: F401 + +files = os.listdir(os.path.dirname(__file__)) +files.remove("__init__.py") +__all__ = [f[:-3] for f in files if f.endswith(".py")] diff --git a/modyn/supervisor/internal/triggers/trigger_datasets/dataloader_info.py b/modyn/supervisor/internal/triggers/trigger_datasets/dataloader_info.py new file mode 100644 index 000000000..68a2a5b04 --- /dev/null +++ b/modyn/supervisor/internal/triggers/trigger_datasets/dataloader_info.py @@ -0,0 +1,31 @@ +from typing import Optional + + +# TODO(415): Unify with similar classes in trainer_server and evaluator +class DataLoaderInfo: + def __init__( + self, + pipeline_id: int, + dataset_id: str, + num_dataloaders: int, + batch_size: int, + bytes_parser: str, + transform_list: list[str], + storage_address: str, + selector_address: str, + num_prefetched_partitions: int, + parallel_prefetch_requests: int, + tokenizer: Optional[None], + ): + self.pipeline_id = pipeline_id + self.dataset_id = dataset_id + self.num_dataloaders = num_dataloaders + self.batch_size = batch_size + self.bytes_parser = bytes_parser + self.transform_list = transform_list + self.storage_address = storage_address + self.selector_address = selector_address + self.num_prefetched_partitions = num_prefetched_partitions + self.parallel_prefetch_requests = parallel_prefetch_requests + self.tokenizer = tokenizer + self.training_id = -1 diff --git a/modyn/supervisor/internal/triggers/trigger_datasets/fixed_keys_dataset.py b/modyn/supervisor/internal/triggers/trigger_datasets/fixed_keys_dataset.py new file mode 100644 index 000000000..fc094a3e0 --- /dev/null +++ b/modyn/supervisor/internal/triggers/trigger_datasets/fixed_keys_dataset.py @@ -0,0 +1,144 @@ +import logging +import math +from typing import Callable, Generator, Iterable, Optional + +import grpc 
+from modyn.storage.internal.grpc.generated.storage_pb2 import ( # pylint: disable=no-name-in-module + GetRequest, + GetResponse, +) +from modyn.storage.internal.grpc.generated.storage_pb2_grpc import StorageStub +from modyn.utils import ( + BYTES_PARSER_FUNC_NAME, + MAX_MESSAGE_SIZE, + deserialize_function, + grpc_connection_established, + instantiate_class, +) +from torch.utils.data import IterableDataset, get_worker_info +from torchvision import transforms + +logger = logging.getLogger(__name__) + + +# TODO(#275): inherit common abstraction of dataset +class FixedKeysDataset(IterableDataset): + """The FixedKeysDataset is created given a list of fixed sample keys. It fetches samples by the given sample keys. + It can used when sample keys are known, but the corresponding trigger_id is unknown. + It can also be used when user wants a dataset containing samples from multiple triggers if the keys are known. + In DataDriftTrigger, for example, FixedKeysDataset is used for current untriggered samples + because they belong to a future trigger whose trigger_id is unknown. 
+ """ + + # pylint: disable=too-many-instance-attributes, abstract-method + + def __init__( + self, + dataset_id: str, + bytes_parser: str, + serialized_transforms: list[str], + storage_address: str, + trigger_id: int, + keys: list[int], + tokenizer: Optional[str] = None, + ): + self._trigger_id = trigger_id + self._dataset_id = dataset_id + self._first_call = True + + self._bytes_parser = bytes_parser + self._serialized_transforms = serialized_transforms + self._storage_address = storage_address + self._transform_list: list[Callable] = [] + self._transform: Optional[Callable] = None + self._storagestub: StorageStub = None + self._bytes_parser_function: Optional[Callable] = None + + # tokenizer for NLP tasks + self._tokenizer = None + self._tokenizer_name = tokenizer + if tokenizer is not None: + self._tokenizer = instantiate_class("modyn.models.tokenizers", tokenizer) + + self._keys = keys + + logger.debug("Initialized FixedKeysDataset.") + + def _init_transforms(self) -> None: + self._bytes_parser_function = deserialize_function(self._bytes_parser, BYTES_PARSER_FUNC_NAME) + self._transform = self._bytes_parser_function + self._setup_composed_transform() + + def _setup_composed_transform(self) -> None: + assert self._bytes_parser_function is not None + + self._transform_list = [self._bytes_parser_function] + for transform in self._serialized_transforms: + function = eval(transform) # pylint: disable=eval-used + self._transform_list.append(function) + + if self._tokenizer is not None: + self._transform_list.append(self._tokenizer) + + if len(self._transform_list) > 0: + self._transform = transforms.Compose(self._transform_list) + + def _init_grpc(self) -> None: + storage_channel = grpc.insecure_channel( + self._storage_address, + options=[ + ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE), + ("grpc.max_send_message_length", MAX_MESSAGE_SIZE), + ], + ) + if not grpc_connection_established(storage_channel): + raise ConnectionError(f"Could not establish gRPC 
connection to storage at address {self._storage_address}.") + self._storagestub = StorageStub(storage_channel) + + def _info(self, msg: str, worker_id: Optional[int]) -> None: # pragma: no cover + logger.info(f"[Trigger {self._trigger_id}][Worker {worker_id}] {msg}") + + def _debug(self, msg: str, worker_id: Optional[int]) -> None: # pragma: no cover + logger.debug(f"[Trigger {self._trigger_id}][Worker {worker_id}] {msg}") + + @staticmethod + def _silence_pil() -> None: # pragma: no cover + pil_logger = logging.getLogger("PIL") + pil_logger.setLevel(logging.INFO) # by default, PIL on DEBUG spams the console + + def _get_data_from_storage(self, keys: list[int]) -> Iterable[list[tuple[int, bytes, int]]]: + request = GetRequest(dataset_id=self._dataset_id, keys=keys) + response: GetResponse + for response in self._storagestub.Get(request): + yield list(zip(response.keys, response.samples, response.labels)) + + def __iter__(self) -> Generator: + worker_info = get_worker_info() + if worker_info is None: + # Non-multi-threaded data loading. We use worker_id 0. + worker_id = 0 + num_workers = 1 + else: + worker_id = worker_info.id + num_workers = worker_info.num_workers + + if self._first_call: + self._first_call = False + self._debug("This is the first run of iter, making gRPC connections.", worker_id) + # We have to initialize transformations and gRPC connections here to do it per dataloader worker, + # otherwise the transformations/gRPC connections cannot be pickled for the new processes. + self._init_transforms() + self._init_grpc() + FixedKeysDataset._silence_pil() + self._debug("gRPC initialized.", worker_id) + + assert self._transform is not None + + total_samples = len(self._keys) + keys_per_worker = int(math.ceil(total_samples / num_workers)) + worker_keys = self._keys[worker_id * keys_per_worker : min(total_samples, (worker_id + 1) * keys_per_worker)] + + # TODO(#175): we might want to do/accelerate prefetching here. 
+ for data in self._get_data_from_storage(worker_keys): + for key, sample, label in data: + yield key, self._transform(sample), label diff --git a/modyn/supervisor/internal/triggers/trigger_datasets/online_trigger_dataset.py b/modyn/supervisor/internal/triggers/trigger_datasets/online_trigger_dataset.py new file mode 100644 index 000000000..0816bc2da --- /dev/null +++ b/modyn/supervisor/internal/triggers/trigger_datasets/online_trigger_dataset.py @@ -0,0 +1,63 @@ +import logging +import random +from typing import Generator, Optional + +from modyn.trainer_server.internal.dataset.online_dataset import OnlineDataset +from torch.utils.data import IterableDataset + +logger = logging.getLogger(__name__) + + +# TODO(#275): inherit common abstraction of dataset +class OnlineTriggerDataset(OnlineDataset, IterableDataset): + """The OnlineTriggerDataset is a wrapper around OnlineDataset in trainer_server. + It uses logic in OnlineDataset obtain samples by trigger_id. + It supports random sampling to reduce the number of samples if the sample_prob is provided. + Random sampling is needed for example in DataDriftTrigger to reduce the number of samples + processed in data drift detection in case there are too many untriggered samples. 
+ """ + + # pylint: disable=too-many-instance-attributes, abstract-method + + def __init__( + self, + pipeline_id: int, + trigger_id: int, + dataset_id: str, + bytes_parser: str, + serialized_transforms: list[str], + storage_address: str, + selector_address: str, + training_id: int, + num_prefetched_partitions: int, + parallel_prefetch_requests: int, + tokenizer: Optional[str] = None, + sample_prob: Optional[float] = None, + ): + OnlineDataset.__init__( + self, + pipeline_id, + trigger_id, + dataset_id, + bytes_parser, + serialized_transforms, + storage_address, + selector_address, + training_id, + num_prefetched_partitions, + parallel_prefetch_requests, + tokenizer, + None, + ) + self._sample_prob = sample_prob + + # pylint: disable=too-many-locals, too-many-branches, too-many-statements + + def __iter__(self) -> Generator: + for transformed_tuple in OnlineDataset.__iter__(self): + if self._sample_prob is not None: + prob = random.random() + if prob < self._sample_prob: + yield transformed_tuple + else: + yield transformed_tuple diff --git a/modyn/supervisor/internal/triggers/utils.py b/modyn/supervisor/internal/triggers/utils.py new file mode 100644 index 000000000..9b087fc70 --- /dev/null +++ b/modyn/supervisor/internal/triggers/utils.py @@ -0,0 +1,142 @@ +import logging +import random +from typing import Optional, Union + +import pandas as pd +import torch +from evidently.metrics import EmbeddingsDriftMetric +from evidently.metrics.data_drift import embedding_drift_methods +from modyn.supervisor.internal.triggers.embedding_encoder_utils import EmbeddingEncoder +from modyn.supervisor.internal.triggers.trigger_datasets import DataLoaderInfo, FixedKeysDataset, OnlineTriggerDataset +from torch.utils.data import DataLoader + +logger = logging.getLogger(__name__) + + +def get_evidently_metrics( + column_mapping_name: str, trigger_config: Optional[dict] = None +) -> list[EmbeddingsDriftMetric]: + """This function instantiates an Evidently metric given metric 
configuration. + If we want to support multiple metrics in the future, we can change the code to looping through the configurations. + + Evidently metric configurations follow exactly the four DriftMethods defined in embedding_drift_methods: + model, distance, mmd, ratio + If metric_name not given, we use the default 'model' metric. + Otherwise, we use the metric given by metric_name, with optional metric configuration specific to the metric. + """ + if trigger_config is None: + trigger_config = {} + + metric_name: str = "model" + metric_config: dict = {} + + if "metric_name" in trigger_config.keys(): + metric_name = trigger_config["metric_name"] + if "metric_config" in trigger_config.keys(): + metric_config = trigger_config["metric_config"] + + metric = getattr(embedding_drift_methods, metric_name)(**metric_config) + + metrics = [ + EmbeddingsDriftMetric( + column_mapping_name, + drift_method=metric, + ) + ] + return metrics + + +def prepare_trigger_dataloader_by_trigger( + trigger_id: int, + dataloader_info: DataLoaderInfo, + data_points_in_trigger: Optional[int] = None, + sample_size: Optional[int] = None, +) -> DataLoader: + sample_prob: Optional[float] = None + if data_points_in_trigger is not None and sample_size is not None: + sample_prob = sample_size / data_points_in_trigger + + train_set = OnlineTriggerDataset( + dataloader_info.pipeline_id, + trigger_id, + dataloader_info.dataset_id, + dataloader_info.bytes_parser, + dataloader_info.transform_list, + dataloader_info.storage_address, + dataloader_info.selector_address, + dataloader_info.training_id, + dataloader_info.num_prefetched_partitions, + dataloader_info.parallel_prefetch_requests, + dataloader_info.tokenizer, + sample_prob, + ) + + logger.debug("Creating online trigger DataLoader.") + return DataLoader(train_set, batch_size=dataloader_info.batch_size, num_workers=dataloader_info.num_dataloaders) + + +def prepare_trigger_dataloader_fixed_keys( + trigger_id: int, + dataloader_info: DataLoaderInfo, + 
keys: list[int], + sample_size: Optional[int] = None, +) -> DataLoader: + if sample_size is not None: + keys = random.sample(keys, min(len(keys), sample_size)) + + train_set = FixedKeysDataset( + dataloader_info.dataset_id, + dataloader_info.bytes_parser, + dataloader_info.transform_list, + dataloader_info.storage_address, + trigger_id, + keys, + dataloader_info.tokenizer, + ) + + logger.debug("Creating fixed keys DataLoader.") + return DataLoader(train_set, batch_size=dataloader_info.batch_size, num_workers=dataloader_info.num_dataloaders) + + +def get_embeddings(embedding_encoder: EmbeddingEncoder, dataloader: DataLoader) -> torch.Tensor: + """ + input: embedding_encoder with downloaded model + output: embeddings Tensor + """ + assert embedding_encoder._model is not None + all_embeddings: Optional[torch.Tensor] = None + + embedding_encoder._model.model.eval() + embedding_encoder._model.model.embedding_recorder.start_recording() + + with torch.no_grad(): + for batch in dataloader: + data: Union[torch.Tensor, dict] + if isinstance(batch[1], torch.Tensor): + data = batch[1].to(embedding_encoder.device) + elif isinstance(batch[1], dict): + data: dict[str, torch.Tensor] = {} # type: ignore[no-redef] + for name, tensor in batch[1].items(): + data[name] = tensor.to(embedding_encoder.device) + else: + raise ValueError(f"data type {type(batch[1])} not supported") + + with torch.autocast(embedding_encoder.device_type, enabled=embedding_encoder.amp): + embedding_encoder._model.model(data) + embeddings = embedding_encoder._model.model.embedding_recorder.embedding + if all_embeddings is None: + all_embeddings = embeddings + else: + all_embeddings = torch.cat((all_embeddings, embeddings), 0) + + embedding_encoder._model.model.embedding_recorder.end_recording() + + return all_embeddings + + +def convert_tensor_to_df(t: torch.Tensor, column_name_prefix: Optional[str] = None) -> pd.DataFrame: + matrix_numpy = t.cpu().detach().numpy() + df = 
pd.DataFrame(matrix_numpy).astype("float64") + if column_name_prefix is not None: + df.columns = [column_name_prefix + str(x) for x in df.columns] + return df diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py index 3da959e25..5a102d616 100644 --- a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py +++ b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py @@ -301,7 +301,12 @@ def test__handle_new_data_batch_no_triggers(test_inform_selector: MagicMock): pe.pipeline_id = 42 batch = [(10, 1), (11, 2)] - with patch.object(pe.trigger, "inform", return_value=[]) as inform_mock: + with patch.object(pe.trigger, "inform") as inform_mock: + + def fake(self): + yield from [] + + inform_mock.side_effect = fake assert not pe._handle_new_data_batch(batch) inform_mock.assert_called_once_with(batch) diff --git a/modyn/tests/supervisor/internal/triggers/test_amounttrigger.py b/modyn/tests/supervisor/internal/triggers/test_amounttrigger.py index 7b681f1f5..40ef8e312 100644 --- a/modyn/tests/supervisor/internal/triggers/test_amounttrigger.py +++ b/modyn/tests/supervisor/internal/triggers/test_amounttrigger.py @@ -19,24 +19,24 @@ def test_init_fails_if_invalid() -> None: def test_inform() -> None: trigger = DataAmountTrigger({"data_points_for_trigger": 1}) # pylint: disable-next=use-implicit-booleaness-not-comparison - assert trigger.inform([]) == [] - assert trigger.inform([(10, 1)]) == [0] - assert trigger.inform([(10, 1), (10, 1)]) == [0, 1] - assert trigger.inform([(10, 1), (10, 1), (10, 1)]) == [0, 1, 2] + assert list(trigger.inform([])) == [] + assert list(trigger.inform([(10, 1)])) == [0] + assert list(trigger.inform([(10, 1), (10, 1)])) == [0, 1] + assert list(trigger.inform([(10, 1), (10, 1), (10, 1)])) == [0, 1, 2] trigger = DataAmountTrigger({"data_points_for_trigger": 2}) # pylint: 
disable-next=use-implicit-booleaness-not-comparison - assert trigger.inform([(10, 1)]) == [] - assert trigger.inform([(10, 1)]) == [0] - assert trigger.inform([(10, 1), (10, 1)]) == [1] - assert trigger.inform([(10, 1), (10, 1), (10, 1), (10, 1)]) == [1, 3] - assert trigger.inform([(10, 1), (10, 1), (10, 1)]) == [1] - assert trigger.inform([(10, 1)]) == [0] - assert trigger.inform([(10, 1), (10, 1), (10, 1), (10, 1), (10, 1)]) == [1, 3] - assert trigger.inform([(10, 1)]) == [0] + assert list(trigger.inform([(10, 1)])) == [] + assert list(trigger.inform([(10, 1)])) == [0] + assert list(trigger.inform([(10, 1), (10, 1)])) == [1] + assert list(trigger.inform([(10, 1), (10, 1), (10, 1), (10, 1)])) == [1, 3] + assert list(trigger.inform([(10, 1), (10, 1), (10, 1)])) == [1] + assert list(trigger.inform([(10, 1)])) == [0] + assert list(trigger.inform([(10, 1), (10, 1), (10, 1), (10, 1), (10, 1)])) == [1, 3] + assert list(trigger.inform([(10, 1)])) == [0] trigger = DataAmountTrigger({"data_points_for_trigger": 5}) # pylint: disable-next=use-implicit-booleaness-not-comparison - assert trigger.inform([(10, 1), (10, 1), (10, 1), (10, 1)]) == [] - assert trigger.inform([(10, 1), (10, 1), (10, 1)]) == [0] - assert trigger.inform([(10, 1), (10, 1), (10, 1)]) == [2] + assert list(trigger.inform([(10, 1), (10, 1), (10, 1), (10, 1)])) == [] + assert list(trigger.inform([(10, 1), (10, 1), (10, 1)])) == [0] + assert list(trigger.inform([(10, 1), (10, 1), (10, 1)])) == [2] diff --git a/modyn/tests/supervisor/internal/triggers/test_datadrifttrigger.py b/modyn/tests/supervisor/internal/triggers/test_datadrifttrigger.py new file mode 100644 index 000000000..6ab793f29 --- /dev/null +++ b/modyn/tests/supervisor/internal/triggers/test_datadrifttrigger.py @@ -0,0 +1,211 @@ +# pylint: disable=unused-argument, no-name-in-module, no-value-for-parameter +import os +import pathlib +from typing import Optional +from unittest.mock import patch + +import pytest +from 
modyn.supervisor.internal.triggers import DataDriftTrigger +from modyn.supervisor.internal.triggers.embedding_encoder_utils import EmbeddingEncoderDownloader +from modyn.supervisor.internal.triggers.trigger_datasets import DataLoaderInfo + +BASEDIR: pathlib.Path = pathlib.Path(os.path.realpath(__file__)).parent / "test_eval_dir" +PIPELINE_ID = 42 +SAMPLE = (10, 1, 1) + + +def get_minimal_training_config() -> dict: + return { + "gpus": 1, + "device": "cpu", + "dataloader_workers": 1, + "use_previous_model": True, + "initial_model": "random", + "learning_rate": 0.1, + "batch_size": 42, + "optimizers": [ + {"name": "default1", "algorithm": "SGD", "source": "PyTorch", "param_groups": [{"module": "model"}]}, + ], + "optimization_criterion": {"name": "CrossEntropyLoss"}, + "checkpointing": {"activated": False}, + "selection_strategy": {"name": "NewDataStrategy", "maximum_keys_in_memory": 10}, + } + + +def get_minimal_evaluation_config() -> dict: + return { + "device": "cpu", + "datasets": [ + { + "dataset_id": "MNIST_eval", + "bytes_parser_function": "def bytes_parser_function(data: bytes) -> bytes:\n\treturn data", + "dataloader_workers": 2, + "batch_size": 64, + "metrics": [{"name": "Accuracy"}], + } + ], + } + + +def get_minimal_trigger_config() -> dict: + return {} + + +def get_minimal_pipeline_config() -> dict: + return { + "pipeline": {"name": "Test"}, + "model": {"id": "ResNet18"}, + "model_storage": {"full_model_strategy": {"name": "PyTorchFullModel"}}, + "training": get_minimal_training_config(), + "data": {"dataset_id": "test", "bytes_parser_function": "def bytes_parser_function(x):\n\treturn x"}, + "trigger": {"id": "DataDriftTrigger", "trigger_config": get_minimal_trigger_config()}, + } + + +def get_simple_system_config() -> dict: + return { + "storage": {"hostname": "test", "port": 42}, + "selector": {"hostname": "test", "port": 42}, + "model_storage": {"hostname": "test", "port": 42}, + } + + +def noop(self) -> None: + pass + + +def 
noop_embedding_encoder_downloader_constructor_mock( + self, + modyn_config: dict, + pipeline_id: int, + base_dir: pathlib.Path, + model_storage_address: str, +) -> None: + pass + + +def noop_dataloader_info_constructor_mock( + self, + pipeline_id: int, + dataset_id: str, + num_dataloaders: int, + batch_size: int, + bytes_parser: str, + transform_list: list[str], + storage_address: str, + selector_address: str, + num_prefetched_partitions: int, + parallel_prefetch_requests: int, + tokenizer: Optional[None], +) -> None: + pass + + +def test_initialization() -> None: + trigger = DataDriftTrigger({"data_points_for_detection": 42}) + assert trigger.detection_interval == 42 + assert trigger.previous_trigger_id is None + assert trigger.previous_model_id is None + assert not trigger.model_updated + assert not trigger.data_cache + assert trigger.leftover_data_points == 0 + + +def test_init_fails_if_invalid() -> None: + with pytest.raises(AssertionError, match="data_points_for_detection needs to be at least 1"): + DataDriftTrigger({"data_points_for_detection": 0}) + + with pytest.raises(AssertionError, match="sample_size needs to be at least 1"): + DataDriftTrigger({"sample_size": 0}) + + +@patch.object(EmbeddingEncoderDownloader, "__init__", noop_embedding_encoder_downloader_constructor_mock) +@patch.object(DataLoaderInfo, "__init__", noop_dataloader_info_constructor_mock) +def test_init_trigger() -> None: + trigger = DataDriftTrigger(get_minimal_trigger_config()) + # pylint: disable-next=use-implicit-booleaness-not-comparison + with patch("os.makedirs", return_value=None): + pipeline_config = get_minimal_pipeline_config() + modyn_config = get_simple_system_config() + trigger.init_trigger(PIPELINE_ID, pipeline_config, modyn_config, BASEDIR) + assert trigger.pipeline_id == PIPELINE_ID + assert trigger.pipeline_config == pipeline_config + assert trigger.modyn_config == modyn_config + assert trigger.base_dir == BASEDIR + assert isinstance(trigger.dataloader_info, 
DataLoaderInfo) + assert isinstance(trigger.encoder_downloader, EmbeddingEncoderDownloader) + + +def test_inform_previous_trigger_and_data_points() -> None: + trigger = DataDriftTrigger(get_minimal_trigger_config()) + # pylint: disable-next=use-implicit-booleaness-not-comparison + trigger.inform_previous_trigger_and_data_points(42, 42) + assert trigger.previous_trigger_id == 42 + assert trigger.previous_data_points == 42 + + +def test_inform_previous_model_id() -> None: + trigger = DataDriftTrigger(get_minimal_trigger_config()) + # pylint: disable-next=use-implicit-booleaness-not-comparison + trigger.inform_previous_model(42) + assert trigger.previous_model_id == 42 + + +@patch.object(DataDriftTrigger, "detect_drift", return_value=True) +def test_inform_always_drift(test_detect_drift) -> None: + trigger = DataDriftTrigger({"data_points_for_detection": 1}) + num_triggers = 0 + for _ in trigger.inform([SAMPLE, SAMPLE, SAMPLE, SAMPLE, SAMPLE]): + num_triggers += 1 + trigger.inform_previous_trigger_and_data_points(num_triggers, 42) + trigger.inform_previous_model(num_triggers) + # pylint: disable-next=use-implicit-booleaness-not-comparison + assert num_triggers == 5 + + trigger = DataDriftTrigger({"data_points_for_detection": 2}) + num_triggers = 0 + for _ in trigger.inform([SAMPLE, SAMPLE, SAMPLE, SAMPLE, SAMPLE]): + num_triggers += 1 + trigger.inform_previous_trigger_and_data_points(num_triggers, 42) + trigger.inform_previous_model(num_triggers) + # pylint: disable-next=use-implicit-booleaness-not-comparison + assert num_triggers == 2 + + trigger = DataDriftTrigger({"data_points_for_detection": 5}) + num_triggers = 0 + for _ in trigger.inform([SAMPLE, SAMPLE, SAMPLE, SAMPLE, SAMPLE]): + num_triggers += 1 + trigger.inform_previous_trigger_and_data_points(num_triggers, 42) + trigger.inform_previous_model(num_triggers) + # pylint: disable-next=use-implicit-booleaness-not-comparison + assert num_triggers == 1 + + +@patch.object(DataDriftTrigger, "detect_drift", 
return_value=False) +def test_inform_no_drift(test_detect_no_drift) -> None: + trigger = DataDriftTrigger({"data_points_for_detection": 1}) + num_triggers = 0 + for _ in trigger.inform([SAMPLE, SAMPLE, SAMPLE, SAMPLE, SAMPLE]): + num_triggers += 1 + trigger.inform_previous_trigger_and_data_points(num_triggers, 42) + trigger.inform_previous_model(num_triggers) + # pylint: disable-next=use-implicit-booleaness-not-comparison + assert num_triggers == 1 + + trigger = DataDriftTrigger({"data_points_for_detection": 2}) + num_triggers = 0 + for _ in trigger.inform([SAMPLE, SAMPLE, SAMPLE, SAMPLE, SAMPLE]): + num_triggers += 1 + trigger.inform_previous_trigger_and_data_points(num_triggers, 42) + trigger.inform_previous_model(num_triggers) + # pylint: disable-next=use-implicit-booleaness-not-comparison + assert num_triggers == 1 + + trigger = DataDriftTrigger({"data_points_for_detection": 5}) + num_triggers = 0 + for _ in trigger.inform([SAMPLE, SAMPLE, SAMPLE, SAMPLE, SAMPLE]): + num_triggers += 1 + trigger.inform_previous_trigger_and_data_points(num_triggers, 42) + trigger.inform_previous_model(num_triggers) + # pylint: disable-next=use-implicit-booleaness-not-comparison + assert num_triggers == 1 diff --git a/modyn/tests/supervisor/internal/triggers/test_timetrigger.py b/modyn/tests/supervisor/internal/triggers/test_timetrigger.py index 306759df2..56ae266e6 100644 --- a/modyn/tests/supervisor/internal/triggers/test_timetrigger.py +++ b/modyn/tests/supervisor/internal/triggers/test_timetrigger.py @@ -20,26 +20,26 @@ def test_inform() -> None: trigger = TimeTrigger({"trigger_every": "1000s"}) LABEL = 2 # pylint: disable=invalid-name # pylint: disable-next=use-implicit-booleaness-not-comparison - assert trigger.inform([]) == [] + assert list(trigger.inform([])) == [] # pylint: disable-next=use-implicit-booleaness-not-comparison - assert trigger.inform([(10, 0, LABEL)]) == [] + assert list(trigger.inform([(10, 0, LABEL)])) == [] # pylint: 
disable-next=use-implicit-booleaness-not-comparison - assert trigger.inform([(10, 500, LABEL)]) == [] + assert list(trigger.inform([(10, 500, LABEL)])) == [] # pylint: disable-next=use-implicit-booleaness-not-comparison - assert trigger.inform([(10, 999, LABEL)]) == [] - assert trigger.inform([(10, 1000, LABEL)]) == [-1] # Trigger includes 0, 500, 900, but not 1000 - assert trigger.inform([(10, 1500, LABEL), (10, 1600, LABEL), (10, 2000, LABEL)]) == [ + assert list(trigger.inform([(10, 999, LABEL)])) == [] + assert list(trigger.inform([(10, 1000, LABEL)])) == [-1] # Trigger includes 0, 500, 900, but not 1000 + assert list(trigger.inform([(10, 1500, LABEL), (10, 1600, LABEL), (10, 2000, LABEL)])) == [ 1 ] # 2000 enables us to know that 1600 should trigger! - assert trigger.inform([(10, 3000, LABEL), (10, 4000, LABEL)]) == [-1, 0] + assert list(trigger.inform([(10, 3000, LABEL), (10, 4000, LABEL)])) == [-1, 0] # pylint: disable-next=use-implicit-booleaness-not-comparison - assert trigger.inform([(10, 4100, LABEL), (10, 4200, LABEL)]) == [] - assert trigger.inform([(10, 5000, LABEL)]) == [-1] - assert trigger.inform([(10, 6000, LABEL), (10, 7000, LABEL), (10, 8000, LABEL), (10, 9000, LABEL)]) == [ + assert list(trigger.inform([(10, 4100, LABEL), (10, 4200, LABEL)])) == [] + assert list(trigger.inform([(10, 5000, LABEL)])) == [-1] + assert list(trigger.inform([(10, 6000, LABEL), (10, 7000, LABEL), (10, 8000, LABEL), (10, 9000, LABEL)])) == [ -1, 0, 1, 2, ] - assert trigger.inform([(10, 15000, LABEL)]) == [-1, -1, -1, -1, -1, -1] - assert trigger.inform([(10, 17000, LABEL), (10, 18000, LABEL)]) == [-1, -1, 0] + assert list(trigger.inform([(10, 15000, LABEL)])) == [-1, -1, -1, -1, -1, -1] + assert list(trigger.inform([(10, 17000, LABEL), (10, 18000, LABEL)])) == [-1, -1, 0] diff --git a/modyn/tests/supervisor/internal/triggers/trigger_datasets/test_fixed_keys_dataset.py b/modyn/tests/supervisor/internal/triggers/trigger_datasets/test_fixed_keys_dataset.py new file mode 
100644 index 000000000..a1ff8fca9 --- /dev/null +++ b/modyn/tests/supervisor/internal/triggers/trigger_datasets/test_fixed_keys_dataset.py @@ -0,0 +1,288 @@ +# pylint: disable=unused-argument, no-name-in-module, no-value-for-parameter +import math +import platform +from unittest.mock import patch + +import grpc +import pytest +import torch +from modyn.models.tokenizers import DistilBertTokenizerTransform +from modyn.storage.internal.grpc.generated.storage_pb2 import GetRequest, GetResponse +from modyn.supervisor.internal.triggers.trigger_datasets import FixedKeysDataset +from torchvision import transforms + +DATASET_ID = "MNIST" +TRIGGER_ID = 42 +STORAGE_ADDR = "localhost:1234" +KEYS = list(range(10)) + + +def get_identity_bytes_parser(): + return "def bytes_parser_function(x):\n\treturn x" + + +def bytes_parser_function(data): + return data + + +class MockStorageStub: + def __init__(self, channel) -> None: + pass + + def Get(self, request: GetRequest): # pylint: disable=invalid-name + yield GetResponse( + samples=[key.to_bytes(2, "big") for key in request.keys], keys=request.keys, labels=[1] * len(request.keys) + ) + + +def test_invalid_bytes_parser(): + with pytest.raises(AssertionError): + FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser="", + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + )._init_transforms() + + with pytest.raises(ValueError): + FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser="bytes_parser_function=1", + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + )._init_transforms() + + +def test_init(): + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser=get_identity_bytes_parser(), + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + tokenizer="DistilBertTokenizerTransform", + ) + + assert fixed_keys_dataset._trigger_id == TRIGGER_ID + assert 
fixed_keys_dataset._dataset_id == DATASET_ID + assert fixed_keys_dataset._first_call + assert fixed_keys_dataset._bytes_parser_function is None + assert fixed_keys_dataset._storagestub is None + assert isinstance(fixed_keys_dataset._tokenizer, DistilBertTokenizerTransform) + assert fixed_keys_dataset._keys == KEYS + + +@patch("modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.StorageStub", MockStorageStub) +@patch( + "modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.grpc_connection_established", + return_value=True, +) +@patch.object(grpc, "insecure_channel", return_value=None) +def test_init_grpc(test_insecure_channel, test_grpc_connection_established): + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser=get_identity_bytes_parser(), + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + ) + + assert fixed_keys_dataset._storagestub is None + fixed_keys_dataset._init_grpc() + assert isinstance(fixed_keys_dataset._storagestub, MockStorageStub) + + +@patch("modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.StorageStub", MockStorageStub) +@patch( + "modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.grpc_connection_established", + return_value=True, +) +@patch.object(grpc, "insecure_channel", return_value=None) +def test_get_data_from_storage(test_insecure_channel, test_grpc_connection_established): + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser=get_identity_bytes_parser(), + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + ) + + fixed_keys_dataset._init_grpc() + all_data = fixed_keys_dataset._get_data_from_storage(KEYS) + for data in all_data: + for i, d in enumerate(data): + assert d == (i, i.to_bytes(2, "big"), 1) + + +def test_init_transforms(): + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + 
bytes_parser="def bytes_parser_function(x):\n\treturn int.from_bytes(x, 'big')", + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + ) + + assert fixed_keys_dataset._bytes_parser_function is None + assert fixed_keys_dataset._transform is None + + with patch.object(fixed_keys_dataset, "_setup_composed_transform") as tv_ds: + fixed_keys_dataset._init_transforms() + assert fixed_keys_dataset._bytes_parser_function is not None + assert fixed_keys_dataset._bytes_parser_function(b"\x03") == 3 + + assert fixed_keys_dataset._transform is not None + + tv_ds.assert_called_once() + + +@pytest.mark.parametrize( + "serialized_transforms,transforms_list", + [ + pytest.param( + [ + "transforms.RandomResizedCrop(224)", + "transforms.RandomAffine(degrees=(0, 90))", + "transforms.ToTensor()", + "transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])", + ], + [ + transforms.RandomResizedCrop(224), + transforms.RandomAffine(degrees=(0, 90)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), + ], + ) + ], +) +def test__setup_composed_transform(serialized_transforms, transforms_list): + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser=get_identity_bytes_parser(), + serialized_transforms=list(serialized_transforms), + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + tokenizer="DistilBertTokenizerTransform", + ) + fixed_keys_dataset._bytes_parser_function = bytes_parser_function + fixed_keys_dataset._setup_composed_transform() + assert isinstance(fixed_keys_dataset._transform.transforms, list) + assert fixed_keys_dataset._transform.transforms[0].__name__ == "bytes_parser_function" + for transform1, transform2 in zip(fixed_keys_dataset._transform.transforms[1:-1], transforms_list): + assert transform1.__dict__ == transform2.__dict__ + assert isinstance(fixed_keys_dataset._transform.transforms[-1], 
DistilBertTokenizerTransform) + + +@patch("modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.StorageStub", MockStorageStub) +@patch( + "modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.grpc_connection_established", + return_value=True, +) +@patch.object(grpc, "insecure_channel", return_value=None) +def test_dataset_iter(test_insecure_channel, test_grpc_connection_established): + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser=get_identity_bytes_parser(), + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + ) + + all_data = list(fixed_keys_dataset) + assert [x[0] for x in all_data] == KEYS + assert [x[1] for x in all_data] == [x.to_bytes(2, "big") for x in KEYS] + assert [x[2] for x in all_data] == [1] * len(KEYS) + + +@patch("modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.StorageStub", MockStorageStub) +@patch( + "modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.grpc_connection_established", + return_value=True, +) +@patch.object(grpc, "insecure_channel", return_value=None) +def test_dataset_iter_with_parsing(test_insecure_channel, test_grpc_connection_established): + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser="def bytes_parser_function(x):\n\treturn int.from_bytes(x, 'big')", + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + ) + dataset_iter = iter(fixed_keys_dataset) + all_data = list(dataset_iter) + assert [x[0] for x in all_data] == KEYS + assert [x[1] for x in all_data] == KEYS + assert [x[2] for x in all_data] == [1] * len(KEYS) + + +@patch("modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.StorageStub", MockStorageStub) +@patch( + "modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.grpc_connection_established", + return_value=True, +) +@patch.object(grpc, 
"insecure_channel", return_value=None) +def test_dataloader_dataset(test_insecure_channel, test_grpc_connection_established): + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser="def bytes_parser_function(x):\n\treturn int.from_bytes(x, 'big')", + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=KEYS, + ) + + dataloader = torch.utils.data.DataLoader(fixed_keys_dataset, batch_size=2) + for i, batch in enumerate(dataloader): + assert len(batch) == 3 + assert i < math.floor(len(KEYS) / 2) + assert batch[0].tolist() == [2 * i, 2 * i + 1] + assert torch.equal(batch[1], torch.Tensor([2 * i, 2 * i + 1])) + assert torch.equal(batch[2], torch.ones(2, dtype=torch.float64)) + + +@patch("modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.StorageStub", MockStorageStub) +@patch( + "modyn.supervisor.internal.triggers.trigger_datasets.fixed_keys_dataset.grpc_connection_established", + return_value=True, +) +@patch.object(grpc, "insecure_channel", return_value=None) +def test_dataloader_dataset_multi_worker(test_insecure_channel, test_grpc_connection_established): + if platform.system() == "Darwin": + # On macOS, spawn is the default, which loses the mocks + # Hence the test does not work on macOS, only on Linux. 
+ return + + fixed_keys_dataset = FixedKeysDataset( + dataset_id=DATASET_ID, + bytes_parser="def bytes_parser_function(x):\n\treturn int.from_bytes(x, 'big')", + serialized_transforms=[], + storage_address=STORAGE_ADDR, + trigger_id=TRIGGER_ID, + keys=list(range(16)), + ) + dataloader = torch.utils.data.DataLoader(fixed_keys_dataset, batch_size=4, num_workers=4) + + data = list(dataloader) + data.sort(key=lambda batch_data: batch_data[0].min()) + + batch_num = -1 + for batch_num, batch in enumerate(data): + assert len(batch) == 3 + assert batch[0].tolist() == [4 * batch_num, 4 * batch_num + 1, 4 * batch_num + 2, 4 * batch_num + 3] + assert torch.equal( + batch[1], torch.Tensor([4 * batch_num, 4 * batch_num + 1, 4 * batch_num + 2, 4 * batch_num + 3]) + ) + assert torch.equal(batch[2], torch.ones(4, dtype=torch.float64)) + + assert batch_num == 3 diff --git a/modyn/tests/supervisor/internal/triggers/trigger_datasets/test_online_trigger_dataset.py b/modyn/tests/supervisor/internal/triggers/trigger_datasets/test_online_trigger_dataset.py new file mode 100644 index 000000000..cd352544d --- /dev/null +++ b/modyn/tests/supervisor/internal/triggers/trigger_datasets/test_online_trigger_dataset.py @@ -0,0 +1,84 @@ +# pylint: disable=unused-argument, no-name-in-module, no-value-for-parameter +import pathlib +from typing import Generator, Optional +from unittest.mock import patch + +from modyn.supervisor.internal.triggers.trigger_datasets import OnlineTriggerDataset +from modyn.trainer_server.internal.dataset.online_dataset import OnlineDataset + +NUM_SAMPLES = 10 + + +def get_mock_bytes_parser(): + return "def bytes_parser_function(x):\n\treturn x" + + +def bytes_parser_function(data): + return data + + +def noop_constructor_mock( + self, + pipeline_id: int, + trigger_id: int, + dataset_id: str, + bytes_parser: str, + serialized_transforms: list[str], + storage_address: str, + selector_address: str, + training_id: int, + num_prefetched_partitions: int, + 
parallel_prefetch_requests: int, + tokenizer: Optional[str], + log_path: Optional[pathlib.Path], +) -> None: + pass + + +def mock_data_generator(self) -> Generator: + yield from list(range(NUM_SAMPLES)) + + +def test_init(): + online_trigger_dataset = OnlineTriggerDataset( + pipeline_id=1, + trigger_id=1, + dataset_id="MNIST", + bytes_parser=get_mock_bytes_parser(), + serialized_transforms=[], + storage_address="localhost:1234", + selector_address="localhost:1234", + training_id=42, + tokenizer=None, + num_prefetched_partitions=1, + parallel_prefetch_requests=1, + sample_prob=0.5, + ) + assert online_trigger_dataset._pipeline_id == 1 + assert online_trigger_dataset._trigger_id == 1 + assert online_trigger_dataset._dataset_id == "MNIST" + assert online_trigger_dataset._first_call + assert online_trigger_dataset._bytes_parser_function is None + assert online_trigger_dataset._storagestub is None + assert online_trigger_dataset._sample_prob == 0.5 + + +@patch.object(OnlineDataset, "__iter__", mock_data_generator) +def test_dataset_iter(): + online_trigger_dataset = OnlineTriggerDataset( + pipeline_id=1, + trigger_id=1, + dataset_id="MNIST", + bytes_parser=get_mock_bytes_parser(), + serialized_transforms=[], + storage_address="localhost:1234", + selector_address="localhost:1234", + training_id=42, + tokenizer=None, + num_prefetched_partitions=1, + parallel_prefetch_requests=1, + sample_prob=0.5, + ) + + all_trigger_data = list(online_trigger_dataset) + assert len(all_trigger_data) < NUM_SAMPLES