Fix Windows encoding and limit rounds so GitHub Actions CI doesn't run out of memory

Signed-off-by: Patrick Foley <[email protected]>
psfoley committed Aug 25, 2023
1 parent 18fb737 commit acfa1e1
Showing 5 changed files with 32 additions and 27 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/interactive-kvasir.yml
@@ -28,10 +28,6 @@ jobs:
      run: |
        python -m pip install --upgrade pip
        pip install .
-     - name: Set proper rendering on Windows
-       if: matrix.os == 'windows-latest'
-       run: |
-         chcp 65001
      - name: Interactive API - pytorch_kvasir_unet
        run: |
          python setup.py build_grpc
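For context, a minimal sketch (not part of the commit) of the Windows failure this change set addresses: legacy console code pages such as cp1252 cannot encode glyphs like the 🡆 arrow used in OpenFL's log messages, so logging raises UnicodeEncodeError unless the code page is switched (the old `chcp 65001` step) or the message is pre-encoded to UTF-8 bytes. The logger name and the cp1252 simulation below are assumptions for illustration.

```python
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger('encoding-demo')  # illustrative name

message = 'Building \U0001F846 Object'  # U+1F846 is the arrow glyph from plan.py's logs
try:
    message.encode('cp1252')  # simulate a legacy Windows console code page
    logger.info(message)      # safe: the console can represent every character
except UnicodeEncodeError:
    # bytes are rendered by logging as an ASCII-safe b'...' repr, so the
    # handler never has to encode the arrow glyph itself
    logger.info(message.encode('utf-8'))
```

With the messages pre-encoded in the Python sources below, the workflow no longer needs to force the console code page, so the step is simply deleted.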
30 changes: 17 additions & 13 deletions openfl/federated/plan/plan.py
@@ -55,11 +55,11 @@ def ignore_aliases(self, data):
            frozen_yaml_path = Path(
                f'{yaml_path.parent}/{yaml_path.stem}_{plan.hash[:8]}.yaml')
            if frozen_yaml_path.exists():
-               Plan.logger.info(f'{yaml_path.name} is already frozen')
+               Plan.logger.info(f'{yaml_path.name} is already frozen'.encode('utf8'))
                return
            frozen_yaml_path.write_text(dump(config))
            frozen_yaml_path.chmod(0o400)
-           Plan.logger.info(f'{yaml_path.name} frozen successfully')
+           Plan.logger.info(f'{yaml_path.name} frozen successfully'.encode('utf8'))
        else:
            yaml_path.write_text(dump(config))
@@ -109,7 +109,7 @@ def parse(plan_config_path: Path, cols_config_path: Path = None,
            if resolve:
                Plan.logger.info(
                    f'Loading DEFAULTS for section [red]{section}[/] '
-                   f'from file [red]{defaults}[/].',
+                   f'from file [red]{defaults}[/].'.encode("utf-8"),
                    extra={'markup': True})

                defaults = Plan.load(Path(defaults))
@@ -127,7 +127,7 @@ def parse(plan_config_path: Path, cols_config_path: Path = None,
        if gandlf_config_path is not None:
            Plan.logger.info(
                f'Importing GaNDLF Config into plan '
-               f'from file [red]{gandlf_config_path}[/].',
+               f'from file [red]{gandlf_config_path}[/].'.encode('utf-8'),
                extra={'markup': True})

            gandlf_config = Plan.load(Path(gandlf_config_path))
@@ -154,15 +154,16 @@ def parse(plan_config_path: Path, cols_config_path: Path = None,

            Plan.logger.info(
                f'Parsing Federated Learning Plan : [green]SUCCESS[/] : '
-               f'[blue]{plan_config_path}[/].',
+               f'[blue]{plan_config_path}[/].'.encode('utf-8'),
                extra={'markup': True})
            Plan.logger.info(dump(plan.config))

            return plan

        except Exception:
            Plan.logger.exception(f'Parsing Federated Learning Plan : '
-                                 f'[red]FAILURE[/] : [blue]{plan_config_path}[/].',
+                                 f'[red]FAILURE[/] : [blue]{plan_config_path}[/].'
+                                 .encode('utf-8'),
                                  extra={'markup': True})
            raise
@@ -182,11 +183,11 @@ def build(template, settings, **override):
        module_path = splitext(template)[0]

        Plan.logger.info(f'Building [red]🡆[/] Object [red]{class_name}[/] '
-                        f'from [red]{module_path}[/] Module.',
+                        f'from [red]{module_path}[/] Module.'.encode('utf-8'),
                         extra={'markup': True})
-       Plan.logger.debug(f'Settings [red]🡆[/] {settings}',
+       Plan.logger.debug(f'Settings [red]🡆[/] {settings}'.encode('utf-8'),
                          extra={'markup': True})
-       Plan.logger.debug(f'Override [red]🡆[/] {override}',
+       Plan.logger.debug(f'Override [red]🡆[/] {override}'.encode('utf-8'),
                          extra={'markup': True})

        settings.update(**override)
@@ -210,7 +211,7 @@ def import_(template):
        class_name = splitext(template)[1].strip('.')
        module_path = splitext(template)[0]
        Plan.logger.info(f'Importing [red]🡆[/] Object [red]{class_name}[/] '
-                        f'from [red]{module_path}[/] Module.',
+                        f'from [red]{module_path}[/] Module.'.encode('utf-8'),
                         extra={'markup': True})
        module = import_module(module_path)
        instance = getattr(module, class_name)
@@ -245,7 +246,7 @@ def __init__(self):
    def hash(self):  # NOQA
        """Generate hash for this instance."""
        self.hash_ = sha384(dump(self.config).encode('utf-8'))
-       Plan.logger.info(f'FL-Plan hash is [blue]{self.hash_.hexdigest()}[/]',
+       Plan.logger.info(f'FL-Plan hash is [blue]{self.hash_.hexdigest()}[/]'.encode('utf-8'),
                         extra={'markup': True})

        return self.hash_.hexdigest()
@@ -274,8 +275,11 @@ def get_assigner(self):
            aggregation_functions_by_task = self.restore_object('aggregation_function_obj.pkl')
            assigner_function = self.restore_object('task_assigner_obj.pkl')
        except Exception as exc:
-           self.logger.error(f'Failed to load aggregation and assigner functions: {exc}')
-           self.logger.info('Using Task Runner API workflow')
+           self.logger.error(
+               f'Failed to load aggregation and assigner functions: {exc}'
+               .encode('utf-8')
+           )
+           self.logger.info('Using Task Runner API workflow'.encode('utf-8'))
        if assigner_function:
            self.assigner_ = Assigner(
                assigner_function=assigner_function,
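As an aside, the first hunk above names frozen plans after the plan hash computed in `Plan.hash` (sha384 over the YAML dump, first 8 hex digits in the filename). A rough illustration of that naming scheme; the config dict and path here are stand-ins, not a real plan:

```python
from hashlib import sha384
from pathlib import Path

from yaml import dump  # PyYAML, as used by plan.py

config = {'aggregator': {'settings': {'rounds_to_train': 1}}}  # stand-in plan
digest = sha384(dump(config).encode('utf-8')).hexdigest()

yaml_path = Path('plan/plan.yaml')  # illustrative path
frozen_yaml_path = Path(f'{yaml_path.parent}/{yaml_path.stem}_{digest[:8]}.yaml')
print(frozen_yaml_path)  # e.g. plan/plan_1a2b3c4d.yaml
```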
21 changes: 13 additions & 8 deletions openfl/interface/interactive_api/experiment.py
@@ -81,10 +81,13 @@ def _initialize_plan(self):
    def _assert_experiment_submitted(self):
        """Assure experiment is sent to director and accepted."""
        if not self.experiment_submitted:
-           self.logger.error('The experiment was not submitted to a Director service.')
+           self.logger.error(
+               'The experiment was not submitted to a Director service.'
+               .encode('utf-8')
+           )
            self.logger.error(
                'Report the experiment first: '
-               'use the Experiment.start() method.')
+               'use the Experiment.start() method.'.encode('utf-8'))
            return False
        return True
@@ -128,7 +131,7 @@ def _rebuild_model(self, tensor_dict, upcoming_model_status=ModelStatus.BEST):

            warning_msg += f'\nReturn {self.current_model_status} model'

-           self.logger.warning(warning_msg)
+           self.logger.warning(warning_msg.encode('utf-8'))

        else:
            self.task_runner_stub.rebuild_model(tensor_dict, validation=True, device='cpu')
@@ -145,7 +148,9 @@ def stream_metrics(self, tensorboard_logs: bool = True) -> None:
                f'Round {metric_message_dict["round"]}, '
                f'collaborator {metric_message_dict["metric_origin"]} '
                f'{metric_message_dict["task_name"]} result '
-               f'{metric_message_dict["metric_name"]}:\t{metric_message_dict["metric_value"]:f}')
+               f'{metric_message_dict["metric_name"]}:\t{metric_message_dict["metric_value"]:f}'
+               .encode('utf-8')
+           )

            if tensorboard_logs:
                self.write_tensorboard_metric(metric_message_dict)
@@ -172,7 +177,7 @@ def remove_experiment_data(self):
        else:
            log_message += 'failed.'

-       self.logger.info(log_message)
+       self.logger.info(log_message.encode('utf-8'))

    def prepare_workspace_distribution(self, model_provider, task_keeper, data_loader,
                                       task_assigner,
@@ -242,7 +247,7 @@ def start(self, *, model_provider, task_keeper, data_loader,
                pip_install_options
            )

-       self.logger.info('Starting experiment!')
+       self.logger.info('Starting experiment!'.encode('utf-8'))
        self.plan.resolve()
        initial_tensor_dict = self._get_initial_tensor_dict(model_provider)
        try:
@@ -256,10 +261,10 @@ def start(self, *, model_provider, task_keeper, data_loader,
            self.remove_workspace_archive()

        if response.accepted:
-           self.logger.info('Experiment was submitted to the director!')
+           self.logger.info('Experiment was submitted to the director!'.encode('utf-8'))
            self.experiment_submitted = True
        else:
-           self.logger.info('Experiment could not be submitted to the director.')
+           self.logger.info('Experiment could not be submitted to the director.'.encode('utf-8'))
def define_task_assigner(self, task_keeper, rounds_to_train):
"""Define task assigner by registered tasks."""
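A quick illustration of the metric lines `stream_metrics()` now emits: the f-string and field names below come from the hunk above, while the dict is a made-up metric message.

```python
# Made-up metric message; field names taken from the stream_metrics() diff.
metric_message_dict = {
    'round': 0,
    'metric_origin': 'one',
    'task_name': 'train',
    'metric_name': 'loss',
    'metric_value': 0.318724,
}
line = (
    f'Round {metric_message_dict["round"]}, '
    f'collaborator {metric_message_dict["metric_origin"]} '
    f'{metric_message_dict["task_name"]} result '
    f'{metric_message_dict["metric_name"]}:\t{metric_message_dict["metric_value"]:f}'
)
# .encode('utf-8') keeps the logged object ASCII-safe on legacy Windows consoles
print(line.encode('utf-8'))
```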
2 changes: 1 addition & 1 deletion openfl/utilities/split.py
@@ -82,7 +82,7 @@ def split_tensor_dict_for_holdouts(logger, tensor_dict,
            holdout_tensors[tensor_name] = tensors_to_send.pop(tensor_name)
        except KeyError:
            logger.warn(f'tried to remove tensor: {tensor_name} not present '
-                       f'in the tensor dict')
+                       f'in the tensor dict'.encode('utf-8'))
            continue

    # filter holdout_types from tensors_to_send and add to holdout_tensors
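A minimal, self-contained sketch of the pop-or-warn pattern this hunk touches; the tensor names and dicts are illustrative, not OpenFL's real data or the function's real signature.

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('split-demo')  # illustrative name

tensors_to_send = {'conv1.weight': [0.1, 0.2]}  # stand-in tensor dict
holdout_tensors = {}

for tensor_name in ('conv1.weight', 'fc.bias'):
    try:
        # move the tensor into the holdout dict, or fail with KeyError
        holdout_tensors[tensor_name] = tensors_to_send.pop(tensor_name)
    except KeyError:
        # mirrors the warning above; pre-encoding keeps it console-safe
        logger.warning(f'tried to remove tensor: {tensor_name} not present '
                       f'in the tensor dict'.encode('utf-8'))
        continue
```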
2 changes: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ def run():
    fl_experiment.start(model_provider=MI,
                        task_keeper=task_interface,
                        data_loader=fed_dataset,
-                       rounds_to_train=2,
+                       rounds_to_train=1,
                        opt_treatment='CONTINUE_GLOBAL')
    fl_experiment.stream_metrics()
    best_model = fl_experiment.get_best_model()
