diff --git a/requirements.txt b/requirements.txt
index e47087d..f1b0335 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,8 @@
 datasets
+scikit-learn
 sentencepiece
 tokenizers
 torch
 torchmetrics
 tqdm
-transformers
-scikit-learn
\ No newline at end of file
+transformers
\ No newline at end of file
diff --git a/transformer_ranker/datacleaner.py b/transformer_ranker/datacleaner.py
index 7273c21..b817616 100644
--- a/transformer_ranker/datacleaner.py
+++ b/transformer_ranker/datacleaner.py
@@ -27,7 +27,7 @@ def __init__(
         text_pair_column: Optional[str] = None,
     ):
         """
-        Prepare huggingface dataset, clean it, find sentence and label columns.
+        Prepare huggingface dataset. Identify task type, find text and label columns, down-sample, merge data splits.
 
         :param pre_tokenizer: Pre-tokenizer to use, such as Whitespace from huggingface pre-tokenizers.
         :param exclude_test_split: Whether to exclude the test split.
@@ -79,7 +79,7 @@ def prepare_dataset(self, dataset: Union[str, DatasetDict, Dataset]) -> Union[Da
             logger.info("Removing the test split")
             del dataset['test']
 
-        if self.merge_data_splits and (isinstance(dataset, DatasetDict) or isinstance(dataset, list)):
+        if self.merge_data_splits and isinstance(dataset, DatasetDict):
             dataset = self._merge_data_splits(dataset)
 
         # Find text and label columns
@@ -120,7 +120,7 @@ def prepare_dataset(self, dataset: Union[str, DatasetDict, Dataset]) -> Union[Da
         self.label_column = label_column
         self.task_type = task_type
         self.dataset_size = len(dataset)
-        self.log_dataset_info(dataset)
+        self.log_dataset_info()
 
         # Simplify the dataset: keep only relevant columns
         keep_columns = [col for col in (self.text_column, self.text_pair_column, self.label_column) if col is not None]
@@ -186,21 +186,23 @@ def merge_texts(example: Dict[str, str]) -> Dict[str, str]:
         return dataset
 
     @staticmethod
-    def _find_task_type(label_column: str, label_type: Union[Type[int], Type[str], Type[list], Type[float]]) -> str:
-        """Determine task type based on the label column's data type."""
+    def _find_task_type(label_column: str, label_type: type) -> str:
+        """Determine the task type based on the label column's data type."""
         label_type_to_task_type = {
-            int: "text classification",  # labels can be integers
+            int: "text classification",  # text classification labels can be integers
             str: "text classification",  # or strings e.g. "positive"
-            list: "token classification",
-            float: "text regression",
+            list: "token classification",  # token-level tasks have a list of labels
+            float: "text regression",  # regression tasks have continuous values
         }
 
-        task_type = label_type_to_task_type.get(label_type, None)
+        for key, task_type in label_type_to_task_type.items():
+            if issubclass(label_type, key):
+                return task_type
 
-        if not task_type:
-            raise ValueError(f"Cannot determine task type from the label column '{label_column}' "
-                             f"value: {type(label_type)}.")
-        return task_type
+        raise ValueError(
+            f"Cannot determine the task type for the label column '{label_column}'. "
+            f"Expected label types are {list(label_type_to_task_type.keys())}, but got {label_type}."
+        )
 
     @staticmethod
     def _tokenize(dataset: Dataset, pre_tokenizer: Whitespace, text_column: str) -> Dataset:
@@ -227,7 +229,7 @@ def dataset_row_is_clean(example) -> bool:
             entry_has_text = bool(text) if isinstance(text, list) else True  # non empty string
             all_tokens_are_valid = all(token != '\uFE0F' for token in text) if isinstance(text, list) else True
             label_is_valid = label is not None and (all(l >= 0 for l in label) if isinstance(label, list) else label >= 0)
-            return entry_has_text and label_is_valid and all_tokens_are_valid # keep entries that have text and labels
+            return entry_has_text and label_is_valid and all_tokens_are_valid  # keep entries that have text and labels
 
         dataset = dataset.filter(dataset_row_is_clean, desc="Removing empty sentences")
         return dataset
@@ -327,9 +329,9 @@ def map_to_spans(example):
 
         return dataset, span_label_map
 
-    def log_dataset_info(self, dataset) -> None:
-        """Log information about dataset after cleaning it"""
-        logger.info(f"Sentence and label columns: '{self.text_column}', '{self.label_column}'")
-        logger.info(f"Task type: '{self.task_type}'")
-        downsample_info = f"(downsampled to {self.dataset_downsample})" if self.dataset_downsample else ""
+    def log_dataset_info(self) -> None:
+        """Log information about the dataset"""
+        logger.info(f"Text and label columns: '{self.text_column}', '{self.label_column}'")
+        logger.info(f"Task type identified: '{self.task_type}'")
+        downsample_info = f"(down-sampled to {self.dataset_downsample})" if self.dataset_downsample else ""
         logger.info(f"Dataset size: {self.dataset_size} {downsample_info}")
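Note on the datacleaner change above: _find_task_type now walks the mapping with issubclass() instead of an exact dict lookup, so label subtypes still resolve (e.g. bool, which subclasses int). A minimal standalone sketch of that lookup; the helper name and the example values are illustrative, not part of this diff:

def find_task_type(label_column: str, label_type: type) -> str:
    # Same lookup table as in the diff; order matters because bool matches int first.
    label_type_to_task_type = {
        int: "text classification",    # integer class ids (and bool, via issubclass)
        str: "text classification",    # string labels such as "positive"
        list: "token classification",  # one label per token
        float: "text regression",      # continuous targets
    }
    for key, task_type in label_type_to_task_type.items():
        if issubclass(label_type, key):
            return task_type
    raise ValueError(f"Cannot determine the task type for the label column '{label_column}'.")

print(find_task_type("label", type(True)))       # text classification
print(find_task_type("ner_tags", type([0, 1])))  # token classification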
diff --git a/transformer_ranker/embedder.py b/transformer_ranker/embedder.py
index 81b7a3f..1dc4569 100644
--- a/transformer_ranker/embedder.py
+++ b/transformer_ranker/embedder.py
@@ -20,22 +20,17 @@ def __init__(
         device: Optional[str] = None,
     ):
         """
-        Embed sentences using a pre-trained transformer model. It works at the word level, meaning each sentence
-        is represented by a list of word vectors. You can pool these into a single sentence embedding if needed.
+        Embed texts using a pre-trained transformer model. This embedder works at the word level, representing each
+        text as a list of word vectors. It supports various sub-word pooling and effective sentence pooling options.
         ♻️ Feel free to use it if you ever need a simple implementation for transformer embeddings.
 
-        :param model: Name of the model to be used. Either a model handle (e.g. 'bert-base-uncased')
-            or a loaded model e.g. AutoModel('bert-base-uncased').
-        :param tokenizer: Optional parameter to specify the tokenizer. Either a tokenizer handle
-            (e.g. 'bert-base-uncased') or a loaded tokenizer e.g. AutoTokenizer.from_pretrained('bert-base-uncased').
-        :param subword_pooling: Method used to pool sub-word embeddings to form word-level embeddings.
-        :param layer_ids: Specifies which layers' outputs should be used. This can be a single top-most layer as '-1',
-            multiple layers like '-1,-2,-3, -4', or 'all' to use all layers. Default is 'all'.
-        :param layer_pooling: Optional method used to combine or pool embeddings from selected layers.
-            If not specified, no pooling across layers is applied, and each layer's output is handled independently.
-        :param use_pretokenizer: If to pre-tokenize texts using whitespace
-        :param device: Optional specification of the computing device where the model operations are performed.
-            Can be 'cpu' or 'cuda'. If not specified, it defaults to the best available device.
+        :param model: The model to use, either by name (e.g., 'bert-base-uncased') or a loaded model instance.
+        :param tokenizer: Optional tokenizer, either by name or a loaded tokenizer instance.
+        :param subword_pooling: Method for pooling sub-word embeddings into word-level embeddings.
+        :param layer_ids: Layers to use, e.g. '-1' for the top layer, '-1,-2' for multiple, or 'all'. Default is 'all'.
+        :param layer_pooling: Optional method for pooling across selected layers.
+        :param use_pretokenizer: Whether to pre-tokenize texts using whitespace.
+        :param device: Device for computations, either 'cpu' or 'cuda'. Defaults to the best available device.
         """
         # Load transformer model
         if isinstance(model, torch.nn.Module):
@@ -47,15 +42,17 @@ def __init__(
 
         # Load a model-specific tokenizer
         self.tokenizer: PreTrainedTokenizerFast
+        tokenizer_source = tokenizer if isinstance(tokenizer, str) else self.model_name
 
-        if tokenizer is None:
-            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name, add_prefix_space=True)
-
-        elif isinstance(tokenizer, str):
-            self.tokenizer = AutoTokenizer.from_pretrained(tokenizer, add_prefix_space=True)
-
-        else:
+        # Assign or load tokenizer
+        if isinstance(tokenizer, PreTrainedTokenizerFast):
             self.tokenizer = tokenizer
+        else:
+            self.tokenizer = AutoTokenizer.from_pretrained(
+                tokenizer_source,
+                add_prefix_space=True,
+                clean_up_tokenization_spaces=True,
+            )
 
         # Add padding token for models that do not have it (e.g. GPT2)
         if self.tokenizer.pad_token is None:
@@ -81,7 +78,6 @@ def __init__(
         # Set cpu or gpu device
         if device is None:
             self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-
         else:
             self.device = torch.device(device)
 
@@ -126,7 +122,7 @@ def embed(
 
         for batch in tqdm(
             batches,
-            desc="Retrieving Embeddings ",
+            desc="Retrieving Embeddings",
             disable=not show_loading_bar,
             bar_format=tqdm_bar_format
         ):
@@ -190,8 +186,8 @@ def _filter_layer_ids(self, layer_ids) -> List[int]:
             return [-i for i in range(1, self.num_transformer_layers + 1)]
 
         layer_ids = [int(number) for number in layer_ids.split(",")]
-        layer_ids = [layer_id for layer_id in layer_ids if self.num_transformer_layers + 1 >= abs(layer_id)]
+        layer_ids = [layer_id for layer_id in layer_ids if self.num_transformer_layers >= abs(layer_id)]
 
         return layer_ids
 
     def _extract_relevant_layers(self, batched_embeddings: torch.Tensor) -> torch.Tensor:
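Note on the embedder change above: the three tokenizer branches collapse into a single from_pretrained() call that falls back to the model name when no tokenizer (or only a name) is given. A standalone sketch of that resolution logic; the helper name and the 'prajjwal1/bert-tiny' checkpoint are illustrative, not part of this diff:

from transformers import AutoTokenizer, PreTrainedTokenizerFast

def resolve_tokenizer(tokenizer, model_name: str) -> PreTrainedTokenizerFast:
    # Use the tokenizer handle if a string was given, otherwise fall back to the model name.
    tokenizer_source = tokenizer if isinstance(tokenizer, str) else model_name
    if isinstance(tokenizer, PreTrainedTokenizerFast):
        return tokenizer  # already loaded, use as-is
    return AutoTokenizer.from_pretrained(
        tokenizer_source,
        add_prefix_space=True,
        clean_up_tokenization_spaces=True,
    )

tokenizer = resolve_tokenizer(None, "prajjwal1/bert-tiny")  # None falls back to the model name
same_tokenizer = resolve_tokenizer(tokenizer, "ignored")    # loaded instances pass through unchanged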
diff --git a/transformer_ranker/ranker.py b/transformer_ranker/ranker.py
index 26e4c8c..6d8bc25 100644
--- a/transformer_ranker/ranker.py
+++ b/transformer_ranker/ranker.py
@@ -24,13 +24,13 @@ def __init__(
         **kwargs
     ):
         """
-        Rank language models based on their predicted performance for a specific NLP task.
-        We use metrics like h-score or logme to estimate the quality of embeddings. Features are taken from
-        deeper layers by averaging all layers or by selecting the best-scoring layer in each model.
+        Rank language models for different NLP tasks. Embed a part of the dataset and
+        estimate embedding suitability with transferability metrics like hscore or logme.
+        Embeddings can either be averaged across all layers or selected from the best-performing layer.
 
-        :param dataset: huggingface dataset for evaluating transformer models, containing texts and label columns.
-        :param dataset_downsample: a fraction to which the dataset should be down-sampled.
-        :param kwargs: Additional parameters for data pre-processing.
+        :param dataset: a dataset from huggingface, containing texts and label columns.
+        :param dataset_downsample: a fraction to which the dataset should be reduced.
+        :param kwargs: Additional dataset-specific parameters for data cleaning.
         """
         # Clean the original dataset and keep only needed columns
         self.data_handler = DatasetCleaner(dataset_downsample=dataset_downsample,
@@ -42,7 +42,6 @@
 
         self.dataset = self.data_handler.prepare_dataset(dataset)
 
-        # Find task type if not given: word classification or text classification
         self.task_type = self.data_handler.task_type
 
         # Find text and label columns
@@ -76,16 +75,15 @@ def run(
         """
         self._confirm_ranker_setup(estimator=estimator, layer_aggregator=layer_aggregator)
 
-        # Load all transformers into hf cache for later use
+        # Load all transformers into hf cache
        self._preload_transformers(models)
 
         labels = self.data_handler.prepare_labels(self.dataset)
 
-        result_dictionary = Result(metric=estimator)
+        ranking_results = Result(metric=estimator)
 
         # Iterate over each transformer model and score it
         for model in models:
-
-            # Select transformer layers to be used: last layer (i.e. output layer) or all of the layers
             layer_ids = "-1" if layer_aggregator == "lastlayer" else "all"
             layer_pooling = "mean" if "mean" in layer_aggregator else None
@@ -111,13 +109,11 @@
 
             # Single list of embeddings for sequence tagging tasks
             if self.task_type == "token classification":
-                embeddings = [word_embedding for sentence_embedding in embeddings
-                              for word_embedding in sentence_embedding]
+                embeddings = [word for sentence in embeddings for word in sentence]
 
-            embedded_layer_ids = embedder.layer_ids
             model_name = embedder.model_name
+            embedded_layer_ids = embedder.layer_ids
             num_layers = embeddings[0].size(0)
-            layer_scores = []
 
             if gpu_estimation:
                 labels = labels.to(embedder.device)
@@ -127,8 +123,9 @@
                 torch.cuda.empty_cache()
 
             # Estimate scores for each layer
+            layer_scores = []
             tqdm_bar_format = '{l_bar}{bar:10}{r_bar}{bar:-10b}'
-            for layer_id in tqdm(range(num_layers), desc="Estimating Performance", bar_format=tqdm_bar_format):
+            for layer_id in tqdm(range(num_layers), desc="Transferability Score", bar_format=tqdm_bar_format):
 
                 # Get the position of the layer index
                 layer_index = embedded_layer_ids[layer_id]
@@ -143,26 +140,21 @@
                 layer_scores.append(score)
 
             # Store scores for each layer in the result dictionary
-            result_dictionary.layer_estimates[model_name] = dict(zip(embedded_layer_ids, layer_scores))
-
-            # Aggregate scores for each layer
-            if layer_aggregator in ["layermean", "lastlayer"]:
-                final_score = layer_scores[0]
-            elif layer_aggregator == "bestlayer":
-                final_score = max(layer_scores)
-            else:
-                logger.warning(f'Unknown estimator: "{estimator}"')
-                final_score = 0.
-
-            result_dictionary.add_score(model_name, final_score)
-
-            # Log the scoring information for a model
-            base_log = f"{model_name}, score: {final_score}"
-            layer_estimates_log = (f", layerwise scores: {result_dictionary.layer_estimates[model_name]}"
-                                   if layer_aggregator == 'bestlayer' else "")
-            logger.info(base_log + layer_estimates_log)
-
-        return result_dictionary
+            ranking_results.layerwise_scores[model_name] = dict(zip(embedded_layer_ids, layer_scores))
+
+            # Aggregate layer scores
+            final_score = max(layer_scores) if layer_aggregator == "bestlayer" else layer_scores[0]
+            ranking_results.add_score(model_name, final_score)
+
+            # Log the final score along with scores for each layer
+            result_log = f"{model_name} estimation: {final_score} ({ranking_results.metric})"
+
+            if layer_aggregator == 'bestlayer':
+                result_log += f", layerwise scores: {ranking_results.layerwise_scores[model_name]}"
+
+            logger.info(result_log)
+
+        return ranking_results
 
     @staticmethod
     def _preload_transformers(models: List[Union[str, torch.nn.Module]]) -> None:
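Note on the ranker change above: the if/elif/else over layer aggregators is reduced to one expression. Toy numbers only, to show how it behaves for each aggregator; under 'layermean' and 'lastlayer' the embedder yields exactly one score, so index 0 is the only entry:

def aggregate(layer_scores, layer_aggregator):
    # Mirrors the single aggregation line introduced in the diff.
    return max(layer_scores) if layer_aggregator == "bestlayer" else layer_scores[0]

print(aggregate([0.61, 0.68, 0.74, 0.70], "bestlayer"))  # 0.74, best layer wins
print(aggregate([0.71], "layermean"))                    # 0.71, single mean-pooled score
print(aggregate([0.69], "lastlayer"))                    # 0.69, single last-layer score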
{model_name}: {score}" for i, (model_name, score) in enumerate(sorted_results)] + result_lines = [f"Rank {i + 1}. {model_name}: {score}" for i, (model_name, score) in enumerate(sorted_results)] return "\n".join(result_lines) + + def __str__(self) -> str: + """Return sorted results as a string (user-friendly).""" + return self._format_results() + + def __repr__(self) -> str: + """Return sorted results as a string (coder-friendly).""" + return self._format_results()