Merge pull request #5 from owczr/develop
Add distributed strategy for training
owczr authored Jan 7, 2024
2 parents b4e48ba + 3c11ef6 commit eafad6f
Showing 9 changed files with 133 additions and 36 deletions.
27 changes: 15 additions & 12 deletions scripts/azure/machine_learning/run_job.py
@@ -32,16 +32,13 @@ def connect_to_workspace():

def get_compute(ml_client):
    cpu_compute_target = os.getenv("AZURE_COMPUTE_TARGET")
    size = os.getenv("AZURE_COMPUTE_SIZE")
    min_instances = os.getenv("AZURE_COMPUTE_MIN_INSTANCES")
    max_instances = os.getenv("AZURE_COMPUTE_MAX_INSTANCES")

    try:
        ml_client.compute.get(cpu_compute_target)
    except Exception:
        click.echo(f"Compute {cpu_compute_target} not found.")

def submit_job(ml_client, model, optimizer, loss, epochs, batch_size):
def submit_job(ml_client, model, optimizer, loss, epochs, batch_size, distributed):
    code = os.getenv("AZURE_CODE_PATH")
    environment = os.getenv("AZURE_ENVIRONMENT")
    type_ = os.getenv("AZURE_STORAGE_TYPE")
@@ -53,15 +50,19 @@ def submit_job(ml_client, model, optimizer, loss, epochs, batch_size):

    job_name = f"train_{model}_{datetime.now().strftime('%Y%m%d%H%M%S')}"

    command_var = (
        "python -m scripts.azure.machine_learning.train"
        " --train ${{inputs.train}} --test ${{inputs.test}}"
        " --epochs ${{inputs.epochs}} --optimizer ${{inputs.optimizer}}"
        " --loss ${{inputs.loss}} --batch_size ${{inputs.batch_size}}"
        " --model ${{inputs.model}} --job_name ${{inputs.job_name}}"
    )

    command_var = command_var + " --distributed" if distributed else command_var

    command_job = command(
        code=code,
        command=(
            "python -m scripts.azure.machine_learning.train"
            " --train ${{inputs.train}} --test ${{inputs.test}}"
            " --epochs ${{inputs.epochs}} --optimizer ${{inputs.optimizer}}"
            " --loss ${{inputs.loss}} --batch_size ${{inputs.batch_size}}"
            " --model ${{inputs.model}} --job_name ${{inputs.job_name}}"
        ),
        command=command_var,
        environment=environment,
        inputs={
            "train": Input(
@@ -115,7 +116,8 @@ def register_model(ml_client, returned_job, run_name, run_description):
)
@click.option("--epochs", type=int, default=10, help="Number of epochs to train for")
@click.option("--batch_size", type=int, default=32, help="Batch size to use")
def run(model, optimizer, loss, epochs, batch_size):
@click.option("--distributed", is_flag=True, help="Use distributed startegy")
def run(model, optimizer, loss, epochs, batch_size, distributed):
if model not in MODELS:
raise ValueError(f"Model {model} not supported")

@@ -130,6 +132,7 @@ def run(model, optimizer, loss, epochs, batch_size):
        loss=loss,
        epochs=epochs,
        batch_size=batch_size,
        distributed=distributed,
    )

    click.echo(
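Note: MultiWorkerMirroredStrategy only pays off if the submitted job actually runs on more than one node, and this diff leaves the job single-instance. With the azure.ai.ml v2 SDK, the usual way to request multiple workers is the instance_count and distribution arguments of command(). A minimal sketch, assuming SDK v2 and a compute target with at least two nodes (not part of this PR):

# Hypothetical extension of submit_job: ask Azure ML for a two-worker
# TensorFlow job. Azure ML then sets TF_CONFIG on each node, which
# tf.distribute.MultiWorkerMirroredStrategy reads at startup.
command_job = command(
    code=code,
    command=command_var,
    environment=environment,
    instance_count=2,
    distribution={"type": "tensorflow", "worker_count": 2},
)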
7 changes: 7 additions & 0 deletions scripts/azure/machine_learning/test_distributed_mobilenet.sh
@@ -0,0 +1,7 @@
python -m scripts.azure.machine_learning.run_job \
    --model mobilenet \
    --optimizer adam \
    --loss binary_crossentropy \
    --epochs 2 \
    --batch_size 128 \
    --distributed
18 changes: 18 additions & 0 deletions scripts/azure/machine_learning/test_distributed_models.sh
@@ -0,0 +1,18 @@
#!/bin/sh

OPTIMIZER="adam"
LOSS="binary_crossentropy"
EPOCHS=5
BATCH_SIZE=128

for model in mobilenet nasnet efficientnet efficientnetv2 densenet inceptionnet xception resnet resnetv2 convnext inceptionresnet vgg;
do
    python -m scripts.azure.machine_learning.run_job \
        --model "$model" \
        --optimizer "$OPTIMIZER" \
        --loss "$LOSS" \
        --epochs "$EPOCHS" \
        --batch_size "$BATCH_SIZE" \
        --distributed
done

2 changes: 1 addition & 1 deletion scripts/azure/machine_learning/test_mobilenet.sh
@@ -3,4 +3,4 @@ python -m scripts.azure.machine_learning.run_job \
    --optimizer adam \
    --loss binary_crossentropy \
    --epochs 2 \
    --batch_size 64
    --batch_size 16
2 changes: 1 addition & 1 deletion scripts/azure/machine_learning/test_models.sh
@@ -2,7 +2,7 @@

OPTIMIZER="adam"
LOSS="binary_crossentropy"
EPOCHS=1
EPOCHS=5
BATCH_SIZE=32

for model in mobilenet nasnet efficientnet efficientnetv2 densenet inceptionnet xception resnet resnetv2 convnext inceptionresnet vgg;
52 changes: 41 additions & 11 deletions scripts/azure/machine_learning/train.py
@@ -18,7 +18,7 @@
    MODELS,
    BUILDERS,
    CALLBACKS,
    METRICS,
    config_logging
)

@@ -27,6 +27,39 @@
logger = logging.getLogger("azure")


def get_compiled_model(model, optimizer, loss):
    builder = BUILDERS[model]()

    director = ModelDirector(builder)
    model_nn = director.make()
    logger.info(f"Built model_nn with {str(builder)}")

    optimizer_cls = {
        "adam": tf.keras.optimizers.Adam,
        "sgd": tf.keras.optimizers.SGD,
    }[optimizer]()

    loss_cls = {
        "binary_crossentropy": tf.keras.losses.BinaryCrossentropy,
        "categorical_crossentropy": tf.keras.losses.CategoricalCrossentropy,
    }[loss]()

    metrics = [metric() for metric in METRICS]

    model_nn.compile(optimizer=optimizer_cls, loss=loss_cls, metrics=metrics, run_eagerly=True)
    logger.info("Compiled model")

    return model_nn


def get_compiled_distributed_model(model, optimizer, loss):
    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    with strategy.scope():
        model_nn = get_compiled_model(model, optimizer, loss)

    return model_nn

@click.command()
@click.option(
"--model", type=click.Choice(MODELS), default="mobilenet", help="Model to train"
@@ -50,7 +83,8 @@
@click.option("--epochs", type=click.INT, default=10, help="Number of epochs to train for")
@click.option("--batch_size", type=click.INT, default=64, help="Batch size for dataset loaders")
@click.option("--job_name", type=click.STRING, help="Azure Machine Learning job name")
def run(model, train, test, optimizer, loss, epochs, batch_size, job_name):
@click.option("--distributed", is_flag=True, help="Use distributed strategy")
def run(model, train, test, optimizer, loss, epochs, batch_size, job_name, distributed):
    mlflow.set_experiment("lung-cancer-detection")
    mlflow_run = mlflow.start_run(run_name=f"train_{model}_{datetime.now().strftime('%Y%m%d%H%M%S')}")

@@ -64,12 +98,11 @@ def run(model, train, test, optimizer, loss, epochs, batch_size, job_name):
    logger.info(
        f"Run parameters - optimizer: {optimizer}, loss: {loss}"
    )

    builder = BUILDERS[model]()

    director = ModelDirector(builder)
    model_nn = director.make()
    logger.info(f"Built model_nn with {str(builder)}")

    if not distributed:
        model_nn = get_compiled_model(model, optimizer, loss)
    else:
        model_nn = get_compiled_distributed_model(model, optimizer, loss)

    train_loader = DatasetLoader(train)
    test_loader = DatasetLoader(test)
@@ -81,9 +114,6 @@ def run(model, train, test, optimizer, loss, epochs, batch_size, job_name):
    test_dataset = test_loader.get_dataset()
    logger.info("Loaded train and test datasets")

    model_nn.compile(optimizer=optimizer, loss=loss, metrics=METRICS)
    logger.info("Compiled model")

    history = model_nn.fit(train_dataset, epochs=epochs, callbacks=CALLBACKS)
    logger.info("Trained model")

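Note: tf.distribute.MultiWorkerMirroredStrategy discovers its peers through the TF_CONFIG environment variable, read when the strategy object is constructed, so get_compiled_distributed_model must run after TF_CONFIG is in place; on Azure ML the platform writes it for each node of a distributed job. A minimal local smoke test (the cluster spec below is made up):

import json
import os

import tensorflow as tf

# Single-node "cluster": one worker on a local port. TF_CONFIG must be set
# before the strategy is constructed, because it is read at that moment.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"worker": ["localhost:12345"]},
    "task": {"type": "worker", "index": 0},
})

strategy = tf.distribute.MultiWorkerMirroredStrategy()
print(strategy.num_replicas_in_sync)  # 1 replica on this single worker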
18 changes: 18 additions & 0 deletions scripts/azure/machine_learning/train_distributed_models.sh
@@ -0,0 +1,18 @@
#!/bin/sh

OPTIMIZER="adam"
LOSS="binary_crossentropy"
EPOCHS=100
BATCH_SIZE=128

for model in mobilenet nasnet efficientnet efficientnetv2 densenet inceptionnet xception resnet resnetv2 convnext inceptionresnet vgg;
do
    python -m scripts.azure.machine_learning.run_job \
        --model "$model" \
        --optimizer "$OPTIMIZER" \
        --loss "$LOSS" \
        --epochs "$EPOCHS" \
        --batch_size "$BATCH_SIZE" \
        --distributed
done

12 changes: 7 additions & 5 deletions src/config.py
@@ -64,12 +64,14 @@ def config_logging():
}

_threshold = 0.5

# lambdas are needed because the metrics must be created within the strategy scope
METRICS = [
    tf.keras.metrics.BinaryAccuracy(threshold=_threshold, name="accuracy"),
    tfa.metrics.F1Score(num_classes=1, threshold=_threshold, name="f1"),
    tf.keras.metrics.Precision(thresholds=_threshold, name="precision"),
    tf.keras.metrics.Recall(thresholds=_threshold, name="recall"),
    tf.keras.metrics.AUC(thresholds=[_threshold], curve="ROC", name="roc_auc"),
    lambda: tf.keras.metrics.BinaryAccuracy(threshold=_threshold, name="accuracy"),
    lambda: tfa.metrics.F1Score(num_classes=1, threshold=_threshold, name="f1"),
    lambda: tf.keras.metrics.Precision(thresholds=_threshold, name="precision"),
    lambda: tf.keras.metrics.Recall(thresholds=_threshold, name="recall"),
    lambda: tf.keras.metrics.AUC(thresholds=[_threshold], curve="ROC", name="roc_auc"),
]

EARLY_STOPPING_CONFIG = {
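Note: the switch to lambdas defers metric construction, so the variables backing each Keras metric are created only when get_compiled_model evaluates metric() — which happens inside strategy.scope() in the distributed path. Metrics built at import time, as before, would own variables created outside the strategy, and TensorFlow rejects mixing such variables into a distributed model. A minimal sketch of the intended pattern (OneDeviceStrategy chosen purely for illustration):

import tensorflow as tf

METRICS = [
    lambda: tf.keras.metrics.Precision(thresholds=0.5, name="precision"),
]

strategy = tf.distribute.OneDeviceStrategy("/cpu:0")
with strategy.scope():
    # metric() runs here, so each metric's variables are created inside
    # the active strategy scope.
    metrics = [metric() for metric in METRICS]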
31 changes: 25 additions & 6 deletions src/dataset/dataset_loader.py
@@ -75,10 +75,20 @@ def _shuffle(self, paths, labels):

        return paths, labels

    def _load_batch_data(self, paths_batch):
    def _load_batch_data(self, paths_batch, labels_batch):
        """Loads batch of data"""
        batch_data = [np.load(path) for path in paths_batch]
        return batch_data
        # def load(path: str) -> np.ndarray | None:
        #     try:
        #         data = np.load(path)
        #     except ValueError as e:
        #         logger.error(f"Error while loading {path}.\n{e}")
        #         return None
        #
        # batch_data = [(load(path), label) for path, label in zip(paths_batch, labels_batch)]
        # batch_data = [(data, label) for data, label in batch_data if data is not None]
        #
        # return batch_data
        return [(np.load(path), label) for path, label in zip(paths_batch, labels_batch)]

    def _data_generator(self):
        """Loads and yields batches of data"""
@@ -89,9 +99,18 @@ def _data_generator(self):
        for i in range(0, len(paths), self.batch_size):
            paths_batch = paths[i : i + self.batch_size]
            labels_batch = labels[i : i + self.batch_size]

            batch_data = self._load_batch_data(paths_batch)

            try:
                batch_data = self._load_batch_data(paths_batch, labels_batch)
            except Exception as e:
                logger.error(f"Error while loading batch {i}.\n{e}")
                continue

            logger.info(f"Batch {i} loaded")

            yield np.array(batch_data), np.array(labels_batch)
            if len(batch_data) == 0:
                continue

            data_batch, batch_labels = zip(*batch_data)

            yield np.array(data_batch), np.array(batch_labels)
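Note: because the generator now skips batches that fail to load, the number of batches yielded per epoch is no longer fixed, and downstream code should not assume a constant steps_per_epoch. For reference, a generator like this is typically wrapped with tf.data.Dataset.from_generator; a sketch under assumed shapes (the real DatasetLoader.get_dataset may differ):

import tensorflow as tf

# Assumes each sample is a 224x224x3 float array with a scalar binary label;
# `loader` is a DatasetLoader instance. Both assumptions are illustrative.
dataset = tf.data.Dataset.from_generator(
    loader._data_generator,
    output_signature=(
        tf.TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.float32),
    ),
)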
