Refactor LocalRuntime, Remove Serialization of Private Attributes, and Collaborator, and Aggregator as a Stateful Actor #791

Closed
Changes from all commits
97 commits
ba21c19
Fix flake8 error in local runtime (#764)
psfoley Mar 20, 2023
10dbfa0
Update ROADMAP.md (#765)
psfoley Mar 20, 2023
4df74ef
Update README.md
operepel Mar 21, 2023
27d3d58
Update GOVERNANCE.md
operepel Mar 30, 2023
c9979ed
Update ROADMAP.md (#785)
joedevon Apr 3, 2023
e65c916
Updated integrations to GaNDLF (#781)
sarthakpati Apr 3, 2023
0107fa6
Update README.md
SprashAI Apr 18, 2023
c41aff2
Fix Flake8 C419 for Ubuntu CI (#800)
akantak Apr 20, 2023
bc5d390
Introduced shard descriptor based collaborator private attributes
ParthM-GitHub Mar 24, 2023
2cc2d68
Adding batch size for train, and test in config.yaml file
ParthM-GitHub Mar 26, 2023
0f071a9
Introducing multiple config yaml files
ParthM-GitHub Mar 29, 2023
dcaa9ab
Removing unnecessary config.yaml file.
ParthM-GitHub Mar 30, 2023
dc8ade6
Added collaborator private attribute delayed initialization
ParthM-GitHub Mar 31, 2023
6b0e23c
Incorporated review comments
ParthM-GitHub Apr 10, 2023
6959cdf
Added multi-processing ray backend support and aggregator yaml file
ParthM-GitHub Apr 11, 2023
e927197
Updated multi-processing code
ParthM-GitHub Apr 12, 2023
42ad481
RayExecutor class moved from participants.py to localruntime.py
ParthM-GitHub Apr 12, 2023
c0caa70
RayExecutor moved from interface/participants.py to
ParthM-GitHub Apr 12, 2023
2366a6f
Added Aggregator private attribute initialization in runtime
ParthM-GitHub Apr 13, 2023
19eea62
Removed unnecessary import statements
ParthM-GitHub Apr 14, 2023
344ff9c
Code cleaned up, validated checkpoints manually
ParthM-GitHub Apr 14, 2023
10b92ff
Refactored, and added some new doc string
ParthM-GitHub Apr 14, 2023
563441a
Resolved Flake8 instructions
ParthM-GitHub Apr 14, 2023
c49bd80
Recursion removal + Serialization removal integrated
ParthM-GitHub Apr 18, 2023
96e462d
Incorporated Review Comments
ParthM-GitHub Apr 20, 2023
62603d0
Removed configuration YAML files, and
ParthM-GitHub Apr 21, 2023
14f5874
Removed commented code
ParthM-GitHub Apr 21, 2023
cc128a6
Implemented new approach, two example files given
ParthM-GitHub Apr 25, 2023
94eb51c
Internal Review Comments Incorporated
ParthM-GitHub Apr 26, 2023
2068b60
No private attributes are required
ParthM-GitHub Apr 27, 2023
a1b6d47
Update participants.py
ParthM-GitHub Apr 27, 2023
1a88a27
Added a check for GPU Resource Allocation
ParthM-GitHub Apr 28, 2023
75bd0cb
Modified error message for resource allocation
ParthM-GitHub Apr 28, 2023
a57fe61
Resolved bug found during testing phase
ParthM-GitHub Apr 28, 2023
085facb
Modified all the test cases and following tutorials
ParthM-GitHub May 4, 2023
5dc9a10
Added following test cases:
May 4, 2023
cbcdb7c
Modified and Added Global_DP tutorials.
ParthMandaliya May 5, 2023
db7e585
Modified and Added tutorial Workflow-Interface_201_Exclusive_GPUs_wit…
ParthM-GitHub May 5, 2023
bea2a82
Modified documentation for Workflow_Interface_201 tutorial.
ParthM-GitHub May 5, 2023
58f371f
fixed flake-8 errors
KeertiX May 9, 2023
07f3fcb
reverted import module code
May 10, 2023
5821ddd
Resolved merge conflicts in local_runtime.py
psfoley Mar 20, 2023
2d88c3c
Update README.md
psfoley May 12, 2023
e1f0dd1
Fix warnings and issues in docs (#825)
akantak May 17, 2023
7636e8a
Add Logo (#827)
psfoley May 17, 2023
9499f16
Change OpenFL documentation font to improve accessibility (#809)
wangleflex May 19, 2023
49e358b
Update unit tests to improve code coverage (#821)
fangxiaoran May 19, 2023
e270e7b
Add PyTorch linear regression example (#808)
danhe1 May 19, 2023
63839e5
This prints out the hash of the CSR to disk for both the aggregator a…
bjklemme-intel May 19, 2023
6ffab84
Improve workspace requirements import (#810)
danhe1 May 22, 2023
e42cbe0
Issue 506 Added Example using FedProx (#818)
bjklemme-intel May 23, 2023
47bd7c9
[Bug: 768] FX CLI: Separate create, cert gen commands (#807)
bjklemme-intel May 23, 2023
a275294
Add new tutorial example to OpenFL interactive API (#812)
bjklemme-intel May 24, 2023
c5c4f7f
build(deps): bump tensorflow in /openfl-workspace/tf_cnn_histology (#…
dependabot[bot] Jun 13, 2023
849914e
build(deps): bump tensorflow (#777)
dependabot[bot] Jun 13, 2023
83548af
Running a federation with GaNDLF Documentation (#794)
psfoley Jun 29, 2023
2a03504
Fixed GaNDLF rst issues. Add sphinxcontrib-mermaid (#841)
psfoley Jun 29, 2023
eda8b61
Fix GaNDLF documentation links (#842)
psfoley Jun 29, 2023
b5a4add
Fix incorrectly formatted link in docs (#839)
fstrr Jun 29, 2023
3c4fd5d
Resolving merge conflicts in local-runtime.py
ParthM-GitHub Jul 13, 2023
b99b216
build(deps): bump onnx in /openfl-workspace/gandlf_seg_test (#840)
dependabot[bot] Jul 12, 2023
1e7ddc5
Merged changes of remove-torch-dependency branch
ParthM-GitHub Aug 28, 2023
75c6123
Update setup.py
ParthM-GitHub Aug 30, 2023
3600ce2
Resolving merge conflicts
ParthM-GitHub Sep 7, 2023
94767ab
Accessibility updates (#861)
fstrr Jul 14, 2023
fa3b13f
build(deps): bump tensorflow from 2.8.4 to 2.11.1 in /openfl-workspac…
dependabot[bot] Jul 14, 2023
8e4c668
build(deps): bump tensorflow from 2.8.4 to 2.11.1 in /openfl-workspac…
dependabot[bot] Jul 14, 2023
39c894b
build(deps): bump tensorflow from 2.8.4 to 2.11.1 in /openfl-workspac…
dependabot[bot] Jul 14, 2023
480cc76
build(deps): bump tensorflow from 2.9.3 to 2.11.1 in /openfl-tutorial…
dependabot[bot] Jul 14, 2023
417b34c
build(deps): bump tensorflow-cpu from 2.8.4 to 2.11.1 in /openfl-work…
dependabot[bot] Jul 14, 2023
31b72d1
Accessibility color contrast fixes (#864)
fstrr Jul 14, 2023
82a8c2f
Tweak link color so it’s not so aggressive (#865)
fstrr Jul 14, 2023
d77ec12
build(deps): bump tensorflow from 2.8.4 to 2.11.1 in /tests/github/in…
dependabot[bot] Jul 25, 2023
c150c09
build(deps): bump tensorflow from 2.8.4 to 2.11.1 in /openfl-workspac…
dependabot[bot] Jul 27, 2023
d47c598
Update Tensorflow, gRPC, Protobuf dependencies (#868)
psfoley Aug 25, 2023
0ca3340
Add FL plan description to documentation (#872)
mansishr Aug 29, 2023
a9dad25
Resolved flake8 issues
ParthM-GitHub Sep 7, 2023
ba03257
GPU Added for aggregator
ParthM-GitHub Sep 27, 2023
2717983
Resolve Coverity Issues (#874)
psfoley Sep 14, 2023
38085b6
Migrate to Ubuntu 22.04 LTS release (supported through 2027) (#875)
psfoley Sep 21, 2023
caff281
Updated documentation:
scngupta-dsp Sep 25, 2023
d71a8c0
Update
scngupta-dsp Sep 25, 2023
037444b
Update
scngupta-dsp Sep 25, 2023
806fc5c
Update
scngupta-dsp Sep 25, 2023
c36f8e5
Updated
scngupta-dsp Sep 25, 2023
2bd994a
Updated
scngupta-dsp Sep 25, 2023
600c642
Updated documentation
scngupta-dsp Sep 25, 2023
54e1e84
Update workflow_interface.rst
ParthM-GitHub Sep 26, 2023
3750ccf
Added best model and last model extraction technique in docs/workflow…
ParthM-GitHub Sep 27, 2023
f6c8606
Added GPU for aggregator
ParthM-GitHub Sep 27, 2023
820633a
Resolving merge conflicts in 103 cyclic tutorial notebook
kta-intel Jul 14, 2023
f542cdf
Resolved merge conflicts in tests/github/experimental/testflow_datast…
KeertiX Jul 14, 2023
b5c5f2f
Added weighted_average aggregation function under openfl.experimental…
ParthM-GitHub Sep 29, 2023
6a5a3d8
Update EdenPipeline in the documentation (#877)
amitport Sep 28, 2023
a0fbd73
WIP: CI Scans (#873)
psfoley Sep 29, 2023
dd140cf
Update ROADMAP.md (#878)
psfoley Oct 3, 2023
5e8e04b
Merge branch 'develop' into serialization_removal
ParthMandaliya Oct 5, 2023
189 changes: 144 additions & 45 deletions docs/workflow_interface.rst
@@ -146,30 +146,60 @@ A :code:`Runtime` defines where the flow will be executed, who the participants

.. code-block:: python
# Setup participants
aggregator = Aggregator()
aggregator.private_attributes = {}
# Setup collaborators with private attributes
collaborator_names = ['Portland', 'Seattle', 'Chandler', 'Bangalore']
collaborators = [Collaborator(name=name) for name in collaborator_names]
for idx, collaborator in enumerate(collaborators):
    local_train = deepcopy(mnist_train)
    local_test = deepcopy(mnist_test)
    local_train.data = mnist_train.data[idx::len(collaborators)]
    local_train.targets = mnist_train.targets[idx::len(collaborators)]
    local_test.data = mnist_test.data[idx::len(collaborators)]
    local_test.targets = mnist_test.targets[idx::len(collaborators)]
    collaborator.private_attributes = {
        'train_loader': torch.utils.data.DataLoader(local_train, batch_size=batch_size_train, shuffle=True),
        'test_loader': torch.utils.data.DataLoader(local_test, batch_size=batch_size_train, shuffle=True)
    }
# Aggregator
aggregator_ = Aggregator()
collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"]
def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):
    train = deepcopy(train_dataset)
    test = deepcopy(test_dataset)
    train.data = train_dataset.data[index::n_collaborators]
    train.targets = train_dataset.targets[index::n_collaborators]
    test.data = test_dataset.data[index::n_collaborators]
    test.targets = test_dataset.targets[index::n_collaborators]
    return {
        "train_loader": torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True),
        "test_loader": torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=True),
    }
# This is equivalent to:
# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` placeholders. These placeholders represent the nodes where tasks will be executed. Each participant placeholder has its own set of :code:`private_attributes`: a dictionary where the key is the name of the attribute and the value is the object. In the above example, each of the four collaborators ('Portland', 'Seattle', 'Chandler', and 'Bangalore') has a :code:`train_loader` and :code:`test_loader` that it can access. These private attributes can be named anything, and do not need to be the same across participants.
# Setup collaborators private attributes via callable function
collaborators = []
for idx, collaborator_name in enumerate(collaborator_names):
    collaborators.append(
        Collaborator(
            name=collaborator_name,
            private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
            index=idx,
            n_collaborators=len(collaborator_names),
            train_dataset=mnist_train,
            test_dataset=mnist_test,
            batch_size=64
        )
    )
local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators)
Let's break this down, starting with the :code:`Aggregator` and :code:`Collaborator` components. These components represent the *Participants* in a Federated Learning experiment. Each participant has its own set of *private attributes* that represent the information / data specific to its role or requirements. As the name suggests, these *private attributes* are accessible only to that particular participant, and are appropriately inserted into or filtered out of the current Flow state when execution transfers between participants. For example, Collaborator private attributes are inserted into :code:`flow` when transitioning from Aggregator to Collaborator, and are filtered out when transitioning from Collaborator back to Aggregator.

In the above :code:`FederatedFlow`, each collaborator accesses its train and test datasets via the *private attributes* :code:`train_loader` and :code:`test_loader`. These *private attributes* must be set using a (user-defined) callback function when instantiating the participant. The callback function returns the participant's *private attributes* in the form of a dictionary, where the key is the name of the attribute and the value is the object.

In this example, the callback function :code:`callable_to_initialize_collaborator_private_attributes()` returns the collaborator private attributes :code:`train_loader` and :code:`test_loader`, which are accessed by the collaborator steps (:code:`aggregated_model_validation`, :code:`train`, and :code:`local_model_validation`). Some important points to remember when creating the callback function and private attributes are:

- The callback function must be defined by the user and should return the *private attributes* required by the participant as key/value pairs
- In the above example, all collaborators share the same callback function. Depending on the Federated Learning requirements, the user can specify a unique callback function for each participant (see the sketch after this list)
- If no callback function is specified, the participant will not have any *private attributes*
- The callback function can take any required parameters as arguments. In this example, the parameters needed by the callback function are supplied, under the *same names*, when the Collaborator is instantiated:

  * :code:`index`: Index of the particular collaborator, needed to shard the dataset
  * :code:`n_collaborators`: Total number of collaborators across which the dataset is sharded
  * :code:`batch_size`: Batch size for the train and test loaders
  * :code:`train_dataset`: Train dataset to be sharded between n_collaborators
  * :code:`test_dataset`: Test dataset to be sharded between n_collaborators

- The callback function must be supplied when the participant is instantiated. It is invoked by the OpenFL runtime at the time the participant is created, and once created these attributes cannot be modified
- Private attributes are accessible only in the participant's steps
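
For instance, the Aggregator can be given its own private attributes through a separate, participant-specific callback. A minimal sketch, assuming the :code:`Aggregator` accepts the same :code:`private_attributes_callable` mechanism shown above for Collaborators (the function and attribute names here are illustrative):

.. code-block:: python
# Hypothetical aggregator-specific callback returning its own private attributes
def callable_to_initialize_aggregator_private_attributes(n_collaborators, test_dataset, batch_size):
    aggregator_test = deepcopy(test_dataset)
    return {
        "test_loader": torch.utils.data.DataLoader(aggregator_test, batch_size=batch_size, shuffle=False)
    }

aggregator_ = Aggregator(
    private_attributes_callable=callable_to_initialize_aggregator_private_attributes,
    n_collaborators=len(collaborator_names),
    test_dataset=mnist_test,
    batch_size=64,
)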

Now let's see how the runtime for a flow is assigned, and the flow gets run:

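As a minimal sketch (the flow's constructor arguments will vary), assigning the runtime and running the flow looks like this:

.. code-block:: python
flow = FederatedFlow()
flow.runtime = local_runtime
flow.run()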
@@ -184,23 +214,43 @@ And that's it! This will run an instance of the :code:`FederatedFlow` on a singl
Runtime Backends
================

The Runtime defines where code will run, but the Runtime has a :code:`Backend`, which defines the underlying implementation of *how* the flow will be executed. :code:`'single_process'` is the default in the :code:`LocalRuntime`: it executes all code sequentially within a single Python process, and is well suited to run both on high spec and low spec hardware. For users with large servers or multiple GPUs they wish to take advantage of, we also provide a `Ray <https://github.com/ray-project/ray>`_ backend. The Ray backend enables parallel task execution for collaborators, and optionally allows users to request dedicated GPUs for collaborator tasks in the placement decorator, as follows:
The Runtime defines where code will run, but the Runtime has a :code:`Backend`, which defines the underlying implementation of *how* the flow will be executed. :code:`single_process` is the default in the :code:`LocalRuntime`: it executes all code sequentially within a single Python process, and is well suited to both high-spec and low-spec hardware.

For users with large servers or multiple GPUs they wish to take advantage of, we also provide a :code:`ray` backend, built on `Ray <https://github.com/ray-project/ray>`_. The Ray backend enables parallel task execution for collaborators, and optionally allows users to request dedicated CPUs / GPUs for participants by using the :code:`num_cpus` and :code:`num_gpus` arguments when instantiating the participant, as follows:

.. code-block:: python
class ExampleDedicatedGPUFlow(FLSpec):
    ...
    # We request one dedicated GPU for this task
    @collaborator(num_gpus=1)
    def training(self):
        print(f'CUDA_VISIBLE_DEVICES: {os.environ["CUDA_VISIBLE_DEVICES"]}')
        self.loss = train_func(self.model, self.train_loader)
        self.next(self.validation)
    ...
# Aggregator
aggregator_ = Aggregator(num_gpus=0.2)
collaborator_names = ["Portland", "Seattle", "Chandler", "Bangalore"]
def callable_to_initialize_collaborator_private_attributes(index, n_collaborators, batch_size, train_dataset, test_dataset):
    ...

# Setup collaborators private attributes via callable function
collaborators = []
for idx, collaborator_name in enumerate(collaborator_names):
    collaborators.append(
        Collaborator(
            name=collaborator_name,
            num_gpus=0.2,  # Fraction of a GPU allocated to this participant
            private_attributes_callable=callable_to_initialize_collaborator_private_attributes,
            index=idx,
            n_collaborators=len(collaborator_names),
            train_dataset=mnist_train,
            test_dataset=mnist_test,
            batch_size=64
        )
    )

# The Ray Backend will now be used for local execution
local_runtime = LocalRuntime(aggregator=aggregator_, collaborators=collaborators, backend='ray')
In the above example, we use :code:`num_gpus=0.2` when instantiating the Aggregator and Collaborators to specify that each participant uses one-fifth of a GPU; with 4 collaborators and 1 Aggregator, this dedicates exactly one GPU to the federation. Users can tune these arguments based on their Federated Learning requirements and available hardware resources. Configurations in which a single participant is split across multiple GPUs are not supported. For example, trying to run 5 participants on 2 GPUs with :code:`num_gpus=0.4` will not work: 80% of each GPU is taken by 4 of the participants, leaving the fifth participant with no GPU that has enough free capacity.
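
To illustrate the allocation rule described above, here is a hypothetical helper (not part of OpenFL) that checks whether each participant's requested GPU fraction can be packed onto the available GPUs:

.. code-block:: python
# Hypothetical check of the packing rule: a participant's num_gpus fraction
# must fit entirely on a single GPU.
def can_schedule(participant_fractions, n_gpus):
    free = [1.0] * n_gpus
    for frac in sorted(participant_fractions, reverse=True):
        for i in range(n_gpus):
            if free[i] >= frac - 1e-9:
                free[i] -= frac
                break
        else:
            return False
    return True

# 1 aggregator + 4 collaborators, each requesting 0.2 of a GPU -> fits on one GPU
print(can_schedule([0.2] * 5, n_gpus=1))   # True

# 5 participants requesting 0.4 of a GPU each on 2 GPUs -> only 4 fit (2 per GPU)
print(can_schedule([0.4] * 5, n_gpus=2))   # False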

**Note:** It is not necessary for *all* participants to use GPUs. For example, only the Collaborators might be allocated GPUs. In this scenario, the user should ensure that any artifacts returned by the Collaborators to the Aggregator (e.g. the locally trained model object) are moved back to the CPU before exiting the collaborator step (i.e. before the join step). Because TensorFlow manages device placement automatically, this step is only needed for PyTorch.
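
A minimal sketch of that pattern (PyTorch assumed; :code:`inference` stands in for the user's validation routine), where the collaborator's final step moves the model back to the CPU before transitioning to :code:`join`:

.. code-block:: python
@collaborator
def local_model_validation(self):
    self.local_validation_score = inference(self.model, self.test_loader)
    # Move the locally trained model back to the CPU so the aggregator does not
    # receive GPU-resident tensors in the join step
    self.model = self.model.to("cpu")
    self.next(self.join)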

Debugging with the Metaflow Client
==================================

@@ -218,19 +268,38 @@ After the flow has started running, you can use the Metaflow Client to get inter

.. code-block:: python
from metaflow import Flow, Run, Task, Step
from metaflow import Metaflow, Flow, Step, Task
# Initialize Metaflow object and obtain list of executed flows:
m = Metaflow()
list(m)
> [Flow('FederatedFlow'), Flow('AggregatorValidationFlow'), Flow('FederatedFlow_MNIST_Watermarking')]
# The name of the flow is the name of the class
flow = Flow('FederatedFlow')
run = flow.latest_run
# Identify the Flow name
flow_name = 'FederatedFlow'
# List all instances of FederatedFlow executed under distinct run IDs
flow = Flow(flow_name)
list(flow)
> [Run('FederatedFlow/1692946840822001'),
   Run('FederatedFlow/1692946796234386'),
   Run('FederatedFlow/1692902602941163'),
   Run('FederatedFlow/1692902559123920')]
# Retrieve the latest run of the FederatedFlow
run = Flow(flow_name).latest_run
print(run)
> Run('FederatedFlow/1692946840822001')
list(run)
> [Step('FederatedFlow/1671152854447797/end'),
   Step('FederatedFlow/1671152854447797/join'),
   Step('FederatedFlow/1671152854447797/local_model_validation'),
   Step('FederatedFlow/1671152854447797/train'),
   Step('FederatedFlow/1671152854447797/aggregated_model_validation'),
   Step('FederatedFlow/1671152854447797/start')]
step = Step('FederatedFlow/1671152854447797/aggregated_model_validation')
> [Step('FederatedFlow/1692946840822001/end'),
   Step('FederatedFlow/1692946840822001/join'),
   Step('FederatedFlow/1692946840822001/local_model_validation'),
   Step('FederatedFlow/1692946840822001/train'),
   Step('FederatedFlow/1692946840822001/aggregated_model_validation'),
   Step('FederatedFlow/1692946840822001/start')]
step = Step('FederatedFlow/1692946840822001/aggregated_model_validation')
for task in step:
    if task.data.input == 'Portland':
        print(task.data)
@@ -260,6 +329,37 @@ And if we wanted to get the log or error message for that task, you can just run:
print(portland_task.stderr)
> [No output]
Also, if we wanted to get the best model and the last model, you can just run:

.. code-block:: python
# Choose the specific step containing the desired models (e.g., 'join' step):
step = Step('FederatedFlow/1692946840822001/join')
list(step)
> [Task('FederatedFlow/1692946840822001/join/12'),  --> Round 3
   Task('FederatedFlow/1692946840822001/join/9'),   --> Round 2
   Task('FederatedFlow/1692946840822001/join/6'),   --> Round 1
   Task('FederatedFlow/1692946840822001/join/3')]   --> Round 0
"""The sequence of tasks represents each round, with the most recent task corresponding to the final round and the preceding tasks indicating the previous rounds
in chronological order.
To determine the best model, analyze the command line logs and model accuracy for each round. Then, provide the corresponding task ID associated with that Task"""
task = Task('FederatedFlow/1692946840822001/join/9')
# Access the best model and its associated data
best_model = task.data.model
best_local_model_accuracy = task.data.local_model_accuracy
best_aggregated_model_accuracy = task.data.aggregated_model_accuracy
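# Alternatively (sketch), pick the best round programmatically instead of
# reading the logs, using the metric stored on each join task:
best_task = max(step, key=lambda t: t.data.aggregated_model_accuracy)
best_model = best_task.data.model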
# To retrieve the last model, select the most recent Task i.e last round.
task = Task('FederatedFlow/1692946840822001/join/12')
last_model = task.data.model
# Save the chosen models using a suitable framework (e.g., PyTorch in this example):
import torch
torch.save(last_model.state_dict(), last_model_path)   # placeholder path for the last model
torch.save(best_model.state_dict(), best_model_path)   # placeholder path for the best model
While this information is useful for debugging, depending on your workflow it may require significant disk space. For this reason, :code:`checkpoint` is disabled by default.
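
Checkpointing can be re-enabled when the flow is constructed. A sketch, assuming the flow's constructor forwards a :code:`checkpoint` flag to :code:`FLSpec` (the other constructor arguments shown here are illustrative):

.. code-block:: python
# Opt in to checkpointing when constructing the flow
flow = FederatedFlow(model, optimizer, rounds=3, checkpoint=True)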

Runtimes: Future Plans
@@ -279,4 +379,3 @@ Our goal is to make it a one line change to configure where and how a flow is ex
federated_runtime = FederatedRuntime(...)
flow.runtime = federated_runtime
flow.run()