Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel merge index #590

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

XiaohanZhangCMU
Copy link
Member

@XiaohanZhangCMU XiaohanZhangCMU commented Feb 5, 2024

Description of changes:

Make merge_index utility run in parallel with multiprocessing. Note the normal use case for merge index happens after mds shards are written to a number of partition folders (remote or local), and one wants to merge the index files of those folders into one merged index file. Depending on the number of cores available at the driver node, downloading and merging use all the available cores.

profiling with 40 cores. Explanation of the profile table:

  • with total number of mds files = 131072, and numbr of sreams = 8192, distribution = uniform, it means the root folder has 8192 subfolders, each folder has one index.json, so 8192 index.json to be merged. Each stream has 131072/8192 shards in it, so all index.json files have the same size.
  • with the same numbers as above but distribution = expoential, the ony difference is that the size of index.json files are not uniformly distributed, but skewed (exponentially distributed). This represents the cases where some streams are particularly heavier than some others.

Downloading

total number of mds files number of streams mds file distribution Serial Download (s) . Parallel Download
131072 8192 uniform 317 45
1048576 8192 uniform x 9
8388608 8192 uniform 314 48
16384 256 exponential 10 2.2
131072 8192 exponential 322 49
2097152 8192 exponential 316 50
8388608 8192 exponential 315 50

Merging

total number of mds files number of streams mds file distribution Serial Merge (s) . Parallel Merge
131072 8192 uniform 1 1
1048576 8192 uniform 1 1
8388608 8192 uniform 21 20
16384 256 exponential 1 1
131072 8192 exponential 1 1
2097152 8192 exponential 5 5
8388608 8192 exponential 20 19

Issue #, if available:

Merge Checklist:

Put an x without space in the boxes that apply. If you are unsure about any checklist, please don't hesitate to ask. We are here to help! This is simply a reminder of what we are going to look for before merging your pull request.

General

  • I have read the contributor guidelines
  • This is a documentation change or typo fix. If so, skip the rest of this checklist.
  • I certify that the changes I am introducing will be backward compatible, and I have discussed concerns about this, if any, with the MosaicML team.
  • I have updated any necessary documentation, including README and API docs (if appropriate).

Tests

  • I ran pre-commit on my change. (check out the pre-commit section of prerequisites)
  • I have added tests that prove my fix is effective or that my feature works (if appropriate).
  • I ran the tests locally to make sure it pass. (check out testing)
  • I have added unit and/or integration tests as appropriate to ensure backward compatibility of the changes.

tests/test_util.py Outdated Show resolved Hide resolved
streaming/base/util.py Outdated Show resolved Hide resolved
streaming/base/util.py Outdated Show resolved Hide resolved
with open(partition_index, 'r') as f:
obj = json.load(f)
for shard in obj['shards']:
for key in ('raw_data', 'zip_data', 'raw_meta', 'zip_meta'):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we really ought to make this a Shard method, which is subject to inheritance and so on

this code won't work for parquet shards :/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific suggestion how to deal with this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this work for json/xsv or just for mds index files? Could you test that as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do json/xsv index files have the same file format? @knighton

streaming/base/util.py Outdated Show resolved Hide resolved
Copy link
Collaborator

@snarayan21 snarayan21 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left comments!

streaming/base/util.py Outdated Show resolved Hide resolved
streaming/base/util.py Outdated Show resolved Hide resolved
streaming/base/util.py Outdated Show resolved Hide resolved
tests/test_util.py Show resolved Hide resolved
shard = obj['shards'][i]
for key in ('raw_data', 'zip_data', 'raw_meta', 'zip_meta'):
if shard.get(key):
basename = shard[key]['basename']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait why are you just taking the basename of the child file here? and to be clear, why the basename of the parent as well, what if the dir to merge is >1 hops deep?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if anyone takes basename in the format literally, that would be a mistake lol, those are actually always relative paths, was originally named wrong

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@XiaohanZhangCMU was this resolved?

@XiaohanZhangCMU
Copy link
Member Author

@knighton I updated the description to include some of the profiling results. PTA~

@snarayan21
Copy link
Collaborator

@XiaohanZhangCMU is this ready for another round of reviewing? would be good to get it in

Copy link
Collaborator

@snarayan21 snarayan21 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some comments!


Args:
index_file_urls (List[Union[str, Tuple[str,str]]]): index.json from all the partitions.
index_file_urls (List[Union[str, Tuple[str,str]]]): index.json from all the streams.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function had typed arguments before, could you add that back instead of using generic *args, **kwargs? Would help clarify what the function is expecting, since otherwise, people have to go looking for the docstring

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an entry function and it branches to either merge_index from list or merge_index from a root folder. Are you recommending to not remove this entry function, and just keep the two actual implementations? Can you elaborate a bit?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I mean that the signature of the merge_index function isn't typed -- it just has *args, **kwargs, but the docstring has types for the arguments, for example, index_file_urls has type (List[Union[str, Tuple[str,str]]]). I'm suggesting we add types to the merge_index function so that users will easily be able to see what the merge_index function takes in (for example, IDEs all support this, would be better for our docs, etc). Otherwise they have to reference the docstring, which doesn't match up with the function signature. lmk if i should elaborate more

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this entry function is fine

with open(partition_index, 'r') as f:
obj = json.load(f)
for shard in obj['shards']:
for key in ('raw_data', 'zip_data', 'raw_meta', 'zip_meta'):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this work for json/xsv or just for mds index files? Could you test that as well?

streaming/base/util.py Show resolved Hide resolved
streaming/base/util.py Outdated Show resolved Hide resolved
streaming/base/util.py Outdated Show resolved Hide resolved
tests/test_util.py Show resolved Hide resolved
@snarayan21
Copy link
Collaborator

@XiaohanZhangCMU what remaining changes do we need here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants