From dedfdd707718991ab48ff3ff1e6c6c1464c4ad04 Mon Sep 17 00:00:00 2001 From: chenmoneygithub Date: Tue, 22 Oct 2024 18:01:52 -0700 Subject: [PATCH 1/2] metrics dedup --- composer/loggers/mlflow_logger.py | 56 ++++++++++++++++++----- tests/loggers/test_mlflow_logger.py | 70 ++++++++++++++++++++++++++++- 2 files changed, 113 insertions(+), 13 deletions(-) diff --git a/composer/loggers/mlflow_logger.py b/composer/loggers/mlflow_logger.py index f3b60eb1b7..9d46aaeeab 100644 --- a/composer/loggers/mlflow_logger.py +++ b/composer/loggers/mlflow_logger.py @@ -34,6 +34,8 @@ __all__ = ['MLFlowLogger'] DEFAULT_MLFLOW_EXPERIMENT_NAME = 'my-mlflow-experiment' +LOG_DUPLICATED_METRIC_VALUE_PER_N_STEPS = 100 +LOG_DUPLICATED_METRIC_VALUE_PER_N_MILLIS = 600000 class MlflowMonitorProcess(multiprocessing.Process): @@ -118,6 +120,11 @@ class MLFlowLogger(LoggerDestination): logging_buffer_seconds (int, optional): The amount of time, in seconds, that MLflow waits before sending logs to the MLflow tracking server. Metrics/params/tags logged within this buffer time will be grouped in batches before being sent to the backend. + log_duplicated_metric_every_n_steps (int, optional): The number of steps to wait before + logging the duplicated metric value. Duplicated metric value means the new step has the + same value as the previous step. (default: ``100``) + log_duplicated_metric_every_n_millis (int, optional): The number of milliseconds to wait + before logging the duplicated metric value. (default: ``600000``) """ def __init__( @@ -138,6 +145,8 @@ def __init__( run_group: Optional[str] = None, resume: bool = False, logging_buffer_seconds: Optional[int] = 10, + log_duplicated_metric_every_n_steps: int = 100, + log_duplicated_metric_every_n_millis: int = 600000, ) -> None: try: import mlflow @@ -179,6 +188,10 @@ def __init__( mlflow.set_system_metrics_samples_before_logging(6) mlflow.set_system_metrics_sampling_interval(5) + self.log_duplicated_metric_every_n_steps = log_duplicated_metric_every_n_steps + self.log_duplicated_metric_every_n_millis = log_duplicated_metric_every_n_millis + self._metrics_cache = {} + self._rank_zero_only = rank_zero_only self._last_flush_time = time.time() self._flush_interval = flush_interval @@ -385,18 +398,37 @@ def rename(self, key: str): def log_metrics(self, metrics: dict[str, Any], step: Optional[int] = None) -> None: from mlflow import log_metrics - if self._enabled: - # Convert all metrics to floats to placate mlflow. - metrics = { - self.rename(k): float(v) - for k, v in metrics.items() - if not any(fnmatch.fnmatch(k, pattern) for pattern in self.ignore_metrics) - } - log_metrics( - metrics=metrics, - step=step, - synchronous=self.synchronous, - ) + if not self._enabled: + return + + metrics_to_log = {} + step = step or 0 + current_time_millis = int(time.time() * 1000) + for k, v in metrics.items(): + if any(fnmatch.fnmatch(k, pattern) for pattern in self.ignore_metrics): + continue + if k in self._metrics_cache: + value, last_step, last_time = self._metrics_cache[k] + if value == v and step < last_step + self.log_duplicated_metric_every_n_steps and current_time_millis < last_time + self.log_duplicated_metric_every_n_millis: + # Skip logging the metric if it has the same value as the last step and it's + # within the step and time window. + continue + else: + # Log the metric if it has a different value or it's outside the step and time + # window, and update the metrics cache. + self._metrics_cache[k] = (v, step, current_time_millis) + metrics_to_log[self.rename(k)] = float(v) + else: + # Log the metric if it's the first time it's being logged, and update the metrics + # cache. + self._metrics_cache[k] = (v, step, current_time_millis) + metrics_to_log[self.rename(k)] = float(v) + + log_metrics( + metrics=metrics_to_log, + step=step, + synchronous=self.synchronous, + ) def log_hyperparameters(self, hyperparameters: dict[str, Any]): from mlflow import log_params diff --git a/tests/loggers/test_mlflow_logger.py b/tests/loggers/test_mlflow_logger.py index 5ee6aab7a5..962da64562 100644 --- a/tests/loggers/test_mlflow_logger.py +++ b/tests/loggers/test_mlflow_logger.py @@ -19,7 +19,10 @@ from tests.common.datasets import RandomClassificationDataset, RandomImageDataset from tests.common.markers import device from tests.common.models import SimpleConvModel, SimpleModel -from tests.models.test_hf_model import check_hf_model_equivalence, check_hf_tokenizer_equivalence +from tests.models.test_hf_model import ( + check_hf_model_equivalence, + check_hf_tokenizer_equivalence, +) def _get_latest_mlflow_run(experiment_name, tracking_uri=None): @@ -836,6 +839,71 @@ def test_mlflow_logging_time_buffer(tmp_path): assert len(mock_log_batch.call_args_list[1][1]['metrics']) == 2 * steps +def test_mlflow_logging_with_metrics_dedupping(tmp_path): + with patch('mlflow.log_metrics') as mock_log_metrics: + + mlflow_uri = tmp_path / Path('my-test-mlflow-uri') + experiment_name = 'mlflow_logging_test' + mock_state = MagicMock() + mock_logger = MagicMock() + + test_mlflow_logger = MLFlowLogger( + tracking_uri=mlflow_uri, + experiment_name=experiment_name, + log_system_metrics=True, + run_name='test_run', + logging_buffer_seconds=2, + log_duplicated_metric_every_n_steps=3, + log_duplicated_metric_every_n_millis=10000, + ) + test_mlflow_logger.init(state=mock_state, logger=mock_logger) + # # Test dedupping of metrics and duplicated metrics get logged per + # # `log_duplicated_metric_every_n_steps` steps. + # steps = 10 + # for i in range(steps): + # # 'foo' always have different values, while 'bar' always have the same value. + # metrics = { + # 'foo': i, + # 'bar': 0, + # } + # test_mlflow_logger.log_metrics(metrics, step=i) + + # if i % 3 == 0: + # # 'bar' will be logged every 3 steps. + # mock_log_metrics.assert_called_with(metrics={'foo': float(i), 'bar': 0.0}, step=i, synchronous=False) + # else: + # # 'bar' will not be logged. + # mock_log_metrics.assert_called_with(metrics={'foo': float(i)}, step=i, synchronous=False) + + # Test dedupping of metrics and duplicated metrics get logged per + # `log_duplicated_metric_every_n_millis` milliseconds. + timestamps = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45] + # Reset the metrics cache. + test_mlflow_logger._metrics_cache = {} + with patch('time.time', side_effect=timestamps): + for i in range(len(timestamps)): + # 'foo' always have different values, while 'bar' always have the same value. + metrics = { + 'foo': i, + 'bar': 0, + } + test_mlflow_logger.log_metrics(metrics, step=0) + + if i % 2 == 0: + # 'bar' will be logged every 2 steps. + mock_log_metrics.assert_called_with( + metrics={ + 'foo': float(i), + 'bar': 0.0, + }, step=0, synchronous=False, + ) + else: + # 'bar' will not be logged. + mock_log_metrics.assert_called_with(metrics={'foo': float(i)}, step=0, synchronous=False) + + test_mlflow_logger.post_close() + + def test_mlflow_resume_run(tmp_path): mlflow = pytest.importorskip('mlflow') From 3842d155afa60390d5bee45b4582719f456410e8 Mon Sep 17 00:00:00 2001 From: chenmoneygithub Date: Fri, 25 Oct 2024 12:47:22 -0700 Subject: [PATCH 2/2] fix comments --- composer/loggers/mlflow_logger.py | 20 ++++------ tests/loggers/test_mlflow_logger.py | 60 ++++++++--------------------- 2 files changed, 24 insertions(+), 56 deletions(-) diff --git a/composer/loggers/mlflow_logger.py b/composer/loggers/mlflow_logger.py index 9d46aaeeab..2905f033fe 100644 --- a/composer/loggers/mlflow_logger.py +++ b/composer/loggers/mlflow_logger.py @@ -35,7 +35,6 @@ DEFAULT_MLFLOW_EXPERIMENT_NAME = 'my-mlflow-experiment' LOG_DUPLICATED_METRIC_VALUE_PER_N_STEPS = 100 -LOG_DUPLICATED_METRIC_VALUE_PER_N_MILLIS = 600000 class MlflowMonitorProcess(multiprocessing.Process): @@ -123,8 +122,6 @@ class MLFlowLogger(LoggerDestination): log_duplicated_metric_every_n_steps (int, optional): The number of steps to wait before logging the duplicated metric value. Duplicated metric value means the new step has the same value as the previous step. (default: ``100``) - log_duplicated_metric_every_n_millis (int, optional): The number of milliseconds to wait - before logging the duplicated metric value. (default: ``600000``) """ def __init__( @@ -146,7 +143,6 @@ def __init__( resume: bool = False, logging_buffer_seconds: Optional[int] = 10, log_duplicated_metric_every_n_steps: int = 100, - log_duplicated_metric_every_n_millis: int = 600000, ) -> None: try: import mlflow @@ -189,7 +185,6 @@ def __init__( mlflow.set_system_metrics_sampling_interval(5) self.log_duplicated_metric_every_n_steps = log_duplicated_metric_every_n_steps - self.log_duplicated_metric_every_n_millis = log_duplicated_metric_every_n_millis self._metrics_cache = {} self._rank_zero_only = rank_zero_only @@ -403,25 +398,24 @@ def log_metrics(self, metrics: dict[str, Any], step: Optional[int] = None) -> No metrics_to_log = {} step = step or 0 - current_time_millis = int(time.time() * 1000) for k, v in metrics.items(): if any(fnmatch.fnmatch(k, pattern) for pattern in self.ignore_metrics): continue if k in self._metrics_cache: - value, last_step, last_time = self._metrics_cache[k] - if value == v and step < last_step + self.log_duplicated_metric_every_n_steps and current_time_millis < last_time + self.log_duplicated_metric_every_n_millis: + value, last_step = self._metrics_cache[k] + if value == v and step < last_step + self.log_duplicated_metric_every_n_steps: # Skip logging the metric if it has the same value as the last step and it's - # within the step and time window. + # within the step window. continue else: - # Log the metric if it has a different value or it's outside the step and time - # window, and update the metrics cache. - self._metrics_cache[k] = (v, step, current_time_millis) + # Log the metric if it has a different value or it's outside the step window, + # and update the metrics cache. + self._metrics_cache[k] = (v, step) metrics_to_log[self.rename(k)] = float(v) else: # Log the metric if it's the first time it's being logged, and update the metrics # cache. - self._metrics_cache[k] = (v, step, current_time_millis) + self._metrics_cache[k] = (v, step) metrics_to_log[self.rename(k)] = float(v) log_metrics( diff --git a/tests/loggers/test_mlflow_logger.py b/tests/loggers/test_mlflow_logger.py index 962da64562..9d84baa06f 100644 --- a/tests/loggers/test_mlflow_logger.py +++ b/tests/loggers/test_mlflow_logger.py @@ -709,6 +709,7 @@ def test_mlflow_ignore_metrics(self, num_batches, device, ignore_metrics, expect logger = MLFlowLogger( tracking_uri=tmp_path / Path('my-test-mlflow-uri'), ignore_metrics=ignore_metrics, + log_duplicated_metric_every_n_steps=0, ) file_path = self.run_trainer(logger, num_batches) @@ -854,52 +855,25 @@ def test_mlflow_logging_with_metrics_dedupping(tmp_path): run_name='test_run', logging_buffer_seconds=2, log_duplicated_metric_every_n_steps=3, - log_duplicated_metric_every_n_millis=10000, ) test_mlflow_logger.init(state=mock_state, logger=mock_logger) - # # Test dedupping of metrics and duplicated metrics get logged per - # # `log_duplicated_metric_every_n_steps` steps. - # steps = 10 - # for i in range(steps): - # # 'foo' always have different values, while 'bar' always have the same value. - # metrics = { - # 'foo': i, - # 'bar': 0, - # } - # test_mlflow_logger.log_metrics(metrics, step=i) - - # if i % 3 == 0: - # # 'bar' will be logged every 3 steps. - # mock_log_metrics.assert_called_with(metrics={'foo': float(i), 'bar': 0.0}, step=i, synchronous=False) - # else: - # # 'bar' will not be logged. - # mock_log_metrics.assert_called_with(metrics={'foo': float(i)}, step=i, synchronous=False) - # Test dedupping of metrics and duplicated metrics get logged per - # `log_duplicated_metric_every_n_millis` milliseconds. - timestamps = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45] - # Reset the metrics cache. - test_mlflow_logger._metrics_cache = {} - with patch('time.time', side_effect=timestamps): - for i in range(len(timestamps)): - # 'foo' always have different values, while 'bar' always have the same value. - metrics = { - 'foo': i, - 'bar': 0, - } - test_mlflow_logger.log_metrics(metrics, step=0) - - if i % 2 == 0: - # 'bar' will be logged every 2 steps. - mock_log_metrics.assert_called_with( - metrics={ - 'foo': float(i), - 'bar': 0.0, - }, step=0, synchronous=False, - ) - else: - # 'bar' will not be logged. - mock_log_metrics.assert_called_with(metrics={'foo': float(i)}, step=0, synchronous=False) + # `log_duplicated_metric_every_n_steps` steps. + steps = 10 + for i in range(steps): + # 'foo' always have different values, while 'bar' always have the same value. + metrics = { + 'foo': i, + 'bar': 0, + } + test_mlflow_logger.log_metrics(metrics, step=i) + + if i % 3 == 0: + # 'bar' will be logged every 3 steps. + mock_log_metrics.assert_called_with(metrics={'foo': float(i), 'bar': 0.0}, step=i, synchronous=False) + else: + # 'bar' will not be logged. + mock_log_metrics.assert_called_with(metrics={'foo': float(i)}, step=i, synchronous=False) test_mlflow_logger.post_close()