From 17045002796df75ed197bda920a26aaf713b8c33 Mon Sep 17 00:00:00 2001 From: Ilyas Moutawwakil <57442720+IlyasMoutawwakil@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:42:04 +0100 Subject: [PATCH] Adding latency and memory to energy star (#302) --- .../scenarios/energy_star/config.py | 4 + .../scenarios/energy_star/scenario.py | 323 ++++++++++------- .../scenarios/inference/scenario.py | 327 +++++++++--------- .../scenarios/training/scenario.py | 2 +- optimum_benchmark/trackers/energy.py | 22 +- optimum_benchmark/trackers/latency.py | 12 + optimum_benchmark/trackers/memory.py | 2 + tests/test_energy_star.py | 3 + 8 files changed, 415 insertions(+), 280 deletions(-) diff --git a/optimum_benchmark/scenarios/energy_star/config.py b/optimum_benchmark/scenarios/energy_star/config.py index da4ffa10..dc5427b9 100644 --- a/optimum_benchmark/scenarios/energy_star/config.py +++ b/optimum_benchmark/scenarios/energy_star/config.py @@ -55,6 +55,10 @@ class EnergyStarConfig(ScenarioConfig): audio_column_name: str = field(default="audio", metadata={"help": "Name of the column with the audio."}) # scenario options + energy: bool = field(default=True, metadata={"help": "Whether to measure energy."}) + memory: bool = field(default=False, metadata={"help": "Whether to measure memory."}) + latency: bool = field(default=False, metadata={"help": "Whether to measure latency."}) + warmup_runs: int = field(default=10, metadata={"help": "Number of warmup runs to perform before scenarioing"}) # methods kwargs diff --git a/optimum_benchmark/scenarios/energy_star/scenario.py b/optimum_benchmark/scenarios/energy_star/scenario.py index 8345cae0..4358c405 100644 --- a/optimum_benchmark/scenarios/energy_star/scenario.py +++ b/optimum_benchmark/scenarios/energy_star/scenario.py @@ -1,3 +1,5 @@ +from contextlib import ExitStack, contextmanager + from datasets import load_dataset from tqdm import tqdm @@ -6,6 +8,8 @@ from ...preprocessors.dataset_preprocessor import TASKS_TO_PREPROCESSORS from ...task_utils import IMAGE_DIFFUSION_TASKS, TEXT_GENERATION_TASKS from ...trackers.energy import Efficiency, EnergyTracker +from ...trackers.latency import LatencyTracker, Throughput +from ...trackers.memory import MemoryTracker from ..base import Scenario from .config import EnergyStarConfig @@ -42,6 +46,12 @@ DECODE_EFFICIENCY_UNIT = "tokens/kWh" CALL_EFFICIENCY_UNIT = "images/kWh" +PREPROCESS_THROUGHPUT_UNIT = "samples/s" +FORWARD_THROUGHPUT_UNIT = "samples/s" +PREFILL_THROUGHPUT_UNIT = "samples/s" +DECODE_THROUGHPUT_UNIT = "tokens/s" +CALL_THROUGHPUT_UNIT = "images/s" + class EnergyStarScenario(Scenario[EnergyStarConfig]): NAME = "energy-star" @@ -55,7 +65,6 @@ def run(self, backend: Backend[BackendConfigT]) -> BenchmarkReport: if self.backend.config.task in TEXT_GENERATION_TASKS: self.logger.info("\t+ Updating Text Generation kwargs with default values") self.config.generate_kwargs = {**TEXT_GENERATION_DEFAULT_KWARGS, **self.config.generate_kwargs} - self.prefill_kwargs = {**self.config.generate_kwargs, **TEXT_GENERATION_PREFILL_OVERRIDES} self.logger.info("\t+ Initializing Text Generation report") self.report = BenchmarkReport.from_list( targets=["load_dataset", "preprocess_dataset", "load_model", "prefill", "decode"] @@ -73,180 +82,262 @@ def run(self, backend: Backend[BackendConfigT]) -> BenchmarkReport: targets=["load_dataset", "preprocess_dataset", "load_model", "forward"] ) - self.energy_tracker = EnergyTracker( - backend=backend.config.name, - device=backend.config.device, - device_ids=backend.config.device_ids, - ) + 
if self.config.latency: + self.logger.info("\t+ Initializing Latency tracker") + self.latency_tracker = LatencyTracker( + backend=self.backend.config.name, + device=self.backend.config.device, + ) + if self.config.memory: + self.logger.info("\t+ Initializing Memory tracker") + self.memory_tracker = MemoryTracker( + backend=self.backend.config.name, + device=self.backend.config.device, + device_ids=self.backend.config.device_ids, + ) + if self.config.energy: + self.logger.info("\t+ Initializing Energy tracker") + self.energy_tracker = EnergyTracker( + backend=self.backend.config.name, + device=self.backend.config.device, + device_ids=self.backend.config.device_ids, + ) - self.run_dataset_loading_energy_tracking() - self.run_dataset_preprocessing_energy_tracking(backend) + # we start with loading/preprocessing the dataset as it takes no vram + self.run_dataset_loading_tracking() + self.run_dataset_preprocessing_tracking() + self.run_model_loading_tracking() - self.logger.info("\t+ Preparing sample inputs for model warmup") - self.sample_inputs = self.dataset[: self.config.input_shapes["batch_size"]] - self.sample_inputs = backend.prepare_inputs(self.sample_inputs) + if self.config.warmup_runs > 0: + self.logger.info("\t+ Preparing sample inputs for warmup") + self.sample_inputs = self.dataset[: self.config.input_shapes["batch_size"]] + self.sample_inputs = self.backend.prepare_inputs(self.sample_inputs) - self.run_model_loading_energy_tracking(backend) + if self.backend.config.task in TEXT_GENERATION_TASKS: + self.warmup_text_generation() + elif self.backend.config.task in IMAGE_DIFFUSION_TASKS: + self.warmup_image_diffusion() + else: + self.warmup_inference() if self.backend.config.task in TEXT_GENERATION_TASKS: - self.warmup_text_generation(backend) - self.run_text_generation_energy_tracking(backend) + self.run_text_generation_tracking() elif self.backend.config.task in IMAGE_DIFFUSION_TASKS: - self.warmup_image_diffusion(backend) - self.run_image_diffusion_energy_tracking(backend) + self.run_image_diffusion_tracking() else: - self.warmup_inference(backend) - self.run_inference_energy_tracking(backend) + self.run_inference_tracking() return self.report + @contextmanager + def track(self, task_name: str): + with ExitStack() as context_stack: + if self.config.energy: + context_stack.enter_context(self.energy_tracker.track(task_name=task_name)) + if self.config.memory: + context_stack.enter_context(self.memory_tracker.track()) + if self.config.latency: + self.latency_tracker.reset() + context_stack.enter_context(self.latency_tracker.track()) + yield + # Dataset loading tracking - def run_dataset_loading_energy_tracking(self): - self.logger.info("\t+ Running dataset loading energy tracking") + def run_dataset_loading_tracking(self): + self.logger.info("\t+ Running dataset loading tracking") - with self.energy_tracker.track(file_prefix="load_dataset"): + with self.track(task_name="load_dataset"): self.dataset = load_dataset( - self.config.dataset_name, self.config.dataset_config, split=self.config.dataset_split + self.config.dataset_name, + self.config.dataset_config, + split=self.config.dataset_split, ) - self.report.load_dataset.energy = self.energy_tracker.get_energy() + if self.config.energy: + self.report.load_dataset.energy = self.energy_tracker.get_energy() + if self.config.latency: + self.report.load_dataset.latency = self.latency_tracker.get_latency() + if self.config.memory: + self.report.load_dataset.memory = self.memory_tracker.get_max_memory() # Dataset preprocessing tracking - def 
run_dataset_preprocessing_energy_tracking(self, backend: Backend[BackendConfigT]): - self.logger.info("\t+ Running dataset preprocessing energy tracking") + def run_dataset_preprocessing_tracking(self): + self.logger.info("\t+ Running dataset preprocessing tracking") - with self.energy_tracker.track(file_prefix="preprocess_dataset"): + with self.track(task_name="preprocess_dataset"): self.dataset = TASKS_TO_PREPROCESSORS[self.backend.config.task]( dataset=self.dataset, scenario_config=self.config, - pretrained_config=backend.pretrained_config, - pretrained_processor=backend.pretrained_processor, + pretrained_config=self.backend.pretrained_config, + pretrained_processor=self.backend.pretrained_processor, ) - preprocess_energy = self.energy_tracker.get_energy() - preprocess_volume = self.dataset_preprocess_volume - - self.report.preprocess_dataset.energy = preprocess_energy - self.report.preprocess_dataset.efficiency = Efficiency.from_energy( - preprocess_energy, - preprocess_volume, - unit=PREPROCESS_EFFICIENCY_UNIT, - ) + if self.config.energy: + preprocess_energy = self.energy_tracker.get_energy() + preprocess_volume = self.dataset_preprocess_volume + self.report.preprocess_dataset.energy = preprocess_energy + self.report.preprocess_dataset.efficiency = Efficiency.from_energy( + preprocess_energy, preprocess_volume, unit=PREPROCESS_EFFICIENCY_UNIT + ) + if self.config.latency: + preprocess_latency = self.latency_tracker.get_latency() + preprocess_volume = self.dataset_preprocess_volume + self.report.preprocess_dataset.latency = preprocess_latency + self.report.preprocess_dataset.throughput = Throughput.from_latency( + preprocess_latency, preprocess_volume, unit=PREPROCESS_THROUGHPUT_UNIT + ) + if self.config.memory: + self.report.preprocess_dataset.memory = self.memory_tracker.get_max_memory() # Model loading tracking - def run_model_loading_energy_tracking(self, backend: Backend[BackendConfigT]): + def run_model_loading_tracking(self): self.logger.info("\t+ Running model loading energy tracking") - with self.energy_tracker.track(file_prefix="load_model"): - backend.load() + with self.track(task_name="load_model"): + self.backend.load() - self.report.load_model.energy = self.energy_tracker.get_energy() + if self.config.latency: + self.report.load_model.latency = self.latency_tracker.get_latency() + if self.config.memory: + self.report.load_model.memory = self.memory_tracker.get_max_memory() + if self.config.energy: + self.report.load_model.energy = self.energy_tracker.get_energy() # Text Generation warmup - def warmup_text_generation(self, backend: Backend[BackendConfigT]): + def warmup_text_generation(self): + warmup_kwargs = {**self.config.generate_kwargs, **TEXT_GENERATION_WARMUP_OVERRIDES} self.logger.info("\t+ Warming up backend for Text Generation") - backend.generate(self.sample_inputs, self.config.generate_kwargs) + self.backend.generate(self.sample_inputs, self.config.generate_kwargs) for _ in range(self.config.warmup_runs): - backend.generate(self.sample_inputs, {**self.config.generate_kwargs, **TEXT_GENERATION_WARMUP_OVERRIDES}) + self.backend.generate(self.sample_inputs, warmup_kwargs) # Image Diffusion warmup - def warmup_image_diffusion(self, backend: Backend[BackendConfigT]): + def warmup_image_diffusion(self): + warmup_kwargs = {**self.config.call_kwargs, **IMAGE_DIFFUSION_WARMUP_OVERRIDES} self.logger.info("\t+ Warming up backend for Image Diffusion") - backend.call(self.sample_inputs, self.config.call_kwargs) + self.backend.call(self.sample_inputs, 
self.config.call_kwargs) for _ in range(self.config.warmup_runs): - backend.call(self.sample_inputs, {**self.config.call_kwargs, **IMAGE_DIFFUSION_WARMUP_OVERRIDES}) + self.backend.call(self.sample_inputs, warmup_kwargs) # Inference warmup - def warmup_inference(self, backend: Backend[BackendConfigT]): + def warmup_inference(self): self.logger.info("\t+ Warming up backend for Inference") for _ in range(self.config.warmup_runs): - backend.forward(self.sample_inputs, self.config.forward_kwargs) + self.backend.forward(self.sample_inputs, self.config.forward_kwargs) - # Text Generation energy tracking - def run_text_generation_energy_tracking(self, backend: Backend[BackendConfigT]): - self.logger.info("\t+ Running Text Generation energy tracking") + # Text Generation tracking + def run_text_generation_tracking(self): + self.logger.info("\t+ Running Text Generation tracking") - with self.energy_tracker.track(file_prefix="prefill"): - for i in tqdm(range(0, self.config.num_samples, self.config.input_shapes["batch_size"])): - inputs = backend.prepare_inputs(self.dataset[i : i + self.config.input_shapes["batch_size"]]) - backend.prefill(inputs, self.prefill_kwargs) - - prefill_energy = self.energy_tracker.get_energy() - prefill_volume = self.dataset_prefill_volume - - self.report.prefill.energy = prefill_energy - self.report.prefill.efficiency = Efficiency.from_energy( - prefill_energy, prefill_volume, unit=PREFILL_EFFICIENCY_UNIT - ) + prefill_kwargs = {**self.config.generate_kwargs, **TEXT_GENERATION_PREFILL_OVERRIDES} - with self.energy_tracker.track(file_prefix="generate"): + with self.track(task_name="prefill"): for i in tqdm(range(0, self.config.num_samples, self.config.input_shapes["batch_size"])): - inputs = backend.prepare_inputs(self.dataset[i : i + self.config.input_shapes["batch_size"]]) - backend.generate(inputs, self.config.generate_kwargs) - - generate_energy = self.energy_tracker.get_energy() - decode_energy = generate_energy - prefill_energy - decode_volume = self.dataset_decode_volume - - self.report.decode.energy = decode_energy - self.report.decode.efficiency = Efficiency.from_energy( - decode_energy, - decode_volume, - unit=DECODE_EFFICIENCY_UNIT, - ) - - # Image Diffusion energy tracking - def run_image_diffusion_energy_tracking(self, backend: Backend[BackendConfigT]): - self.logger.info("\t+ Running Image Diffusion energy tracking") + inputs = self.backend.prepare_inputs(self.dataset[i : i + self.config.input_shapes["batch_size"]]) + self.backend.prefill(inputs, prefill_kwargs) + + if self.config.energy: + prefill_energy = self.energy_tracker.get_energy() + prefill_volume = self.dataset_prefill_volume + self.report.prefill.energy = prefill_energy + self.report.prefill.efficiency = Efficiency.from_energy( + prefill_energy, prefill_volume, unit=PREFILL_EFFICIENCY_UNIT + ) + if self.config.latency: + prefill_latency = self.latency_tracker.get_latency() + prefill_volume = self.dataset_prefill_volume + self.report.prefill.latency = prefill_latency + self.report.prefill.throughput = Throughput.from_latency( + prefill_latency, prefill_volume, unit=PREFILL_THROUGHPUT_UNIT + ) + if self.config.memory: + self.report.prefill.memory = self.memory_tracker.get_max_memory() - with self.energy_tracker.track(file_prefix="call"): + with self.track(task_name="generate"): for i in tqdm(range(0, self.config.num_samples, self.config.input_shapes["batch_size"])): - inputs = self.backend.prepare_inputs(self.dataset[i : i + self.config.input_shapes["batch_size"]]) - backend.call(inputs, 
self.config.call_kwargs) - - call_energy = self.energy_tracker.get_energy() - call_volume = self.dataset_call_volume - - self.report.call.energy = call_energy - self.report.call.efficiency = Efficiency.from_energy( - call_energy, - call_volume, - unit=CALL_EFFICIENCY_UNIT, - ) + inputs = self.backend.prepare_inputs(self.dataset[i : i + self.config.input_shapes["batch_size"]]) + self.backend.generate(inputs, self.config.generate_kwargs) + + if self.config.energy: + generate_energy = self.energy_tracker.get_energy() + decode_energy = generate_energy - prefill_energy + decode_volume = self.dataset_decode_volume + self.report.decode.energy = decode_energy + self.report.decode.efficiency = Efficiency.from_energy( + decode_energy, decode_volume, unit=DECODE_EFFICIENCY_UNIT + ) + if self.config.latency: + generate_latency = self.latency_tracker.get_latency() + decode_latency = generate_latency - prefill_latency + decode_volume = self.dataset_decode_volume + self.report.decode.latency = decode_latency + self.report.decode.throughput = Throughput.from_latency( + decode_latency, decode_volume, unit=DECODE_THROUGHPUT_UNIT + ) + if self.config.memory: + self.report.decode.memory = self.memory_tracker.get_max_memory() - # Inference energy tracking - def run_inference_energy_tracking(self, backend: Backend[BackendConfigT]): - self.logger.info("\t+ Running Inference energy tracking") + # Image Diffusion tracking + def run_image_diffusion_tracking(self): + self.logger.info("\t+ Running Image Diffusion tracking") - with self.energy_tracker.track(file_prefix="forward"): + with self.track(task_name="call"): for i in tqdm(range(0, self.config.num_samples, self.config.input_shapes["batch_size"])): - inputs = backend.prepare_inputs(self.dataset[i : i + self.config.input_shapes["batch_size"]]) - backend.forward(inputs, self.config.forward_kwargs) - - forward_energy = self.energy_tracker.get_energy() - forward_volume = self.dataset_forward_volume - - self.report.forward.energy = forward_energy - self.report.forward.efficiency = Efficiency.from_energy( - forward_energy, - forward_volume, - unit=FORWARD_EFFICIENCY_UNIT, - ) + inputs = self.backend.prepare_inputs(self.dataset[i : i + self.config.input_shapes["batch_size"]]) + self.backend.call(inputs, self.config.call_kwargs) + + if self.config.energy: + call_energy = self.energy_tracker.get_energy() + call_volume = self.dataset_call_volume + self.report.call.energy = call_energy + self.report.call.efficiency = Efficiency.from_energy(call_energy, call_volume, unit=CALL_EFFICIENCY_UNIT) + if self.config.latency: + call_latency = self.latency_tracker.get_latency() + call_volume = self.dataset_call_volume + self.report.call.latency = call_latency + self.report.call.throughput = Throughput.from_latency(call_latency, call_volume, unit=CALL_THROUGHPUT_UNIT) + if self.config.memory: + self.report.call.memory = self.memory_tracker.get_max_memory() + + # Inference tracking + def run_inference_tracking(self): + self.logger.info("\t+ Running Inference tracking") + + with self.track(task_name="forward"): + for i in tqdm(range(0, self.config.num_samples, self.config.input_shapes["batch_size"])): + inputs = self.backend.prepare_inputs(self.dataset[i : i + self.config.input_shapes["batch_size"]]) + self.backend.forward(inputs, self.config.forward_kwargs) + + if self.config.energy: + forward_energy = self.energy_tracker.get_energy() + forward_volume = self.dataset_forward_volume + self.report.forward.energy = forward_energy + self.report.forward.efficiency = Efficiency.from_energy( 
+ forward_energy, forward_volume, unit=FORWARD_EFFICIENCY_UNIT + ) + if self.config.latency: + forward_latency = self.latency_tracker.get_latency() + forward_volume = self.dataset_forward_volume + self.report.forward.latency = forward_latency + self.report.forward.throughput = Throughput.from_latency( + forward_latency, forward_volume, unit=FORWARD_THROUGHPUT_UNIT + ) + if self.config.memory: + self.report.forward.memory = self.memory_tracker.get_max_memory() @property - def dataset_preprocess_volume(self) -> int: # in samples + def dataset_preprocess_volume(self) -> int: # in terms of processed samples return self.config.num_samples @property - def dataset_forward_volume(self) -> int: # in samples + def dataset_forward_volume(self) -> int: # in terms of processed samples return self.config.num_samples @property - def dataset_prefill_volume(self) -> int: # in samples + def dataset_prefill_volume(self) -> int: # in terms of processed samples return self.config.num_samples @property - def dataset_decode_volume(self) -> int: # in tokens + def dataset_decode_volume(self) -> int: # in terms of generated tokens return ( self.config.num_samples * self.config.generate_kwargs["num_beams"] # at each beam stage there are num_beams tokens generated @@ -254,7 +345,7 @@ def dataset_decode_volume(self) -> int: # in tokens ) @property - def dataset_call_volume(self) -> int: # in images + def dataset_call_volume(self) -> int: # in terms of generated images if self.backend.config.task == "text-to-image": return self.config.num_samples * self.config.call_kwargs["num_images_per_prompt"] else: diff --git a/optimum_benchmark/scenarios/inference/scenario.py b/optimum_benchmark/scenarios/inference/scenario.py index c7faffed..4a804b5f 100644 --- a/optimum_benchmark/scenarios/inference/scenario.py +++ b/optimum_benchmark/scenarios/inference/scenario.py @@ -66,7 +66,10 @@ def run(self, backend: Backend[BackendConfigT]) -> BenchmarkReport: self.logger.info("\t+ Updating Text Generation kwargs with default values") self.config.generate_kwargs = {**TEXT_GENERATION_DEFAULT_KWARGS, **self.config.generate_kwargs} self.logger.info("\t+ Initializing Text Generation report") - self.report = BenchmarkReport.from_list(targets=["load_model", "prefill", "decode", "per_token"]) + if self.backend.config.name in PER_TOKEN_BACKENDS: + self.report = BenchmarkReport.from_list(targets=["load_model", "prefill", "decode", "per_token"]) + else: + self.report = BenchmarkReport.from_list(targets=["load_model", "prefill", "decode"]) elif self.backend.config.task in IMAGE_DIFFUSION_TASKS: self.logger.info("\t+ Updating Image Diffusion kwargs with default values") self.config.call_kwargs = {**IMAGE_DIFFUSION_DEFAULT_KWARGS, **self.config.call_kwargs} @@ -76,163 +79,169 @@ def run(self, backend: Backend[BackendConfigT]) -> BenchmarkReport: self.logger.info("\t+ Initializing Inference report") self.report = BenchmarkReport.from_list(targets=["load_model", "forward"]) - self.run_model_loading_tracking(backend) + if self.config.latency: + self.logger.info("\t+ Initializing Latency tracker") + self.latency_tracker = LatencyTracker(backend=self.backend.config.name, device=self.backend.config.device) + if self.backend.config.task in TEXT_GENERATION_TASKS and self.backend.config.name in PER_TOKEN_BACKENDS: + self.logger.info("\t+ Initializing Per-Token Latency tracker") + self.per_token_latency_tracker = PerTokenLatencyLogitsProcessor( + backend=self.backend.config.name, device=self.backend.config.device + ) + if self.config.memory: + 
self.logger.info("\t+ Initializing Memory tracker") + self.memory_tracker = MemoryTracker( + backend=self.backend.config.name, + device=self.backend.config.device, + device_ids=self.backend.config.device_ids, + ) + if self.config.energy: + self.logger.info("\t+ Initializing Energy tracker") + self.energy_tracker = EnergyTracker( + backend=self.backend.config.name, + device=self.backend.config.device, + device_ids=self.backend.config.device_ids, + ) + + self.run_model_loading_tracking() - self.logger.info("\t+ Creating input generator") - self.input_generator = InputGenerator( + self.logger.info(f"\t+ Generating inputs for task {self.backend.config.task}") + self.inputs = InputGenerator( task=self.backend.config.task, - model_shapes=backend.model_shapes, + model_shapes=self.backend.model_shapes, + model_type=self.backend.config.model_type, input_shapes=self.config.input_shapes, - model_type=backend.config.model_type, - ) - self.logger.info("\t+ Generating inputs") - self.inputs = self.input_generator() - self.logger.info("\t+ Preparing inputs for backend") - self.inputs = backend.prepare_inputs(inputs=self.inputs) - - if self.config.latency or self.config.energy: - # latency and energy are metrics that require some warmup - if self.config.warmup_runs > 0: - if self.backend.config.task in TEXT_GENERATION_TASKS: - self.warmup_text_generation(backend) - elif self.backend.config.task in IMAGE_DIFFUSION_TASKS: - self.warmup_image_diffusion(backend) - else: - self.warmup_inference(backend) + )() + self.logger.info(f"\t+ Preparing inputs for backend {self.backend.config.name}") + self.inputs = self.backend.prepare_inputs(inputs=self.inputs) + + if self.config.warmup_runs > 0: + if self.backend.config.task in TEXT_GENERATION_TASKS: + self.warmup_text_generation() + elif self.backend.config.task in IMAGE_DIFFUSION_TASKS: + self.warmup_image_diffusion() + else: + self.warmup_inference() if self.config.latency: if self.backend.config.task in TEXT_GENERATION_TASKS: - if backend.config.name in PER_TOKEN_BACKENDS: - self.run_per_token_text_generation_latency_tracking(backend) + if self.backend.config.name in PER_TOKEN_BACKENDS: + self.run_per_token_text_generation_latency_tracking() else: - self.run_text_generation_latency_tracking(backend) + self.run_text_generation_latency_tracking() elif self.backend.config.task in IMAGE_DIFFUSION_TASKS: - self.run_image_diffusion_latency_tracking(backend) + self.run_image_diffusion_latency_tracking() else: - self.run_latency_inference_tracking(backend) + self.run_inference_latency_tracking() if self.config.memory: if self.backend.config.task in TEXT_GENERATION_TASKS: - self.run_text_generation_memory_tracking(backend) + self.run_text_generation_memory_tracking() elif self.backend.config.task in IMAGE_DIFFUSION_TASKS: - self.run_image_diffusion_memory_tracking(backend) + self.run_image_diffusion_memory_tracking() else: - self.run_inference_memory_tracking(backend) + self.run_inference_memory_tracking() if self.config.energy: if self.backend.config.task in TEXT_GENERATION_TASKS: - self.run_text_generation_energy_tracking(backend) + self.run_text_generation_energy_tracking() elif self.backend.config.task in IMAGE_DIFFUSION_TASKS: - self.run_image_diffusion_energy_tracking(backend) + self.run_image_diffusion_energy_tracking() else: - self.run_inference_energy_tracking(backend) + self.run_inference_energy_tracking() return self.report - # Warmup - def warmup_text_generation(self, backend: Backend[BackendConfigT]): - self.logger.info("\t+ Warming up backend for Text 
Generation") - _ = backend.generate(self.inputs, self.config.generate_kwargs) - for _ in range(self.config.warmup_runs): - _ = backend.generate(self.inputs, {**self.config.generate_kwargs, **TEXT_GENERATION_WARMUP_OVERRIDES}) - - def warmup_image_diffusion(self, backend: Backend[BackendConfigT]): - self.logger.info("\t+ Warming up backend for Image Diffusion") - _ = backend.call(self.inputs, self.config.call_kwargs) - for _ in range(self.config.warmup_runs): - _ = backend.call(self.inputs, {**self.config.call_kwargs, **IMAGE_DIFFUSION_WARMUP_OVERRIDES}) - - def warmup_inference(self, backend: Backend[BackendConfigT]): - self.logger.info("\t+ Warming up backend for Inference") - for _ in range(self.config.warmup_runs): - _ = backend.forward(self.inputs, self.config.forward_kwargs) - - # Loading tracking - def run_model_loading_tracking(self, backend: Backend[BackendConfigT]): + # Model loading tracking + def run_model_loading_tracking(self): self.logger.info("\t+ Running model loading tracking") - if self.config.memory: - memory_tracker = MemoryTracker( - backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids - ) - if self.config.latency: - latency_tracker = LatencyTracker(backend=backend.config.name, device=backend.config.device) - if self.config.energy: - energy_tracker = EnergyTracker( - backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids - ) - with ExitStack() as context_stack: if self.config.energy: - context_stack.enter_context(energy_tracker.track()) + context_stack.enter_context(self.energy_tracker.track(task_name="load_model")) if self.config.memory: - context_stack.enter_context(memory_tracker.track()) + context_stack.enter_context(self.memory_tracker.track()) if self.config.latency: - context_stack.enter_context(latency_tracker.track()) + self.latency_tracker.reset() + context_stack.enter_context(self.latency_tracker.track()) - backend.load() + self.backend.load() if self.config.latency: - self.report.load_model.latency = latency_tracker.get_latency() + self.report.load_model.latency = self.latency_tracker.get_latency() if self.config.memory: - self.report.load_model.memory = memory_tracker.get_max_memory() + self.report.load_model.memory = self.memory_tracker.get_max_memory() if self.config.energy: - self.report.load_model.energy = energy_tracker.get_energy() + self.report.load_model.energy = self.energy_tracker.get_energy() - ## Memory tracking - def run_text_generation_memory_tracking(self, backend: Backend[BackendConfigT]): - self.logger.info("\t+ Running Text Generation memory tracking") - memory_tracker = MemoryTracker( - backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids - ) + # Warmup + def warmup_text_generation(self): + self.logger.info("\t+ Warming up backend for Text Generation") + self.backend.generate(self.inputs, self.config.generate_kwargs) + for _ in range(self.config.warmup_runs): + self.backend.generate(self.inputs, {**self.config.generate_kwargs, **TEXT_GENERATION_WARMUP_OVERRIDES}) + + def warmup_image_diffusion(self): + self.logger.info("\t+ Warming up backend for Image Diffusion") + self.backend.call(self.inputs, self.config.call_kwargs) + for _ in range(self.config.warmup_runs): + self.backend.call(self.inputs, {**self.config.call_kwargs, **IMAGE_DIFFUSION_WARMUP_OVERRIDES}) + + def warmup_inference(self): + self.logger.info("\t+ Warming up backend for Inference") + for _ in range(self.config.warmup_runs): + 
self.backend.forward(self.inputs, self.config.forward_kwargs) + + ## Text Generation memory tracking + def run_text_generation_memory_tracking(self): prefill_kwargs = {**self.config.generate_kwargs, **TEXT_GENERATION_PREFILL_OVERRIDES} - with memory_tracker.track(): - _ = backend.prefill(self.inputs, prefill_kwargs) + self.logger.info("\t+ Running Text Generation memory tracking") + + with self.memory_tracker.track(): + self.backend.prefill(self.inputs, prefill_kwargs) - self.report.prefill.memory = memory_tracker.get_max_memory() + self.report.prefill.memory = self.memory_tracker.get_max_memory() - with memory_tracker.track(): - _ = backend.generate(self.inputs, self.config.generate_kwargs) + with self.memory_tracker.track(): + self.backend.generate(self.inputs, self.config.generate_kwargs) - self.report.decode.memory = memory_tracker.get_max_memory() + self.report.decode.memory = self.memory_tracker.get_max_memory() - def run_image_diffusion_memory_tracking(self, backend: Backend[BackendConfigT]): + ## Image Diffusion memory tracking + def run_image_diffusion_memory_tracking(self): self.logger.info("\t+ Running Image Diffusion memory tracking") - memory_tracker = MemoryTracker( - backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids - ) - with memory_tracker.track(): - _ = backend.call(self.inputs, self.config.call_kwargs) + with self.memory_tracker.track(): + self.backend.call(self.inputs, self.config.call_kwargs) - self.report.call.memory = memory_tracker.get_max_memory() + self.report.call.memory = self.memory_tracker.get_max_memory() - def run_inference_memory_tracking(self, backend: Backend[BackendConfigT]): + ## Inference memory tracking + def run_inference_memory_tracking(self): self.logger.info("\t+ Running Inference memory tracking") - memory_tracker = MemoryTracker( - backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids - ) - with memory_tracker.track(): - _ = backend.forward(self.inputs, self.config.forward_kwargs) + with self.memory_tracker.track(): + self.backend.forward(self.inputs, self.config.forward_kwargs) - self.report.forward.memory = memory_tracker.get_max_memory() + self.report.forward.memory = self.memory_tracker.get_max_memory() - ## Latency tracking - def run_per_token_text_generation_latency_tracking(self, backend: Backend[BackendConfigT]): + ## Per-Token Text Generation latency tracking + def run_per_token_text_generation_latency_tracking(self): self.logger.info("\t+ Running Per-Token Text Generation latency tracking") - latency_tracker = PerTokenLatencyLogitsProcessor(device=backend.config.device, backend=backend.config.name) - per_token_kwargs = {**self.config.generate_kwargs, "logits_processor": LogitsProcessorList([latency_tracker])} - while latency_tracker.elapsed() < self.config.duration or latency_tracker.count() < self.config.iterations: - with latency_tracker.track(): - _ = backend.generate(self.inputs, per_token_kwargs) + self.config.generate_kwargs["logits_processor"] = LogitsProcessorList([self.per_token_latency_tracker]) - per_token_latency = latency_tracker.get_per_token_latency() - prefill_latency = latency_tracker.get_prefill_latency() - decode_latency = latency_tracker.get_decode_latency() + self.per_token_latency_tracker.reset() + while ( + self.per_token_latency_tracker.elapsed() < self.config.duration + or self.per_token_latency_tracker.count() < self.config.iterations + ): + with self.per_token_latency_tracker.track(): + self.backend.generate(self.inputs, 
self.config.generate_kwargs) + per_token_latency = self.per_token_latency_tracker.get_per_token_latency() + prefill_latency = self.per_token_latency_tracker.get_prefill_latency() + decode_latency = self.per_token_latency_tracker.get_decode_latency() prefill_volume = self.atomic_prefill_volume decode_volume = self.atomic_decode_volume @@ -240,7 +249,9 @@ def run_per_token_text_generation_latency_tracking(self, backend: Backend[Backen self.report.prefill.latency = prefill_latency self.report.decode.latency = decode_latency - # we don't register a per-token throughput, as it's a confusing metric and the same as the decode throughput + # we don't register a per-token throughput, + # it's a confusing metric and the same signal as the decode throughput + self.report.prefill.throughput = Throughput.from_latency( prefill_latency, prefill_volume, unit=PREFILL_THROUGHPUT_UNIT ) @@ -248,16 +259,20 @@ def run_per_token_text_generation_latency_tracking(self, backend: Backend[Backen decode_latency, decode_volume, unit=DECODE_THROUGHPUT_UNIT ) - def run_text_generation_latency_tracking(self, backend: Backend[BackendConfigT]): + ## Text Generation latency tracking + def run_text_generation_latency_tracking(self): self.logger.info("\t+ Running Text Generation latency tracking") - latency_tracker = LatencyTracker(backend=backend.config.name, device=backend.config.device) prefill_kwargs = {**self.config.generate_kwargs, **TEXT_GENERATION_PREFILL_OVERRIDES} - while latency_tracker.elapsed() < self.config.duration or latency_tracker.count() < self.config.iterations: - with latency_tracker.track(): - _ = backend.prefill(self.inputs, prefill_kwargs) + self.latency_tracker.reset() + while ( + self.latency_tracker.elapsed() < self.config.duration + or self.latency_tracker.count() < self.config.iterations + ): + with self.latency_tracker.track(): + self.backend.prefill(self.inputs, prefill_kwargs) - prefill_latency = latency_tracker.get_latency() + prefill_latency = self.latency_tracker.get_latency() prefill_volume = self.atomic_prefill_volume self.report.prefill.latency = prefill_latency @@ -265,12 +280,15 @@ def run_text_generation_latency_tracking(self, backend: Backend[BackendConfigT]) prefill_latency, prefill_volume, unit=PREFILL_THROUGHPUT_UNIT ) - latency_tracker.reset() - while latency_tracker.elapsed() < self.config.duration or latency_tracker.count() < self.config.iterations: - with latency_tracker.track(): - _ = backend.generate(self.inputs, self.config.generate_kwargs) + self.latency_tracker.reset() + while ( + self.latency_tracker.elapsed() < self.config.duration + or self.latency_tracker.count() < self.config.iterations + ): + with self.latency_tracker.track(): + self.backend.generate(self.inputs, self.config.generate_kwargs) - generate_latency = latency_tracker.get_latency() + generate_latency = self.latency_tracker.get_latency() decode_latency = generate_latency - prefill_latency decode_volume = self.atomic_decode_volume @@ -279,29 +297,35 @@ def run_text_generation_latency_tracking(self, backend: Backend[BackendConfigT]) decode_latency, decode_volume, unit=DECODE_THROUGHPUT_UNIT ) - def run_image_diffusion_latency_tracking(self, backend: Backend[BackendConfigT]): + def run_image_diffusion_latency_tracking(self): self.logger.info("\t+ Running Image Diffusion latency tracking") - latency_tracker = LatencyTracker(backend=backend.config.name, device=backend.config.device) - while latency_tracker.elapsed() < self.config.duration or latency_tracker.count() < self.config.iterations: - with 
latency_tracker.track(): - _ = backend.call(self.inputs, self.config.call_kwargs) + self.latency_tracker.reset() + while ( + self.latency_tracker.elapsed() < self.config.duration + or self.latency_tracker.count() < self.config.iterations + ): + with self.latency_tracker.track(): + self.backend.call(self.inputs, self.config.call_kwargs) - call_latency = latency_tracker.get_latency() + call_latency = self.latency_tracker.get_latency() call_volume = self.atomic_call_volume self.report.call.latency = call_latency self.report.call.throughput = Throughput.from_latency(call_latency, call_volume, unit=CALL_THROUGHPUT_UNIT) - def run_latency_inference_tracking(self, backend: Backend[BackendConfigT]): + def run_inference_latency_tracking(self): self.logger.info("\t+ Running Inference latency tracking") - latency_tracker = LatencyTracker(backend=backend.config.name, device=backend.config.device) - while latency_tracker.elapsed() < self.config.duration or latency_tracker.count() < self.config.iterations: - with latency_tracker.track(): - _ = backend.forward(self.inputs, self.config.forward_kwargs) + self.latency_tracker.reset() + while ( + self.latency_tracker.elapsed() < self.config.duration + or self.latency_tracker.count() < self.config.iterations + ): + with self.latency_tracker.track(): + self.backend.forward(self.inputs, self.config.forward_kwargs) - forward_latency = latency_tracker.get_latency() + forward_latency = self.latency_tracker.get_latency() forward_volume = self.atomic_forward_volume self.report.forward.latency = forward_latency @@ -310,24 +334,21 @@ def run_latency_inference_tracking(self, backend: Backend[BackendConfigT]): ) ## Energy tracking - def run_text_generation_energy_tracking(self, backend: Backend[BackendConfigT]): + def run_text_generation_energy_tracking(self): self.logger.info("\t+ Running Text Generation energy tracking") - energy_tracker = EnergyTracker( - backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids - ) prefill_kwargs = {**self.config.generate_kwargs, **TEXT_GENERATION_PREFILL_OVERRIDES} count = 0 elapsed = 0 start_time = time.perf_counter() - with energy_tracker.track(file_prefix="prefill"): + with self.energy_tracker.track(task_name="prefill"): while elapsed < self.config.duration or count < self.config.iterations: - _ = backend.prefill(self.inputs, prefill_kwargs) + self.backend.prefill(self.inputs, prefill_kwargs) elapsed = time.perf_counter() - start_time count += 1 - prefill_energy = energy_tracker.get_energy() / count + prefill_energy = self.energy_tracker.get_energy() / count prefill_volume = self.atomic_prefill_volume self.report.prefill.energy = prefill_energy @@ -339,13 +360,13 @@ def run_text_generation_energy_tracking(self, backend: Backend[BackendConfigT]): elapsed = 0 start_time = time.perf_counter() - with energy_tracker.track(file_prefix="generate"): + with self.energy_tracker.track(task_name="generate"): while elapsed < self.config.duration or count < self.config.iterations: - _ = backend.generate(self.inputs, self.config.generate_kwargs) + self.backend.generate(self.inputs, self.config.generate_kwargs) elapsed = time.perf_counter() - start_time count += 1 - generate_energy = energy_tracker.get_energy() / count + generate_energy = self.energy_tracker.get_energy() / count decode_energy = generate_energy - prefill_energy decode_volume = self.atomic_decode_volume @@ -354,45 +375,39 @@ def run_text_generation_energy_tracking(self, backend: Backend[BackendConfigT]): decode_energy, decode_volume, 
unit=DECODE_EFFICIENCY_UNIT ) - def run_image_diffusion_energy_tracking(self, backend: Backend[BackendConfigT]): + def run_image_diffusion_energy_tracking(self): self.logger.info("\t+ Running Image Diffusion energy tracking") - energy_tracker = EnergyTracker( - backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids - ) count = 0 elapsed = 0 start_time = time.perf_counter() - with energy_tracker.track(file_prefix="call"): + with self.energy_tracker.track(task_name="call"): while elapsed < self.config.duration or count < self.config.iterations: - _ = backend.call(self.inputs, self.config.call_kwargs) + self.backend.call(self.inputs, self.config.call_kwargs) elapsed = time.perf_counter() - start_time count += 1 - call_energy = energy_tracker.get_energy() / count + call_energy = self.energy_tracker.get_energy() / count call_volume = self.atomic_call_volume self.report.call.energy = call_energy self.report.call.efficiency = Efficiency.from_energy(call_energy, call_volume, unit=CALL_EFFICIENCY_UNIT) - def run_inference_energy_tracking(self, backend: Backend[BackendConfigT]): + def run_inference_energy_tracking(self): self.logger.info("\t+ Running energy tracking") - energy_tracker = EnergyTracker( - backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids - ) count = 0 elapsed = 0 start_time = time.perf_counter() - with energy_tracker.track(file_prefix="forward"): + with self.energy_tracker.track(task_name="forward"): while elapsed < self.config.duration or count < self.config.iterations: - _ = backend.forward(self.inputs, self.config.forward_kwargs) + self.backend.forward(self.inputs, self.config.forward_kwargs) elapsed = time.perf_counter() - start_time count += 1 - forward_energy = energy_tracker.get_energy() / count + forward_energy = self.energy_tracker.get_energy() / count forward_volume = self.atomic_forward_volume self.report.forward.energy = forward_energy @@ -409,7 +424,7 @@ def atomic_prefill_volume(self) -> int: # in terms of processed samples return self.config.input_shapes["batch_size"] @property - def atomic_decode_volume(self) -> int: # in terms of output/generated tokens + def atomic_decode_volume(self) -> int: # in terms of generated tokens return ( self.config.input_shapes["batch_size"] * self.config.generate_kwargs["num_beams"] # at each beam stage there are num_beams tokens generated @@ -417,7 +432,7 @@ def atomic_decode_volume(self) -> int: # in terms of output/generated tokens ) @property - def atomic_call_volume(self) -> int: # in terms of output images + def atomic_call_volume(self) -> int: # in terms of generated images if self.backend.config.task == "text-to-image": return self.config.input_shapes["batch_size"] * self.config.call_kwargs["num_images_per_prompt"] else: diff --git a/optimum_benchmark/scenarios/training/scenario.py b/optimum_benchmark/scenarios/training/scenario.py index 0b80135c..7ee3ff0d 100644 --- a/optimum_benchmark/scenarios/training/scenario.py +++ b/optimum_benchmark/scenarios/training/scenario.py @@ -51,7 +51,7 @@ def run(self, backend: Backend[BackendConfigT]) -> BenchmarkReport: energy_tracker = EnergyTracker( device=backend.config.device, backend=backend.config.name, device_ids=backend.config.device_ids ) - context_stack.enter_context(energy_tracker.track(file_prefix="train")) + context_stack.enter_context(energy_tracker.track(task_name="train")) backend.train( training_dataset=training_dataset, diff --git a/optimum_benchmark/trackers/energy.py 
b/optimum_benchmark/trackers/energy.py index 427c4d40..19be24b0 100644 --- a/optimum_benchmark/trackers/energy.py +++ b/optimum_benchmark/trackers/energy.py @@ -1,7 +1,7 @@ +import json import os from contextlib import contextmanager from dataclasses import asdict, dataclass -from json import dump from logging import getLogger from typing import List, Literal, Optional, Union @@ -148,7 +148,7 @@ def print(self): class EnergyTracker: - def __init__(self, backend: str, device: str, device_ids: Optional[Union[str, int, List[int]]] = None): + def __init__(self, device: str, backend: str, device_ids: Optional[Union[str, int, List[int]]] = None): self.device = device self.backend = backend self.device_ids = device_ids @@ -223,12 +223,18 @@ def __init__(self, backend: str, device: str, device_ids: Optional[Union[str, in self.gpu_energy: Optional[float] = None self.ram_energy: Optional[float] = None + def reset(self): + self.total_energy = None + self.cpu_energy = None + self.gpu_energy = None + self.ram_energy = None + @contextmanager - def track(self, file_prefix: str = "task"): + def track(self, task_name: str = "task"): if self.is_pytorch_cuda: torch.cuda.synchronize() - self.emission_tracker.start_task() + self.emission_tracker.start_task(task_name=task_name) yield @@ -237,9 +243,9 @@ def track(self, file_prefix: str = "task"): emission_data: EmissionsData = self.emission_tracker.stop_task() - with open(f"{file_prefix}_codecarbon.json", "w") as f: - LOGGER.info(f"\t\t+ Saving codecarbon emission data to {file_prefix}_codecarbon.json") - dump(asdict(emission_data), f, indent=4) + with open(f"{task_name}_codecarbon.json", "w") as f: + LOGGER.info(f"\t\t+ Saving codecarbon emission data to {task_name}_codecarbon.json") + json.dump(asdict(emission_data), f, indent=4) self.total_energy = emission_data.energy_consumed self.cpu_energy = emission_data.cpu_energy @@ -247,6 +253,8 @@ def track(self, file_prefix: str = "task"): self.ram_energy = emission_data.ram_energy def get_energy(self) -> Energy: + assert self.total_energy is not None, "Energy must be tracked before calling this method" + return Energy( unit=ENERGY_UNIT, cpu=self.cpu_energy, gpu=self.gpu_energy, ram=self.ram_energy, total=self.total_energy ) diff --git a/optimum_benchmark/trackers/latency.py b/optimum_benchmark/trackers/latency.py index de4ab341..2850c503 100644 --- a/optimum_benchmark/trackers/latency.py +++ b/optimum_benchmark/trackers/latency.py @@ -209,6 +209,10 @@ def _cpu_latency(self): self.end_events.append(time.perf_counter()) def get_latency(self) -> Latency: + assert len(self.start_events) == len( + self.end_events + ), "Mismatched number of start and end events, get_latency() should only be called outside of track() context" + if self.is_pytorch_cuda: torch.cuda.synchronize() @@ -276,6 +280,10 @@ def on_step_end(self, *args, **kwargs): self.end_events.append(time.perf_counter()) def get_latency(self) -> Latency: + assert len(self.start_events) == len( + self.end_events + ), "Mismatched number of start and end events, get_latency() should only be called outside of track() context" + if self.is_pytorch_cuda: torch.cuda.synchronize() @@ -404,6 +412,10 @@ def get_decode_latency(self) -> Latency: return Latency.from_values(latencies_list, unit=LATENCY_UNIT) def get_per_token_latency(self) -> Latency: + assert ( + len(self.per_token_events) > 0 + ), "No per-token events recorded, make sure to pass the PerTokenLatencyLogitsProcessor to the generate() method" + if self.is_pytorch_cuda: torch.cuda.synchronize() diff --git 
a/optimum_benchmark/trackers/memory.py b/optimum_benchmark/trackers/memory.py index 47edf71e..8cb13185 100644 --- a/optimum_benchmark/trackers/memory.py +++ b/optimum_benchmark/trackers/memory.py @@ -258,6 +258,8 @@ def _cpu_memory(self): parent_connection.close() def get_max_memory(self): + assert self.max_ram_memory is not None, "Memory tracker must be run before getting the maximum memory" + return Memory( unit=MEMORY_UNIT, max_ram=self.max_ram_memory, diff --git a/tests/test_energy_star.py b/tests/test_energy_star.py index d3be12ac..bbb83f55 100644 --- a/tests/test_energy_star.py +++ b/tests/test_energy_star.py @@ -28,6 +28,9 @@ def test_cli_configs(config_name): TEST_CONFIG_DIR, "--config-name", config_name, + "scenario.energy=true", + "scenario.memory=true", + "scenario.latency=true", "scenario.num_samples=2", "scenario.input_shapes.batch_size=2", ]
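
As a standalone illustration of the pattern this patch introduces (the EnergyStarScenario.track() context manager that conditionally stacks the energy, memory, and latency trackers with contextlib.ExitStack), here is a minimal Python sketch. DummyTracker, ScenarioConfig, and Scenario below are hypothetical stand-ins, not the actual optimum_benchmark classes; only the stacking pattern mirrors the patch.

from contextlib import ExitStack, contextmanager
import time


class DummyTracker:
    """Hypothetical stand-in for EnergyTracker / MemoryTracker / LatencyTracker."""

    def __init__(self, name: str):
        self.name = name

    @contextmanager
    def track(self):
        start = time.perf_counter()
        yield
        print(f"[{self.name}] section took {time.perf_counter() - start:.4f}s")


class ScenarioConfig:
    # mirrors the new energy/memory/latency flags on EnergyStarConfig (names only)
    def __init__(self, energy: bool = True, memory: bool = False, latency: bool = False):
        self.energy = energy
        self.memory = memory
        self.latency = latency


class Scenario:
    def __init__(self, config: ScenarioConfig):
        self.config = config
        self.energy_tracker = DummyTracker("energy")
        self.memory_tracker = DummyTracker("memory")
        self.latency_tracker = DummyTracker("latency")

    @contextmanager
    def track(self, task_name: str):
        # Enter only the trackers enabled in the config; ExitStack exits them
        # all in reverse order when the with-block ends, even on error.
        print(f"tracking task: {task_name}")
        with ExitStack() as stack:
            if self.config.energy:
                stack.enter_context(self.energy_tracker.track())
            if self.config.memory:
                stack.enter_context(self.memory_tracker.track())
            if self.config.latency:
                stack.enter_context(self.latency_tracker.track())
            yield


if __name__ == "__main__":
    scenario = Scenario(ScenarioConfig(energy=True, latency=True))
    with scenario.track(task_name="forward"):
        time.sleep(0.05)  # stands in for backend.forward(inputs, kwargs)

In the patch itself, the same composition is driven by the new energy, memory, and latency flags on EnergyStarConfig, which the updated test enables through the scenario.energy=true, scenario.memory=true, and scenario.latency=true overrides shown above.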