dataset tokenization script improvements (#106)
* Update tokenize_dataset.py to use load_dataset instead of load_validation_dataset

* include tqdm

* add split handling functionality

* beauty fix

* arg names & cosmetics

* added scripts/demo_upload_in_chunks.py

* Update tokenization script to upload to HF in chunks

* Uncomment the repo creation

* refactor, cosmetics

* memory usage FIXMEs

* fix memory usage issues

* fix test_tokenization

---------

Co-authored-by: Jett <[email protected]>
Co-authored-by: Siwei Li <[email protected]>
3 people authored Apr 24, 2024

1 parent 51a8e57 commit ad2936f
Showing 3 changed files with 182 additions and 116 deletions.
105 changes: 63 additions & 42 deletions scripts/tokenize_dataset.py
@@ -1,79 +1,100 @@
#!/usr/bin/env python3

import argparse

from datasets import Dataset
from datasets import Dataset, Features, Value, load_dataset
from huggingface_hub import HfApi
from transformers import AutoTokenizer

from delphi.dataset.tokenization import tokenize_dataset
from delphi.eval.utils import load_validation_dataset
from delphi.dataset.tokenization import tokenize_and_upload_split

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="")
parser = argparse.ArgumentParser(description="", allow_abbrev=False)

parser.add_argument(
"--input-dataset-name",
"--in-repo-id",
"-i",
type=str,
required=True,
help="Text dataset from huggingface to tokenize",
)
parser.add_argument(
"--output-dataset-name",
"--feature",
"-f",
type=str,
help="Name of the tokenized dataset to upload to huggingface",
required=True,
help="Name of the column containing text documents in the input dataset",
)
parser.add_argument(
"--tokenizer-name",
"--split",
"-s",
type=str,
help="Name of the tokenizer from huggingface",
required=True,
help="Split of the dataset to be tokenized, supports slicing like 'train[:10%%]'",
)
parser.add_argument(
"--token",
"--out-repo-id",
"-o",
type=str,
help="Hugging Face API token",
required=True,
help="Name of the tokenized dataset to upload to huggingface",
)
parser.add_argument(
"--context-size",
"--tokenizer",
"-r",
type=str,
required=True,
help="Name of the tokenizer from huggingface",
)
parser.add_argument(
"--seq-len",
"-l",
type=int,
default=512,
required=True,
help="Context size of the tokenized dataset as input of the model",
)
parser.add_argument(
"--hf-token",
"-t",
type=str,
help="Hugging Face API token",
)
parser.add_argument(
"--batch-size",
"-b",
type=int,
default=50,
help="Batch size of text inputs into the tokenizer",
help="Size of input into batched tokenization",
)
parser.add_argument(
"--column-name",
type=str,
help="Name of the column containing text documents in the input dataset",
"--chunk-size",
"-c",
type=int,
default=200_000,
help="Size of the parquet chunks uploaded to HuggingFace",
)
args = parser.parse_args()

input_dataset = load_validation_dataset(args.input_dataset_name)
tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_name)

if args.column_name:
text_docs = input_dataset[args.column_name]
else:
if len(input_dataset.column_names) > 1:
raise ValueError("There is more than one column in the specified dataset")
text_docs = input_dataset[input_dataset.column_names[0]]

tokenized_dataset = tokenize_dataset(
text_docs,
tokenizer,
context_size=args.context_size,
batch_size=args.batch_size,
)
output_dataset = Dataset.from_dict(
{
"tokens": tokenized_dataset,
}
print(f"Loading dataset '{args.in_repo_id}'...")
in_dataset_split = load_dataset(
args.in_repo_id,
split=args.split,
features=Features({args.feature: Value("string")}),
)
assert isinstance(in_dataset_split, Dataset)
print(f"Loading tokenizer '{args.tokenizer}'...")
tokenizer = AutoTokenizer.from_pretrained(args.tokenizer)
assert tokenizer.bos_token_id is not None, "Tokenizer must have a bos_token_id"
assert tokenizer.eos_token_id is not None, "Tokenizer must have an eos_token_id"

output_dataset.push_to_hub(
repo_id=args.output_dataset_name,
private=False,
token=args.token,
api = HfApi(token=args.hf_token)
api.create_repo(repo_id=args.out_repo_id, repo_type="dataset", exist_ok=True)
tokenize_and_upload_split(
dataset_split=in_dataset_split,
split_name=args.split.split("[")[0],
tokenizer=tokenizer,
seq_len=args.seq_len,
batch_size=args.batch_size,
chunk_size=args.chunk_size,
out_repo_id=args.out_repo_id,
api=api,
)
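
For reference, a minimal sketch (not part of this commit) of the slicing that the new --split argument supports: load_dataset accepts slice syntax in the split string, while the upload step needs the bare split name, which the script recovers with args.split.split("[")[0]. The split values below are arbitrary examples.

for split_arg in ["train", "train[:10%]", "validation[1000:2000]"]:
    # load_dataset understands the sliced form; uploads use the bare name
    print(split_arg, "->", split_arg.split("[")[0])
# train -> train
# train[:10%] -> train
# validation[1000:2000] -> validation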
125 changes: 81 additions & 44 deletions src/delphi/dataset/tokenization.py
@@ -1,13 +1,17 @@
import io
from collections import deque
from typing import Optional
from collections.abc import Generator

from datasets import Dataset
from huggingface_hub import HfApi
from tqdm.auto import trange
from transformers import PreTrainedTokenizerBase


def extend_deque(
dq: deque[int],
deq: deque[int],
context_size: int,
text_documents: list[str],
dataset: Dataset,
doc_idx: int,
tokenizer: PreTrainedTokenizerBase,
batch_size: int,
@@ -25,61 +29,54 @@ def extend_deque(
text_documents: List of (untokenized) text documents to be tokenized.
doc_idx: Index of the current text story.
tokenizer: Tokenizer to encode the text strings.
batch_size: The size of input into batched tokenization.
Returns:
int: Updated index in the text documents dataset.
"""
while len(dq) < context_size and doc_idx < len(text_documents):
text_doc = text_documents[doc_idx : doc_idx + batch_size]
feature = dataset.column_names[0]
while len(deq) < context_size and doc_idx < len(dataset):
documents = dataset[doc_idx : doc_idx + batch_size][feature]
batch_input_ids = tokenizer(
text_doc, return_attention_mask=False, add_special_tokens=False
documents, return_attention_mask=False, add_special_tokens=False
)["input_ids"]
for input_ids in batch_input_ids:
dq.extend(input_ids + [tokenizer.eos_token_id])
for input_ids in batch_input_ids: # type: ignore
deq.extend(input_ids + [tokenizer.eos_token_id])
doc_idx += batch_size
return doc_idx


def make_new_samples(
dq: deque[int], context_size: int, bos_token_id: int
) -> list[list[int]]:
def make_new_sample(deq: deque[int], context_size: int, bos_token_id: int) -> list[int]:
"""
Generates new samples for training by creating sequences of tokens
from the deque until the deque does not hold enough tokens to generate
another sample.
Generates a new sample for training by creating a sequence of tokens
from the deque.
Note: the model is unable to use the last token in an input sequence,
so we repeat this token in the next input sequence.
Args:
dq: Deque containing tokenized tokens.
deq: Deque containing tokenized tokens.
context_size: Size of the context (input sequences).
bos_token_id: bos_token_id of the tokenizer used.
Returns:
list[list[int]]: List of token sequences of the same length(context_size).
list[int]: token sequence.
"""

samples = []
while len(dq) >= context_size:
sample = [bos_token_id]

# For the first (n-1) elements, pop from the left of the deque
# and add to the new sample, the n-th element will be retained
# in the deque for making the next sample.
for _ in range(context_size - 1):
sample.append(dq.popleft())
sample.append(dq[0])

samples.append(sample)
return samples
sample = [bos_token_id]
# For the first (n-1) elements, pop from the left of the deque
# and add to the new sample, the n-th element will be retained
# in the deque for making the next sample.
for _ in range(context_size - 1):
sample.append(deq.popleft())
sample.append(deq[0])
return sample


def tokenize_dataset(
text_documents: list[str],
dataset: Dataset,
tokenizer: PreTrainedTokenizerBase,
context_size: int,
seq_len: int,
batch_size: int,
) -> list[list[int]]:
) -> Generator[list[int], None, None]:
"""
Tokenizes the input text documents using the provided tokenizer and
generates token sequences of the specified length.
@@ -88,20 +85,60 @@ def tokenize_dataset(
text_documents: List[str],
tokenizer,
context_size,
batch_size: The size of input into batched tokenization.
Returns:
list[list[int]]: List of token sequences of length equal to context_size.
Token sequences of length equal to context_size.
"""

dq = deque()
assert tokenizer.bos_token_id is not None
deq = deque()
doc_idx = 0
samples = []
# iterate through the text documents and tokenize them
while doc_idx < len(dataset):
doc_idx = extend_deque(deq, seq_len, dataset, doc_idx, tokenizer, batch_size)
yield make_new_sample(deq, seq_len, tokenizer.bos_token_id)
# We discard the last chunk, so no processing on the remainder of the deque here

while doc_idx < len(text_documents):
doc_idx = extend_deque(
dq, context_size, text_documents, doc_idx, tokenizer, batch_size
)
samples.extend(make_new_samples(dq, context_size, tokenizer.bos_token_id))

# We discard the last chunk, so no processing on the remainder of the deque here
return samples
def tokenize_and_upload_split(
dataset_split: Dataset,
split_name: str,
tokenizer: PreTrainedTokenizerBase,
seq_len: int,
batch_size: int,
chunk_size: int,
out_repo_id: str,
api: HfApi,
):
seq_gen = tokenize_dataset(
dataset_split,
tokenizer,
seq_len=seq_len,
batch_size=batch_size,
)
seq_it = iter(seq_gen)
print(f"Tokenizing {split_name=}...")
chunk_idx = 0
done = False
while not done:
tokens = []
print(f"Processing chunk {chunk_idx}...")
for _ in trange(chunk_size):
try:
tokens.append(next(seq_it))
except StopIteration:
done = True
break
ds_chunk = Dataset.from_dict({"tokens": tokens})
ds_parquet_chunk = io.BytesIO()
ds_chunk.to_parquet(ds_parquet_chunk)
chunk_name = f"{split_name}-{chunk_idx:05}.parquet"
print(f"Uploading {chunk_name}...")
api.upload_file(
path_or_fileobj=ds_parquet_chunk,
path_in_repo=f"data/{chunk_name}",
repo_id=out_repo_id,
repo_type="dataset",
)
chunk_idx += 1
print("Done.")
68 changes: 38 additions & 30 deletions
@@ -2,50 +2,59 @@
import random

import pytest
from datasets import Dataset
from transformers import AutoTokenizer

from delphi.dataset.tokenization import extend_deque, make_new_samples, tokenize_dataset
from delphi.dataset.tokenization import extend_deque, make_new_sample, tokenize_dataset


@pytest.fixture
def tokenizer():
return AutoTokenizer.from_pretrained("delphi-suite/stories-tokenizer")


def make_random_document(tokenizer):
all_token_ids = range(2, tokenizer.vocab_size)
n_tokens = random.randint(100, 800)
random_tokens = random.choices(all_token_ids, k=n_tokens)
return tokenizer.decode(random_tokens)


def get_random_feature_name():
return "".join(random.choices("abcdefghijklmnopqrstuvwxyz", k=10))


def test_extend_deque(tokenizer):
CTX_SIZE = 10
BATCH_SIZE = 2
# generate 100 random stories
text_stories = [
" ".join(
[
tokenizer.decode(random.randint(3, tokenizer.vocab_size))
for _ in range(random.randint(100, 800))
]
)
for _ in range(100)
]
documents = [make_random_document(tokenizer) for _ in range(100)]
feature_name = get_random_feature_name()
dataset = Dataset.from_dict({feature_name: documents})

prompt_idx = 0
dq = collections.deque()
deq = collections.deque()

while prompt_idx < len(text_stories):
while prompt_idx < len(dataset):
prompt_idx = extend_deque(
dq, CTX_SIZE, text_stories, prompt_idx, tokenizer, BATCH_SIZE
deq, CTX_SIZE, dataset, prompt_idx, tokenizer, BATCH_SIZE
)
if prompt_idx < len(text_stories) - 1:
if prompt_idx < len(dataset) - 1:
# assert that the deque has grown large enough in each round
assert len(dq) >= CTX_SIZE
while len(dq) >= CTX_SIZE:
assert len(deq) >= CTX_SIZE
while len(deq) >= CTX_SIZE:
for _ in range(CTX_SIZE - 1):
dq.popleft()
deq.popleft()


def test_make_new_sample(tokenizer):
for _ in range(100):
total_tokens = random.randint(100, 1000)
context_size = random.randint(5, total_tokens // 2)
dq = collections.deque(random.choices(range(3, 1000), k=total_tokens))
samples = make_new_samples(dq, context_size, tokenizer.bos_token_id)
samples = []
while len(dq) >= context_size:
samples.append(make_new_sample(dq, context_size, tokenizer.bos_token_id))
tokens_cnt = 0
for i, sample in enumerate(samples):
assert sample[0] == tokenizer.bos_token_id
@@ -67,22 +76,21 @@ def test_tokenize_dataset(tokenizer):
CTX_SIZE = 10
BATCH_SIZE = 2

text_stories = [
documents = [
"Once upon a",
"Mother woke up alert. She put on her coat",
"Once upon a time, in a small town, there was a weird",
"Once upon a time, there was a",
"Sara and Tom are friends. They like to play in the park.",
]
correct_batches = [
[1, 432, 440, 261, 2, 367, 501, 1917, 372, 3398, 4037],
[1, 4037, 341, 577, 359, 342, 1854, 2, 432, 440, 261],
[1, 261, 403, 4045, 317, 261, 560, 1000, 4045, 406, 286],
[1, 286, 261, 2567, 2, 432, 440, 261, 403, 4045, 406],
[1, 406, 286, 261, 2, 787, 269, 396, 484, 415, 4037],
[1, 4037, 311, 519, 268, 326, 317, 264, 525, 4037, 2],
feature_name = get_random_feature_name()
dataset = Dataset.from_dict({feature_name: documents})
expected = [
[0, 431, 440, 260, 1, 46, 499, 1945, 368, 3443, 15],
[0, 15, 340, 576, 355, 337, 1887, 1, 431, 440, 260],
[0, 260, 399, 13, 314, 260, 560, 1005, 13, 402, 284],
[0, 284, 260, 2606, 1, 431, 440, 260, 399, 13, 402],
[0, 402, 284, 260, 1, 1370, 268, 415, 484, 412, 15],
]
assert (
tokenize_dataset(text_stories, tokenizer, CTX_SIZE, BATCH_SIZE)
== correct_batches
)
actual = [x for x in tokenize_dataset(dataset, tokenizer, CTX_SIZE, BATCH_SIZE)]
assert actual == expected
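
To make the boundary-token behavior tested above concrete, here is a small sketch (not part of this commit) calling make_new_sample directly; the token ids are arbitrary and the BOS id is assumed to be 0.

from collections import deque

from delphi.dataset.tokenization import make_new_sample

deq = deque([11, 12, 13, 14, 15, 16, 17])
first = make_new_sample(deq, context_size=4, bos_token_id=0)
second = make_new_sample(deq, context_size=4, bos_token_id=0)
print(first)   # [0, 11, 12, 13, 14]
print(second)  # [0, 14, 15, 16, 17] -- 14 repeats because make_new_sample only peeks at it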
