Nuke 1) torch dist, 2) shared memory, and 3) filelock #556

Open · wants to merge 62 commits into main
Conversation


@knighton knighton commented Dec 30, 2023

In which we blow away 1) torch dist, 2) shared memory, and 3) filelock #YOLO

Nuke torch dist

Can we better contain or even eliminate Streaming's dependencies on PyTorch, allowing it to be used in other ecosystems? In more targeted fashion, can we improve performance at scale by eliminating Streaming's usage of torch dist? Currently torch dist is used for only a couple of rank barriers in critical places. Do they really have to be over all ranks?

Replacing the torch dist barrier in StreamingDataset init

StreamingDataset init has three stages:

  1. Validating, normalizing, and defaulting the arguments about iterating (the pure SD args).
  2. The spread out, joint Stream init sequence:
    1. Stream args handling.
    2. Loading all the shards.
    3. Building the mapping of dataset <-> streams <-> shards <-> samples.
  3. All the state that is shared across all ranks and workers:
    1. Setting it up (create or attach).
    2. Local leader populating it.

Currently, the latter part of stage 3 is guarded by the rank barrier. In this PR, the local leader instead signals completion by creating a file, which the other ranks poll for.
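
For illustration, a minimal sketch of that create-or-poll pattern (the marker filename and polling interval here are made up, not the exact ones in this PR):

```python
import os
import time


def wait_for_file(filename: str, tick: float = 0.07) -> None:
    """Block until the given file exists (poll instead of a torch dist barrier)."""
    while not os.path.exists(filename):
        time.sleep(tick)


def signal_done(filename: str) -> None:
    """Local leader: mark the shared state as populated by creating the file."""
    with open(filename, 'w'):
        pass
```

The local leader calls signal_done() once it has populated the shared state; every other rank calls wait_for_file() on the same path.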

Replacing the torch dist barrier in StreamingDataset job registration

The purpose of this method is to register run paths in a common place, both to agree upon unique paths for shared state and to detect local dir collisions. It has to work while being operated concurrently by processes of different ranks and workers of different jobs.

Long story short, we determined that the existing approach is just hosed, for reasons detailed below, and opted to rewrite it from scratch using our secret sauce: files.

Approach 1: the honor system

Bottom line

An inarguably elegant solution. Technically, it's a diffusion model of lore and know-how.

How it works

Simply don't reuse local dirs across jobs and users on the same node, unless it's safe to do so.

Fine print

It pays to be in-group.

Approach 2: sequentially numbered shmem "slots" storing each job's local dirs, number used as prefix

Bottom line

This has some busted "edge cases". They are known to cause a mild amount of ongoing support traffic. We were arguably thinking too much about solving the specifics of our use case and not thinking creatively enough about the full generality of the problem across the diversity of future Streaming use.

Fixing some of these looks practical but may actually be theoretically impossible (possibly pulling a chmod 777 on shmem in Python?), and fixing others looks theoretically possible but is impractical (zombie shared memory, orphaned babysitter child processes, and who watches the watchmen?).

How it works

You have a potentially infinite number of sequentially numbered slots. Registering a run, you walk the slots and take the first available one by storing your local dirs in it. While walking, you cross-check each existing run's locals against your own for collisions. Unregistering a run, you erase the locals in your slot.

Fine print

Steps:

  • Receive as input from a StreamingDataset the local/remote paths for each Stream.
  • Check own locals for overlap.
  • The local leader walks the slots sequentially from zero, checking for local dir collisions, and populates the first available slot with its local paths.
  • Barrier.
  • Then, non-local leaders walk the slots until they find the match.
  • Return to the StreamingDataset a pair of (slot number taken, newly created shmem object that lives in that slot).
  • In subsequent StreamingDataset.__init__() work, use that slot number to create unique paths to shared resources (shmems and filelocks).
  • On StreamingDataset.__del__(), use that shmem handle to clear its slot so that the slot can be reused by future jobs.

A slot being taken by a run means there is a shared memory object called f'locals_{slot_number:06}' containing that run's locals (for cross-checking purposes). We check whether a slot is in use by attempting to attach to its shmem and noting whether its size prefix is non-zero, or by catching the exception upon failure. When the attach fails, we know we've hit the end and can now register that slot.
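
A rough sketch of that walk, assuming the four-byte size prefix described below (the little-endian layout and names are illustrative, not the exact implementation):

```python
from multiprocessing import shared_memory


def find_free_slot(my_locals: list) -> int:
    """Walk numbered slots, collision-checking taken ones; return the first free slot."""
    slot = 0
    while True:
        name = f'locals_{slot:06}'
        try:
            # Attach-only: raises FileNotFoundError once we pass the last taken slot.
            shm = shared_memory.SharedMemory(name=name, create=False)
        except FileNotFoundError:
            return slot
        # Slot is taken: cross-check its stored locals against ours.
        size = int.from_bytes(shm.buf[:4], 'little')
        their_locals = bytes(shm.buf[4:4 + size]).decode().split('\0') if size else []
        shm.close()
        if set(their_locals) & set(my_locals):
            raise ValueError(f'Local dir collision with the run in slot {slot}.')
        slot += 1
```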

We went with shared memory for this because (a) we didn't want stale filesystem cruft left over when jobs die (purportedly, unlike with files), (b) we were already using shmem for many other things, so it was a well-trodden path, and (c) using RAM for this seemed theoretically nicer than using disk.

If a run dies badly, users are left with zombie shared memory objects. This has turned out to be a nastier problem than expected. The script streaming.base.util.clean_stale_shared_memory(), which fixes things up manually offline, was great ROI with users. At least with files, there is no pretension that your hypothetical 'registration file' goes away when the run goes away.

We designed for compute clouds with ephemeral nodes where the user is root. If a system has multiple users of Streaming who can't train as root, and they attempt to register their runs, which involves walking all the shared memory objects including those of other users, exceptions will be raised. Our current recommendation in that case is: yes, it's actually possible, you just have to patch the code to drop the get_shm_prefix() codepath, and also change its self._filelock_root to something workable if /tmp isn't usable. Speaks for itself, really.

There is no lock taken to protect us from races, but I think we are protected by our strict shmem create/attach semantics combined with wrapping the whole thing in a try block inside a loop with 100 iterations. A run starting is also a rare event, and these things tend not to run at the same time, since they don't like sharing GPUs. I do not know of this ever having been a problem in the field.
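
Roughly, that protection looks like the following (building on the sketch above; a strict create that fails when another process already took the slot is what keeps us safe):

```python
from multiprocessing import shared_memory


def register_run(my_locals: list, attempts: int = 100) -> int:
    """Claim a slot by creating its shmem; retry if another process wins the race."""
    for _ in range(attempts):
        slot = find_free_slot(my_locals)  # From the previous sketch.
        payload = '\0'.join(my_locals).encode()
        try:
            # Strict create: raises FileExistsError if someone else grabbed this slot
            # between our walk and our create, in which case we simply re-walk.
            shm = shared_memory.SharedMemory(name=f'locals_{slot:06}', create=True,
                                             size=4 + len(payload))
        except FileExistsError:
            continue
        shm.buf[:4] = len(payload).to_bytes(4, 'little')
        shm.buf[4:4 + len(payload)] = payload
        return slot
    raise RuntimeError('Failed to register the run after 100 attempts.')
```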

Note there is a carve-out for runs having no remote. I strongly suspect its semantics were originally correct, but are now probably very much not, because of an edge case: you resume training with a populated cache dir containing shards which theoretically support both forms (zip -> raw), both forms were actually created (compression was used during dataset creation), and your StreamingDataset argument config says to canonically keep both forms; but because we start training ASAP by inventorying and normalizing local directories ASAP, getting shards into the expected normalized state may happen lazily upon getitem, not init. So you actually are mucking around with files on the fly, which means it is very much not safe to reuse those locals. Ugh. If my read on all of that is correct. The Streaming project is complicated, and there are many, many legitimate factors that collectively result in that, if you want the best possible performance.

Approach 3 (this PR): JSON registry file protected by a file lock, with subdir per job

Bottom line

This way is safe, bypasses the problems dragged in by shared memory, and is overall quite sequitur and boring.

How it works

  • /tmp/streaming/
    • <job hash>
      • worker_barrier.npy
      • worker_barrier.lock
      • next_epoch.npy
      • cache.lock
      • ...
    • <job hash>
      • ...
    • <job hash>
      • ...

You are already familiar with the per-Stream root dirs (remote and local) and the unfortunately currently hardcoded internal filelock root. In this PR, all Streaming jobs now share a single unified config root dir (defaults to /<temp root>/streaming/), which is separate from their one/multiple/many Stream-specific local dir(s).

Each job hashes deterministically from its list of fully-qualified local path(s) to some opaque hex string. This "job hash" aka "dataset hash" is appended to the Streaming config root to get the job-level (as opposed to individual stream-level) root directory (i.e., /<temp root>/streaming/<job hash>/). These job dirs exist for as long as their jobs do. In those subroots live all data structures shared across ranks and workers of the job, as files which are memory-mapped into their address spaces.
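
Sketch of the idea (the particular hash function and normalization are assumptions for illustration, not necessarily what the PR uses):

```python
import hashlib
import os
import tempfile


def get_job_hash(stream_locals: list) -> str:
    """Map a job's fully-qualified local dirs to an opaque, deterministic hex string."""
    norm = sorted(os.path.abspath(local) for local in stream_locals)
    return hashlib.sha3_224('\0'.join(norm).encode('utf-8')).hexdigest()


config_root = os.path.join(tempfile.gettempdir(), 'streaming')
job_root = os.path.join(config_root, get_job_hash(['/datasets/my_data/train']))
# -> e.g. /tmp/streaming/<job hash>/, which lives for as long as the job does.
```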

Fine print

A Streaming "job" is a collection of StreamingDataset replicas with basically identical configs, which correspond to a single dataset or dataset split.

Everything which is shared across the different local ranks and workers of a given job lives as memory-mapped files that are persisted under that job's directory and flushed to disk strategically. No possibility of zombie shared memory because this is using mmap. In return for this, we get the possibility of stale state on disk, depending on how we use them. In practice, in our use case this is a non-issue.
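
To make that concrete, here is a bare-bones sketch of an mmap-backed shared counter in the spirit of MMapNumber (the real classes in this PR handle creation, layout, and flushing more carefully):

```python
import mmap
import os

import numpy as np


def attach_number(filename: str) -> np.ndarray:
    """Attach to (creating if needed) a single int64 shared via a memory-mapped file."""
    if not os.path.exists(filename):
        with open(filename, 'wb') as out:
            out.write(np.int64(0).tobytes())  # Zero-initialize on first use.
    fd = os.open(filename, os.O_RDWR)
    buf = mmap.mmap(fd, 8)
    os.close(fd)  # The mapping stays valid after the fd is closed.
    return np.frombuffer(buf, np.int64)


# Every rank/worker of the job maps the same file and sees the same value, e.g.:
# next_epoch = attach_number('/tmp/streaming/<job hash>/next_epoch.npy')
# next_epoch[0] += 1
```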

Job-level shared resources:

  • In __init__():
    • (MMapBarrier) worker_barrier.npy (MMapArray), worker_barrier.lock (SoftFileLock)
    • next_epoch.npy (MMapNumber)
    • cache.lock (SoftFileLock)
    • cache_usage.npy (MMapNumber)
    • shard_states.npy (MMapArray)
    • shard_access_times.npy (MMapArray)
  • In get_work(), ephemerally created and destroyed during epoch generation:
    • work_shape.npy (MMapArray)
    • work.npy (MMapArray)
  • In load_state_dict(), aka resuming from checkpoint:
    • checkpoint.json (MMapBuffer)

Job metadata:

  • JobEntry:
    • Stores 1) job hash, 2) stream hashes, 3) pid, 4) process creation time.
    • Note that we store hashes, not the original fully-qualified paths, for cross-user privacy.
  • JobFile:
    • Corresponds to a JSON registry config file, e.g. /tmp/streaming/registry.json.
    • Contains JobEntries, indexing into them by job hashes and stream hashes.
    • Has all the stored state needed by a JobRegistry.
    • Can't be used concurrently on its own; it is used safely inside a JobRegistry.
  • JobRegistry:
    • Corresponds to the config root dir, e.g. /tmp/streaming/.
    • Multiprocess-safe.
    • Exposes two methods: register() and unregister().
  • JobDirectory:
    • Corresponds to a job dir, e.g. /tmp/streaming/<job hash>/.
    • On StreamingDataset init, you create or attach to a JobDirectory which represents all the state shared over all the job's StreamingDataset replicas.
    • You pass to its init the JobRegistry singleton, the dataset's streams, and the world state.
    • On __init__, JobDirectory calls JobRegistry.register(), and on __del__, it calls JobRegistry.unregister(). So the job object magically registers and unregisters itself at the proper times.
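
A stripped-down sketch of the register/unregister flow (class and field names here are illustrative; the real JobEntry also records process creation time, and the real code does more validation):

```python
import json
import os

from filelock import SoftFileLock


class JobRegistrySketch:
    """One JSON registry file guarded by a lock file under the config root."""

    def __init__(self, config_root: str = '/tmp/streaming'):
        os.makedirs(config_root, exist_ok=True)
        self.registry_file = os.path.join(config_root, 'registry.json')
        self.lock = SoftFileLock(os.path.join(config_root, 'registry.lock'))

    def register(self, job_hash: str, stream_hashes: list) -> None:
        with self.lock:
            jobs = self._read()
            for other in jobs.values():
                # A registered job may not claim a stream (local dir) we also claim.
                if set(other['stream_hashes']) & set(stream_hashes):
                    raise ValueError('Local dir collision with a registered job.')
            jobs[job_hash] = {'stream_hashes': stream_hashes, 'pid': os.getpid()}
            self._write(jobs)

    def unregister(self, job_hash: str) -> None:
        with self.lock:
            jobs = self._read()
            jobs.pop(job_hash, None)
            self._write(jobs)

    def _read(self) -> dict:
        if not os.path.exists(self.registry_file):
            return {}
        with open(self.registry_file) as file:
            return json.load(file)

    def _write(self, jobs: dict) -> None:
        with open(self.registry_file, 'w') as file:
            json.dump(jobs, file, indent=2)
```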

Along the way, the interprocess coordination functionality was organized into its own directory tree:

streaming/base/coord/
├── file.py
├── __init__.py
├── job
│   ├── directory.py
│   ├── entry.py
│   ├── file.py
│   ├── __init__.py
│   └── registry.py
├── mmap
│   ├── array.py
│   ├── barrier.py
│   ├── base.py
│   ├── buffer.py
│   ├── __init__.py
│   └── number.py
├── process.py
├── shmem
│   ├── array.py
│   ├── barrier.py
│   ├── __init__.py
│   ├── memory.py
│   ├── prefix.py
│   └── scalar.py
└── world.py

Nuke shared memory

Shared memory lives only in RAM and exists separately from process address spaces and their lifespans. Python has a fair-weather solution for avoiding zombie shared memory which, when everything goes right, does the intended thing. Programmers make plans, and Murphy's law laughs.

When we attach to a shared memory object, we typically get some unknowable quantity of extra bytes for free at the end due to memory page size. This is annoying, but solved in routine fashion by (a) terminating our data with a forbidden byte such as null, (b) using the first four bytes as a size prefix, or (c) knowing the size (shape, if we're talking about arrays) of the data beforehand. Depends.
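
For example, option (b) with a four-byte prefix looks roughly like this (endianness and helper names are illustrative):

```python
from multiprocessing import shared_memory


def write_prefixed(shm: shared_memory.SharedMemory, data: bytes) -> None:
    """Store data behind a 4-byte size prefix so readers can ignore page padding."""
    shm.buf[:4] = len(data).to_bytes(4, 'little')
    shm.buf[4:4 + len(data)] = data


def read_prefixed(shm: shared_memory.SharedMemory) -> bytes:
    """Read back exactly the bytes that were stored, however big the mapping is."""
    size = int.from_bytes(bytes(shm.buf[:4]), 'little')
    return bytes(shm.buf[4:4 + size])
```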

TODO

Nuke filelock

TODO
