Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Shuffle between epochs #456

Merged
merged 25 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmark/mnist/mnist.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 64
shuffle: True
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
1 change: 1 addition & 0 deletions benchmark/wildtime_benchmarks/example_pipelines/arxiv.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 128
shuffle: True
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 96
shuffle: True
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 64
shuffle: True
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 64
shuffle: True
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
1 change: 1 addition & 0 deletions benchmark/wildtime_benchmarks/example_pipelines/fmow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 64
shuffle: True
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 64
shuffle: True
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 64
shuffle: True
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
84 changes: 69 additions & 15 deletions integrationtests/online_dataset/test_online_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import random
import shutil
import time
from typing import Iterable, Tuple
from typing import Iterable, Optional, Tuple

import grpc
import modyn.storage.internal.grpc.generated.storage_pb2 as storage_pb2
Expand Down Expand Up @@ -275,7 +275,9 @@ def test_dataset_impl(
pipeline_id: int,
trigger_id: int,
items: list[int],
consistency_check: Optional[str] = None,
) -> None:
shuffle = consistency_check is not None and consistency_check == "shuffle"
dataloader, _ = prepare_dataloaders(
pipeline_id,
trigger_id,
Expand All @@ -289,6 +291,7 @@ def test_dataset_impl(
42,
prefetched_partitions,
parallel_prefetch_requests,
shuffle,
None,
None,
)
Expand Down Expand Up @@ -339,6 +342,50 @@ def test_dataset_impl(
if image_bytes not in FIRST_ADDED_IMAGES:
raise ValueError(f"Could not find image {idx} in created images, all_samples = {all_samples}")

if consistency_check is None:
return

print("Iterating again to check across epochs.")

second_samples = []
second_data = []
second_labels = []

for batch_number, batch in enumerate(dataloader):
sample_ids = batch[0]
if isinstance(sample_ids, torch.Tensor):
sample_ids = sample_ids.tolist()
elif isinstance(sample_ids, tuple):
sample_ids = list(sample_ids)

assert isinstance(sample_ids, list), "Cannot parse result from DataLoader"
assert isinstance(batch[1], torch.Tensor) and isinstance(batch[2], torch.Tensor)

second_samples.extend(sample_ids)
for sample in batch[1]:
second_data.append(sample) # iterate over batch dimension to extract samples
second_labels.extend(batch[2].tolist())

assert consistency_check in ["twice", "shuffle"]

if consistency_check == "twice":
# Validate exact same order
assert second_samples == all_samples
assert all(torch.allclose(data1, data2) for data1, data2 in zip(second_data, all_data))
assert second_labels == all_labels

if consistency_check == "shuffle":
# Same content, but not same order
assert second_samples != all_samples
assert not all(torch.allclose(data1, data2) for data1, data2 in zip(second_data, all_data))
print([torch.allclose(data1, data2) for data1, data2 in zip(second_data, all_data)])
assert second_labels != all_labels

assert set(second_samples) == set(all_samples)
assert set(second_labels) == set(all_labels)
for data1 in second_data:
assert any(torch.allclose(data1, data2) for data2 in all_data)

MaxiBoether marked this conversation as resolved.
Show resolved Hide resolved

def test_dataset() -> None:
NUM_IMAGES = 10
Expand All @@ -359,22 +406,29 @@ def test_dataset() -> None:
if prefetched_partitions == 5:
ppr_list = [1, 2, 5, 999]

consistency_checks = [None]
if num_dataworkers == 4 and prefetched_partitions in [0, 4]:
consistency_checks = [None, "twice", "shuffle"]

for parallel_prefetch_requests in ppr_list:
for batch_size in [1, 2, 10]:
print(
f"Testing num_workers = {num_dataworkers}, partitions = {prefetched_partitions},"
+ f"batch_size = {batch_size}, parallel_prefetch_requests={parallel_prefetch_requests}"
)
test_dataset_impl(
num_dataworkers,
batch_size,
prefetched_partitions,
parallel_prefetch_requests,
pipeline_id,
trigger_id,
keys,
)
gc.collect()
for consistency_check in consistency_checks:
print(
f"Testing num_workers = {num_dataworkers}, partitions = {prefetched_partitions},"
+ f"batch_size = {batch_size}, parallel_prefetch_requests={parallel_prefetch_requests}"
+ f" consistency_check = {consistency_check}"
)
test_dataset_impl(
num_dataworkers,
batch_size,
prefetched_partitions,
parallel_prefetch_requests,
pipeline_id,
trigger_id,
keys,
consistency_check,
)
gc.collect()


def main() -> None:
Expand Down
1 change: 1 addition & 0 deletions modyn/config/examples/example-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ training:
use_previous_model: True
initial_model: random
batch_size: 64
shuffle: False
optimizers:
- name: "default"
algorithm: "SGD"
Expand Down
6 changes: 6 additions & 0 deletions modyn/config/schema/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ class TrainingConfig(ModynBaseModel):
description="The number of data loader workers on the trainer node that fetch data from storage.", ge=1
)
batch_size: int = Field(description="The batch size to be used during training.", ge=1)
shuffle: bool = Field(
description=(
"If True, we shuffle the order of partitions and the data within each partition at each worker."
"Otherwise, the output order is deterministic."
)
)
use_previous_model: bool = Field(
description=(
"If True, on trigger, we continue training on the model outputted by the previous trigger. If False, "
Expand Down
1 change: 1 addition & 0 deletions modyn/protos/selector.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message GetSamplesRequest {
int32 trigger_id = 2;
int32 partition_id = 3;
int32 worker_id = 4;
optional bool shuffle = 5;
MaxiBoether marked this conversation as resolved.
Show resolved Hide resolved
}

message SamplesResponse {
Expand Down
1 change: 1 addition & 0 deletions modyn/protos/trainer_server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ message StartTrainingRequest {
optional int32 seed = 21;
optional PythonString tokenizer = 22;
int64 num_samples_to_pass = 23;
bool shuffle = 24;
}

message StartTrainingResponse {
Expand Down
76 changes: 38 additions & 38 deletions modyn/selector/internal/grpc/generated/selector_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading