diff --git a/mteb/evaluation/MTEB.py b/mteb/evaluation/MTEB.py
index 70f3e21ca8..9c4f2e8f2a 100644
--- a/mteb/evaluation/MTEB.py
+++ b/mteb/evaluation/MTEB.py
@@ -4,7 +4,7 @@
 import logging
 import os
 import traceback
-from copy import copy
+from copy import copy, deepcopy
 from datetime import datetime
 from pathlib import Path
 from time import time
@@ -13,6 +13,7 @@
 import datasets
 from sentence_transformers import SentenceTransformer
 
+from mteb.abstasks.AbsTask import ScoresDict
 from mteb.encoder_interface import Encoder
 from mteb.model_meta import ModelMeta
 from mteb.models import model_meta_from_sentence_transformers
@@ -76,6 +77,8 @@ def __init__(
         self._version = version
         self.err_logs_path = err_logs_path
 
+        self.last_evaluated_splits = {}
+
         self.select_tasks(**kwargs)
 
     def deprecation_warning(
@@ -273,6 +276,59 @@ def _run_eval(
         tock = time()
         return results, tick, tock
 
+    @staticmethod
+    def _get_missing_splits(
+        existing_results: MTEBResults | None, task_eval_splits: list[str], task: AbsTask
+    ) -> list[str]:
+        if existing_results is None:
+            return task_eval_splits
+
+        missing_splits = []
+        for split in task_eval_splits:
+            if split not in existing_results.scores:
+                missing_splits.append(split)
+            elif not existing_results.scores[
+                split
+            ]:  # Check if the split has any scores
+                missing_splits.append(split)
+
+        return missing_splits
+
+    @staticmethod
+    def _merge_results(
+        existing_results: MTEBResults, new_results: MTEBResults
+    ) -> MTEBResults:
+        merged_scores = existing_results.scores.copy()
+
+        for split, scores in new_results.scores.items():
+            if split in merged_scores:
+                merged_scores[split] = MTEB._merge_split_scores(
+                    merged_scores[split], scores
+                )
+            else:
+                merged_scores[split] = scores
+
+        merged_results = MTEBResults(
+            dataset_revision=existing_results.dataset_revision,
+            task_name=existing_results.task_name,
+            mteb_version=existing_results.mteb_version,
+            scores=merged_scores,
+            evaluation_time=existing_results.evaluation_time
+            + new_results.evaluation_time,
+            kg_co2_emissions=existing_results.kg_co2_emissions,
+        )
+
+        return merged_results
+
+    @staticmethod
+    def _merge_split_scores(
+        existing_scores: list[ScoresDict], new_scores: list[ScoresDict]
+    ) -> list[ScoresDict]:
+        merged = {score["hf_subset"]: score for score in existing_scores}
+        for score in new_scores:
+            merged[score["hf_subset"]] = score
+        return list(merged.values())
+
     def run(
         self,
         model: SentenceTransformer | Encoder,
@@ -330,30 +386,44 @@ def run(
         original_tasks = (
             self.tasks.copy()
         )  # save them in case we re-use the object (e.g. for reranking)
+        self.last_evaluated_splits = {}
 
         while len(self.tasks) > 0:
             task = self.tasks[0]
             logger.info(
                 f"\n\n********************** Evaluating {task.metadata.name} **********************"
             )
 
-            # skip evaluation if results folder exists and overwrite_results is False
             if output_path:
                 save_path = output_path / f"{task.metadata.name}{task.save_suffix}.json"
+                existing_results = None
                 if save_path.exists() and not overwrite_results:
-                    logger.info(
-                        f"{task.metadata.name} results already exists. Loading results from disk. Set overwrite_results=True to overwrite."
-                    )
-                    mteb_results = MTEBResults.from_disk(save_path)
-                    evaluation_results.append(mteb_results)
-                    del self.tasks[0]  # empty memory
-                    continue
-            try:
+                    try:
+                        existing_results = MTEBResults.from_disk(save_path)
+                    except Exception as e:
+                        logger.warning(f"Error loading existing results: {e}")
+
                 task_eval_splits = (
                     eval_splits if eval_splits is not None else task.eval_splits
                 )
+                missing_splits = self._get_missing_splits(
+                    existing_results, task_eval_splits, task
+                )
+
+                if not missing_splits and existing_results:
+                    logger.info(
+                        f"{task.metadata.name} results already exist. Loading results from disk."
+                    )
+                    evaluation_results.append(existing_results)
+                    self.last_evaluated_splits[task.metadata.name] = []  # nothing new was evaluated
+                    del self.tasks[0]
+                    continue
+
+                if missing_splits:
+                    logger.info(
+                        f"Running evaluation for missing splits: {missing_splits}"
+                    )
 
-                # load data
-                logger.info(f"Loading dataset for {task.metadata_dict['name']}")
+            try:
                 task.check_if_dataset_is_superseeded()
                 task.load_data(eval_splits=task_eval_splits, **kwargs)
 
@@ -361,7 +431,8 @@ def run(
                 task_results = {}
                 evaluation_time = 0
                 kg_co2_emissions: int | None = 0 if co2_tracker else None
-                for split in task_eval_splits:
+
+                for split in missing_splits:
                     if co2_tracker:
                         try:
                             from codecarbon import EmissionsTracker
@@ -404,21 +475,26 @@ def run(
                     if verbosity >= 1:
                         logger.info(f"Scores: {results}")
 
-                mteb_task_result = MTEBResults.from_task_results(
+                    if task.metadata_dict["name"] not in self.last_evaluated_splits:
+                        self.last_evaluated_splits[task.metadata_dict["name"]] = set()
+                    self.last_evaluated_splits[task.metadata_dict["name"]].add(split)
+
+                new_results = MTEBResults.from_task_results(
                     task,
                     task_results,
                     evaluation_time=evaluation_time,
                     kg_co2_emissions=kg_co2_emissions,
                 )
 
-                # save results
+                if existing_results:
+                    merged_results = self._merge_results(existing_results, new_results)
+                else:
+                    merged_results = new_results
+
                 if output_path:
-                    with open(save_path, "w") as f_out:
-                        json.dump(
-                            mteb_task_result.to_dict(), f_out, indent=2, sort_keys=True
-                        )
+                    merged_results.to_disk(save_path)
 
-                evaluation_results.append(mteb_task_result)
+                evaluation_results.append(merged_results)
 
             except Exception as e:
                 logger.error(
@@ -437,7 +513,6 @@ def run(
                 # empty memory
                 del self.tasks[0]
 
-        # restore original tasks
         self.tasks = original_tasks
 
         return evaluation_results
@@ -488,3 +563,11 @@ def _save_model_metadata(model_meta: ModelMeta, output_folder: Path) -> None:
 
         with save_path.open("w") as f:
             json.dump(model_meta.to_dict(), f)
+
+    def get_last_evaluated_splits(self):
+        """Returns a dictionary of tasks and their evaluated splits from the most recent run.
+        Tasks with empty lists indicate that results already existed and no splits were evaluated.
+ """ + return deepcopy( + {task: list(splits) for task, splits in self.last_evaluated_splits.items()} + ) diff --git a/tests/test_evaluation/test_split_evaluation.py b/tests/test_evaluation/test_split_evaluation.py new file mode 100644 index 0000000000..419055d0ef --- /dev/null +++ b/tests/test_evaluation/test_split_evaluation.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import pytest +from sentence_transformers import SentenceTransformer + +import mteb +from mteb import MTEB + + +@pytest.fixture +def model(): + return SentenceTransformer("all-MiniLM-L6-v2") + + +@pytest.fixture +def nfcorpus_tasks(): + return mteb.get_tasks(tasks=["NFCorpus"], languages=["eng"]) + + +def test_all_splits_evaluated(model, nfcorpus_tasks, tmp_path): + evaluation = MTEB(tasks=nfcorpus_tasks) + evaluation.run( + model, + eval_splits=["train", "test"], + save_predictions=True, + output_folder=str(tmp_path / "testcase1"), + verbosity=2, + ) + last_evaluated_splits = evaluation.get_last_evaluated_splits() + print(last_evaluated_splits) + assert "NFCorpus" in last_evaluated_splits + assert set(last_evaluated_splits["NFCorpus"]) == {"train", "test"} + assert len(last_evaluated_splits["NFCorpus"]) == 2 + + +def test_one_missing_split(model, nfcorpus_tasks, tmp_path): + evaluation = MTEB(tasks=nfcorpus_tasks) + evaluation.run( + model, + eval_splits=["train"], + save_predictions=True, + output_folder=str(tmp_path / "testcase2"), + verbosity=2, + ) + + # Get model and tasks again + model = SentenceTransformer("all-MiniLM-L6-v2") + nfcorpus_tasks = mteb.get_tasks(tasks=["NFCorpus"], languages=["eng"]) + + evaluation_2 = MTEB(tasks=nfcorpus_tasks) + evaluation_2.run( + model, + eval_splits=["train", "test"], + save_predictions=True, + output_folder=str(tmp_path / "testcase2"), + verbosity=2, + ) + + last_evaluated_splits = evaluation_2.get_last_evaluated_splits() + + print(last_evaluated_splits) + assert "NFCorpus" in last_evaluated_splits + assert set(last_evaluated_splits["NFCorpus"]) == {"test"} + assert len(last_evaluated_splits["NFCorpus"]) == 1 + + +def test_no_missing_splits(model, nfcorpus_tasks, tmp_path): + evaluation_1 = MTEB(tasks=nfcorpus_tasks) + evaluation_1.run( + model, + eval_splits=["train", "test"], + save_predictions=True, + output_folder=str(tmp_path / "testcase3"), + verbosity=2, + ) + + evaluation_2 = MTEB(tasks=nfcorpus_tasks) + evaluation_2.run( + model, + eval_splits=["train", "test"], + save_predictions=True, + output_folder=str(tmp_path / "testcase3"), + verbosity=2, + ) + + last_evaluated_splits = evaluation_2.get_last_evaluated_splits() + + assert "NFCorpus" in last_evaluated_splits + assert len(last_evaluated_splits["NFCorpus"]) == 0