Merge branch 'main' into dbe/add_more_logging_feddgga
emersodb committed Nov 11, 2024
2 parents 5ae4210 + 9f6ac72 commit 8c0430b
Showing 24 changed files with 114 additions and 61 deletions.
17 changes: 11 additions & 6 deletions fl4health/utils/partitioners.py
@@ -148,7 +148,9 @@ def partition_label_indices(
# Dropping the last partition as they are "excess" indices
return partitioned_indices[:-1], min_samples, partition_allocations

def partition_dataset(self, original_dataset: D, max_retries: int = 5) -> Tuple[List[D], Dict[T, np.ndarray]]:
def partition_dataset(
self, original_dataset: D, max_retries: Optional[int] = 5
) -> Tuple[List[D], Dict[T, np.ndarray]]:
"""
Attempts partitioning of the original dataset up to max_retries times. Retries are potentially required if
the user requests a minimum number of labels be assigned to each of the partitions. If the drawn Dirichlet
@@ -157,16 +159,19 @@ def partition_dataset(self, original_dataset: D, max_retries: int = 5) -> Tuple[
Args:
original_dataset (D): The dataset to be partitioned
max_retries (int, optional): Number of times to attempt to satisfy a user provided minimum
label-associated data points per partition. Defaults to 5.
max_retries (Optional[int], optional): Number of times to attempt to satisfy a user provided minimum
label-associated data points per partition. Set this value to None if you want to retry indefinitely.
Defaults to 5.
Raises:
ValueError: Throws this error if the retries have been exhausted and the user provided minimum is not met.
Returns:
List[D]: The partitioned datasets, length should correspond to self.number_of_partitions
Dict[T, np.ndarray]: The Dirichlet distribution used to partition the data points for each label.
Tuple[List[D], Dict[T, np.ndarray]]: List[D] is the partitioned datasets, length should correspond to
self.number_of_partitions. Dict[T, np.ndarray] is the Dirichlet distribution used to partition the data
points for each label.
"""

targets = original_dataset.targets
assert targets is not None, "A label-based partitioner requires targets but this dataset has no targets"
partitioned_indices = [torch.Tensor([]).int() for _ in range(self.number_of_partitions)]
@@ -195,7 +200,7 @@ def partition_dataset(self, original_dataset: D, max_retries: int = 5) -> Tuple[
f"minimum requested was {self.min_label_examples}. Resampling the partition..."
),
)
if partition_attempts == max_retries:
if max_retries is not None and partition_attempts >= max_retries:
raise ValueError(
(
f"Max Retries: {max_retries} reached. Partitioning failed to "
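For orientation, here is a minimal usage sketch of the retry behaviour this change introduces. Only the partition_dataset(..., max_retries=...) call and the min_label_examples minimum come from the diff above; the DirichletLabelBasedAllocation class name, its constructor arguments, and the TensorDataset wrapper are assumptions about the surrounding fl4health code, used purely for illustration.

# Sketch only: import paths and constructor arguments are assumptions, not taken from this commit.
import torch
from fl4health.utils.dataset import TensorDataset
from fl4health.utils.partitioners import DirichletLabelBasedAllocation

# A toy dataset with a `targets` attribute, which a label-based partitioner requires.
data, targets = torch.randn(1000, 3, 32, 32), torch.randint(0, 10, (1000,))
original_dataset = TensorDataset(data, targets)

partitioner = DirichletLabelBasedAllocation(
    number_of_partitions=5, unique_labels=list(range(10)), beta=0.5, min_label_examples=2
)
# max_retries=None now means "resample the Dirichlet allocation indefinitely" until every
# partition holds at least min_label_examples points per label. An integer value instead
# raises a ValueError once that many attempts have been exhausted.
partitions, label_distributions = partitioner.partition_dataset(original_dataset, max_retries=None)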
34 changes: 27 additions & 7 deletions fl4health/utils/random.py
@@ -8,22 +8,43 @@
from flwr.common.logger import log


def set_all_random_seeds(seed: Optional[int] = 42) -> None:
"""Set seeds for python random, numpy random, and pytorch random.
def set_all_random_seeds(
seed: Optional[int] = 42, use_deterministic_torch_algos: bool = False, disable_torch_benchmarking: bool = False
) -> None:
"""
Set seeds for python random, numpy random, and pytorch random. It also offers the option to force pytorch to use
deterministic algorithms for certain methods and layers (see
https://pytorch.org/docs/stable/generated/torch.use_deterministic_algorithms.html for more details). Finally, it
allows one to disable cuda benchmarking, which can also affect the determinism of pytorch training outside of
random seeding. For more information on reproducibility in pytorch see:
https://pytorch.org/docs/stable/notes/randomness.html
Will no-op if seed is `None`.
NOTE: If the use_deterministic_torch_algos flag is set to True, you may need to set the environment variable
CUBLAS_WORKSPACE_CONFIG to something like :4096:8, to avoid CUDA errors. Additional documentation may be found
here: https://docs.nvidia.com/cuda/cublas/index.html#results-reproducibility
Args:
seed (int): The seed value to be used for random number generators. Default is 42.
seed (Optional[int], optional): The seed value to be used for random number generators. Default is 42. Seed
setting will no-op if the seed is explicitly set to None.
use_deterministic_torch_algos (bool, optional): Whether or not to set torch.use_deterministic_algorithms to
True. Defaults to False.
disable_torch_benchmarking (bool, optional): Whether to explicitly disable cuda benchmarking in
torch processes. Defaults to False.
"""
if seed is None:
log(INFO, "No seed provided. Using random seed.")
else:
log(INFO, f"Setting seed to {seed} and fixing torch determinism")
log(INFO, f"Setting random seeds to {seed}.")
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.use_deterministic_algorithms(True)
if use_deterministic_torch_algos:
log(INFO, "Setting torch.use_deterministic_algorithms to True.")
# warn_only is set to true so that layers and components without a deterministic implementation available will
# warn the user rather than take down the process with an exception.
torch.use_deterministic_algorithms(True, warn_only=True)
if disable_torch_benchmarking:
log(INFO, "Disabling CUDA algorithm benchmarking.")
torch.backends.cudnn.benchmark = False


@@ -37,7 +58,6 @@ def unset_all_random_seeds() -> None:
np.random.seed(None)
torch.seed()
torch.use_deterministic_algorithms(False)
torch.backends.cudnn.benchmark = True


def generate_hash(length: int = 8) -> str:
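To show how the new arguments are meant to be used together (this mirrors the client and server changes further down in this commit; the environment variable line is only needed when deterministic algorithms are enabled on CUDA, per the NOTE in the docstring above):

import os

from fl4health.utils.random import set_all_random_seeds

# Required on CUDA when use_deterministic_torch_algos=True; see the NOTE above and the
# CUBLAS_WORKSPACE_CONFIG export added to the slrm scripts in this commit.
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"

# Seeds python, numpy, and torch RNGs, switches torch to deterministic algorithms
# (with warn_only=True), and disables cudnn benchmarking.
set_all_random_seeds(seed=42, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)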
4 changes: 2 additions & 2 deletions research/cifar10/adaptive_pfl/ditto/client.py
@@ -65,7 +65,7 @@ def get_optimizer(self, config: Config) -> Dict[str, Optimizer]:
return {"global": global_optimizer, "local": local_optimizer}

def get_model(self, config: Config) -> nn.Module:
return ConvNet(in_channels=3, use_bn=False, dropout=0.1).to(self.device)
return ConvNet(in_channels=3, use_bn=False, dropout=0.1, hidden=512).to(self.device)


if __name__ == "__main__":
@@ -130,7 +130,7 @@ def get_model(self, config: Config) -> nn.Module:
log(INFO, f"Beta: {args.beta}")

# Set the random seed for reproducibility
set_all_random_seeds(args.seed)
set_all_random_seeds(args.seed, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)

# Adding extensive checkpointing for the client
checkpoint_dir = os.path.join(args.artifact_dir, args.run_name)
8 changes: 6 additions & 2 deletions research/cifar10/adaptive_pfl/ditto/run_fold_experiment.slrm
@@ -62,6 +62,10 @@ if [[ "${SLURM_JOB_PARTITION}" == "t4v2" ]] || \
export NCCL_SOCKET_IFNAME=bond0
fi

# This environment variable must be set in order to force torch to use deterministic algorithms. See documentation
# in fl4health/utils/random.py for more information
export CUBLAS_WORKSPACE_CONFIG=:4096:8

# Process Inputs

SERVER_CONFIG_PATH=$1
@@ -132,15 +136,15 @@ do
--config_path ${SERVER_CONFIG_PATH} \
--server_address ${SERVER_ADDRESS} \
--seed ${SEED} \
--lam ${LAM_VALUE} \
--lambda ${LAM_VALUE} \
--use_adaptation \
> ${SERVER_OUTPUT_FILE} 2>&1 &
else
nohup python -m research.cifar10.adaptive_pfl.ditto.server \
--config_path ${SERVER_CONFIG_PATH} \
--server_address ${SERVER_ADDRESS} \
--seed ${SEED} \
--lam ${LAM_VALUE} \
--lambda ${LAM_VALUE} \
> ${SERVER_OUTPUT_FILE} 2>&1 &
fi

6 changes: 3 additions & 3 deletions research/cifar10/adaptive_pfl/ditto/server.py
@@ -45,7 +45,7 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei

client_manager = SimpleClientManager()
# Initializing the model on the server side
model = ConvNet(in_channels=3, use_bn=False, dropout=0.1)
model = ConvNet(in_channels=3, use_bn=False, dropout=0.1, hidden=512)
# Server performs simple FedAveraging as its server-side optimization strategy
strategy = FedAvgWithAdaptiveConstraint(
min_fit_clients=config["n_clients"],
@@ -101,7 +101,7 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei
required=False,
)
parser.add_argument(
"--lam", action="store", type=float, help="Ditto loss weight for local model training", default=0.01
"--lambda", action="store", type=float, help="Ditto loss weight for local model training", default=0.01
)
parser.add_argument(
"--use_adaptation",
@@ -118,6 +118,6 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei
log(INFO, "Adapting the loss weight for model drift via global model loss")

# Set the random seed for reproducibility
set_all_random_seeds(args.seed)
set_all_random_seeds(args.seed, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)

main(config, args.server_address, args.lam, args.use_adaptation)
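A small aside on the --lam to --lambda rename in the argparse block above and in the slrm scripts: lambda is a Python keyword, so argparse's default destination for this flag (the name "lambda") cannot be read back as args.lambda, while the call at the bottom of the file still reads args.lam. Below is a minimal sketch of one way to reconcile the two, assuming the intent is only to rename the command-line flag; the dest="lam" argument is an assumption and is not part of this diff.

import argparse

parser = argparse.ArgumentParser()
# Expose the flag as --lambda on the command line but store the value under "lam",
# so that existing references like args.lam keep working ("lambda" is a Python
# keyword, and args.lambda would be a syntax error).
parser.add_argument(
    "--lambda", dest="lam", action="store", type=float, help="Ditto loss weight for local model training", default=0.01
)
args = parser.parse_args(["--lambda", "0.1"])
print(args.lam)  # 0.1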
4 changes: 2 additions & 2 deletions research/cifar10/adaptive_pfl/fedprox/client.py
@@ -63,7 +63,7 @@ def get_optimizer(self, config: Config) -> Optimizer:
return torch.optim.AdamW(self.model.parameters(), lr=self.learning_rate)

def get_model(self, config: Config) -> nn.Module:
return ConvNet(in_channels=3, use_bn=False, dropout=0.1).to(self.device)
return ConvNet(in_channels=3, use_bn=False, dropout=0.1, hidden=512).to(self.device)


if __name__ == "__main__":
@@ -134,7 +134,7 @@ def get_model(self, config: Config) -> nn.Module:
log(INFO, f"Beta: {args.beta}")

# Set the random seed for reproducibility
set_all_random_seeds(args.seed)
set_all_random_seeds(args.seed, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)

# Adding extensive checkpointing for the client
checkpoint_dir = os.path.join(args.artifact_dir, args.run_name)
research/cifar10/adaptive_pfl/fedprox/run_fold_experiment.slrm
@@ -62,6 +62,10 @@ if [[ "${SLURM_JOB_PARTITION}" == "t4v2" ]] || \
export NCCL_SOCKET_IFNAME=bond0
fi

# This environment variable must be set in order to force torch to use deterministic algorithms. See documentation
# in fl4health/utils/random.py for more information
export CUBLAS_WORKSPACE_CONFIG=:4096:8

# Process Inputs

SERVER_CONFIG_PATH=$1
@@ -134,7 +138,7 @@ do
--run_name ${RUN_NAME} \
--server_address ${SERVER_ADDRESS} \
--seed ${SEED} \
--lam ${LAM_VALUE} \
--lambda ${LAM_VALUE} \
--use_adaptation \
> ${SERVER_OUTPUT_FILE} 2>&1 &
else
@@ -144,7 +148,7 @@ do
--run_name ${RUN_NAME} \
--server_address ${SERVER_ADDRESS} \
--seed ${SEED} \
--lam ${LAM_VALUE} \
--lambda ${LAM_VALUE} \
> ${SERVER_OUTPUT_FILE} 2>&1 &
fi

6 changes: 3 additions & 3 deletions research/cifar10/adaptive_pfl/fedprox/server.py
@@ -61,7 +61,7 @@ def main(

client_manager = SimpleClientManager()
# Initializing the model on the server side
model = ConvNet(in_channels=3, use_bn=False, dropout=0.1)
model = ConvNet(in_channels=3, use_bn=False, dropout=0.1, hidden=512)
# Server performs simple FedAveraging as its server-side optimization strategy
strategy = FedAvgWithAdaptiveConstraint(
min_fit_clients=config["n_clients"],
@@ -136,7 +136,7 @@ def main(
required=False,
)
parser.add_argument(
"--lam", action="store", type=float, help="FedProx loss weight for local model training", default=0.01
"--lambda", action="store", type=float, help="FedProx loss weight for local model training", default=0.01
)
parser.add_argument(
"--use_adaptation",
@@ -153,6 +153,6 @@ def main(
log(INFO, "Adapting the loss weight for model drift via model loss")

# Set the random seed for reproducibility
set_all_random_seeds(args.seed)
set_all_random_seeds(args.seed, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)

main(config, args.server_address, args.artifact_dir, args.run_name, args.lam, args.use_adaptation)
4 changes: 2 additions & 2 deletions research/cifar10/adaptive_pfl/fenda_ditto/client.py
@@ -68,7 +68,7 @@ def get_optimizer(self, config: Config) -> Dict[str, Optimizer]:
return {"global": global_optimizer, "local": local_optimizer}

def get_model(self, config: Config) -> FendaModel:
return ConvNetFendaModel(in_channels=3, use_bn=False).to(self.device)
return ConvNetFendaModel(in_channels=3, use_bn=False, dropout=0.1, hidden=512).to(self.device)

def get_global_model(self, config: Config) -> SequentiallySplitModel:
return ConvNetFendaDittoGlobalModel(in_channels=3, use_bn=False, dropout=0.1).to(self.device)
@@ -144,7 +144,7 @@ def get_global_model(self, config: Config) -> SequentiallySplitModel:
log(INFO, "Freezing the global feature extractor of the FENDA model")

# Set the random seed for reproducibility
set_all_random_seeds(args.seed)
set_all_random_seeds(args.seed, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)

# Adding extensive checkpointing for the client
checkpoint_dir = os.path.join(args.artifact_dir, args.run_name)
research/cifar10/adaptive_pfl/fenda_ditto/run_fold_experiment.slrm
@@ -64,6 +64,10 @@ if [[ "${SLURM_JOB_PARTITION}" == "t4v2" ]] || \
export NCCL_SOCKET_IFNAME=bond0
fi

# This environment variable must be set in order to force torch to use deterministic algorithms. See documentation
# in fl4health/utils/random.py for more information
export CUBLAS_WORKSPACE_CONFIG=:4096:8

# Process Inputs

SERVER_CONFIG_PATH=$1
@@ -135,15 +139,15 @@ do
--config_path ${SERVER_CONFIG_PATH} \
--server_address ${SERVER_ADDRESS} \
--seed ${SEED} \
--lam ${LAM_VALUE} \
--lambda ${LAM_VALUE} \
--use_adaptation \
> ${SERVER_OUTPUT_FILE} 2>&1 &
else
nohup python -m research.cifar10.adaptive_pfl.fenda_ditto.server \
--config_path ${SERVER_CONFIG_PATH} \
--server_address ${SERVER_ADDRESS} \
--seed ${SEED} \
--lam ${LAM_VALUE} \
--lambda ${LAM_VALUE} \
> ${SERVER_OUTPUT_FILE} 2>&1 &
fi

6 changes: 3 additions & 3 deletions research/cifar10/adaptive_pfl/fenda_ditto/server.py
@@ -45,7 +45,7 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei

client_manager = SimpleClientManager()
# Initializing the model on the server side
model = ConvNetFendaDittoGlobalModel(in_channels=3, use_bn=False, dropout=0.1)
model = ConvNetFendaDittoGlobalModel(in_channels=3, use_bn=False, dropout=0.1, hidden=512)
# Server performs simple FedAveraging as its server-side optimization strategy
strategy = FedAvgWithAdaptiveConstraint(
min_fit_clients=config["n_clients"],
@@ -101,7 +101,7 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei
required=False,
)
parser.add_argument(
"--lam", action="store", type=float, help="FENDA Ditto loss weight for local model training", default=0.01
"--lambda", action="store", type=float, help="FENDA Ditto loss weight for local model training", default=0.01
)
parser.add_argument(
"--use_adaptation",
@@ -118,6 +118,6 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei
log(INFO, "Adapting the loss weight for model drift via global model loss")

# Set the random seed for reproducibility
set_all_random_seeds(args.seed)
set_all_random_seeds(args.seed, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)

main(config, args.server_address, args.lam, args.use_adaptation)
4 changes: 2 additions & 2 deletions research/cifar10/adaptive_pfl/mrmtl/client.py
@@ -63,7 +63,7 @@ def get_optimizer(self, config: Config) -> Optimizer:
return torch.optim.SGD(self.model.parameters(), lr=self.learning_rate)

def get_model(self, config: Config) -> nn.Module:
return ConvNet(in_channels=3, use_bn=False, dropout=0.1).to(self.device)
return ConvNet(in_channels=3, use_bn=False, dropout=0.1, hidden=512).to(self.device)


if __name__ == "__main__":
@@ -128,7 +128,7 @@ def get_model(self, config: Config) -> nn.Module:
log(INFO, f"Beta: {args.beta}")

# Set the random seed for reproducibility
set_all_random_seeds(args.seed)
set_all_random_seeds(args.seed, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)

# Adding extensive checkpointing for the client
checkpoint_dir = os.path.join(args.artifact_dir, args.run_name)
8 changes: 6 additions & 2 deletions research/cifar10/adaptive_pfl/mrmtl/run_fold_experiment.slrm
@@ -62,6 +62,10 @@ if [[ "${SLURM_JOB_PARTITION}" == "t4v2" ]] || \
export NCCL_SOCKET_IFNAME=bond0
fi

# This environment variable must be set in order to force torch to use deterministic algorithms. See documentation
# in fl4health/utils/random.py for more information
export CUBLAS_WORKSPACE_CONFIG=:4096:8

# Process Inputs

SERVER_CONFIG_PATH=$1
@@ -132,15 +136,15 @@ do
--config_path ${SERVER_CONFIG_PATH} \
--server_address ${SERVER_ADDRESS} \
--seed ${SEED} \
--lam ${LAM_VALUE} \
--lambda ${LAM_VALUE} \
--use_adaptation \
> ${SERVER_OUTPUT_FILE} 2>&1 &
else
nohup python -m research.cifar10.adaptive_pfl.mrmtl.server \
--config_path ${SERVER_CONFIG_PATH} \
--server_address ${SERVER_ADDRESS} \
--seed ${SEED} \
--lam ${LAM_VALUE} \
--lambda ${LAM_VALUE} \
> ${SERVER_OUTPUT_FILE} 2>&1 &
fi

6 changes: 3 additions & 3 deletions research/cifar10/adaptive_pfl/mrmtl/server.py
@@ -69,7 +69,7 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei

client_manager = SimpleClientManager()
# Initializing the model on the server side
model = ConvNet(in_channels=3, use_bn=False, dropout=0.1)
model = ConvNet(in_channels=3, use_bn=False, dropout=0.1, hidden=512)
# Server performs simple FedAveraging as its server-side optimization strategy
strategy = FedAvgWithAdaptiveConstraint(
min_fit_clients=config["n_clients"],
@@ -125,7 +125,7 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei
required=False,
)
parser.add_argument(
"--lam", action="store", type=float, help="Ditto loss weight for local model training", default=0.01
"--lambda", action="store", type=float, help="Ditto loss weight for local model training", default=0.01
)
parser.add_argument(
"--use_adaptation",
@@ -142,6 +142,6 @@ def main(config: Dict[str, Any], server_address: str, lam: float, adapt_loss_wei
log(INFO, "Adapting the loss weight for model drift via global model loss")

# Set the random seed for reproducibility
set_all_random_seeds(args.seed)
set_all_random_seeds(args.seed, use_deterministic_torch_algos=True, disable_torch_benchmarking=True)

main(config, args.server_address, args.lam, args.use_adaptation)