Wrapping up CUDA support. #293

Open
CielAl opened this issue May 2, 2024 · 7 comments
@CielAl
Contributor

CielAl commented May 2, 2024

Hi @choosehappy @jacksonjacobs1 @nanli-emory
Finally some relief (and your input is appreciated): cuCIM support for both the image handles and the QC modules is implemented.

Unified Interface and Adapters

  • An adapter class histoqc.array_adapter.adapter.ArrayAdapter is implemented to handle array operations on CPU/GPU without sweeping changes to the existing code. It provides a decorator such that a CPU function call in the original modules, e.g.:
    dilation(img_small, footprint=np.ones((kernel_size, kernel_size)))
    can easily be turned into:
    adapter(dilation)(img_small, footprint=np.ones((kernel_size, kernel_size)))

  • How does it work (a minimal illustrative sketch follows at the end of this comment):
    (1) Is an input device type (CPU or CUDA) specified? If so, the adapter pushes the input array(s) to the specified device and invokes the
    corresponding function. If not, the adapter relies on the device of the input array(s) themselves.
    (2) Is an output device type specified? If so, the adapter always pushes the output array to the specified device (non-array outputs are left untouched).
    (3) If a GPU-accelerated implementation of a function does not exist, the adapter falls back to the CPU call, but the output device is still guaranteed.
    (4) By default, each image_handle has its own adapter that specifies the device type. For an OpenSlide handle, both input and output devices are CPU; for a cuCIM handle they are both CUDA. Therefore, given an image handle type, the associated adapter guarantees that all images/masks generated are on the same device. Methods such as ArrayAdapter.device_sync_all/ArrayAdapter.sync also provide explicit ways to sync the device of one or more arrays if needed.
    (5) cupy and numpy already share a large set of unified array operations (e.g., np.sqrt works on cupy arrays).

  • Benchmark
    I tested with five Aperio WSIs (40x, average HxW 66374 x 65736) using the default config.ini pipeline/parameters, with cuCIM and OpenSlide respectively. For the CPU-only (OpenSlide) mode, 8 workers were specified. The machine has an RTX Titan, 128 GB of memory, and a Ryzen 3800X CPU; the OS is Ubuntu 20.04.

Device               Time (without byExampleWithFeatures)   Time (with byExampleWithFeatures)
CUDA                 38s                                     87s
CPU (8 processes)    96s                                     180s

CUDA significantly reduces the running time compared to the multiprocessing (8 cores/8 subprocesses) CPU counterpart.

  • Limitation
    CUDA doesn't seem to work well with multiprocessing (the "spawn" start method is needed, memory cost is significantly higher due to spikes during initialization/release of each image handle, and the handles compete for resources). Essentially, the only benefit of CPU multiprocessing here is perhaps to reduce the CPU-to-GPU data-transfer latency. Possible future plans include:
    (1) Explore more delicate memory management.
    (2) Workarounds including distributed frameworks: most of the WSI-level operations are independent of each other, and the only reduction step happens at the result-collection stage.
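
For illustration, here is a minimal sketch of the dispatch logic described under "How does it work". The names SimpleAdapter and to_device are hypothetical; this is not the actual histoqc.array_adapter implementation (which, among other things, resolves the cuCIM counterpart of a skimage function automatically rather than taking it as an argument):

    # Minimal sketch only: cupy is optional; everything falls back to CPU when it is absent.
    from functools import wraps

    import numpy as np

    try:
        import cupy as cp
    except ImportError:
        cp = None


    def to_device(arr, device):
        """Move an array to 'cpu' or 'cuda'; pass non-array values through untouched."""
        array_types = (np.ndarray,) + ((cp.ndarray,) if cp is not None else ())
        if not isinstance(arr, array_types):
            return arr
        if device == "cuda" and cp is not None:
            return cp.asarray(arr)
        if device == "cpu" and cp is not None and isinstance(arr, cp.ndarray):
            return cp.asnumpy(arr)
        return arr


    class SimpleAdapter:
        """Hypothetical stand-in for the ArrayAdapter dispatch behavior."""

        def __init__(self, input_device=None, output_device=None):
            self.input_device = input_device
            self.output_device = output_device

        def __call__(self, cpu_func, gpu_func=None):
            @wraps(cpu_func)
            def wrapped(*args, **kwargs):
                # (1) If an input device is specified, move inputs there first;
                #     otherwise arrays keep whatever device they are already on.
                if self.input_device is not None:
                    args = tuple(to_device(a, self.input_device) for a in args)
                    kwargs = {k: to_device(v, self.input_device) for k, v in kwargs.items()}
                # (3) Use the GPU implementation when one is available and the inputs
                #     are on the GPU; otherwise fall back to the CPU function on CPU copies.
                if gpu_func is not None and cp is not None and any(
                        isinstance(a, cp.ndarray) for a in args):
                    out = gpu_func(*args, **kwargs)
                else:
                    out = cpu_func(*(to_device(a, "cpu") for a in args),
                                   **{k: to_device(v, "cpu") for k, v in kwargs.items()})
                # (2) If an output device is specified, the result always ends up there.
                return to_device(out, self.output_device) if self.output_device else out
            return wrapped

With such an adapter bound to an image handle (both devices CPU for OpenSlide, both CUDA for cuCIM), a call like adapter(dilation)(img_small, footprint=...) keeps the whole pipeline on one device.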
@CielAl
Contributor Author

CielAl commented May 6, 2024

@choosehappy
Updated: added very simple multiprocessing support for CUDA. Each worker process is assigned a unique device ID, so the number of processes cannot exceed the number of available CUDA devices.

The image handle/array adapter are now fully device-aware. This makes future changes, such as a potential introduction of Dask, feasible (Dask handles both multiprocessing and distributed execution much better than Python's built-in multiprocessing and can support multiple GPU nodes).
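
Purely for illustration, a hedged sketch of the one-device-per-worker idea (gpu_worker, process_single_wsi, and run are hypothetical placeholders, not HistoQC code; assumes cupy and the "spawn" start method):

    import multiprocessing as mp

    import cupy as cp


    def process_single_wsi(wsi_path):
        # Placeholder for the per-WSI QC pipeline.
        print(f"processing {wsi_path} on GPU {cp.cuda.runtime.getDevice()}")


    def gpu_worker(task_queue, device_id):
        # Each worker process binds to exactly one CUDA device for its lifetime.
        with cp.cuda.Device(device_id):
            for wsi_path in iter(task_queue.get, None):
                process_single_wsi(wsi_path)


    def run(files, gpu_ids):
        # The number of worker processes equals (and cannot exceed) the number of devices.
        ctx = mp.get_context("spawn")
        queue = ctx.Queue()
        procs = [ctx.Process(target=gpu_worker, args=(queue, dev)) for dev in gpu_ids]
        for p in procs:
            p.start()
        for f in files:
            queue.put(f)
        for _ in procs:
            queue.put(None)  # one stop sentinel per worker
        for p in procs:
            p.join()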

@CielAl
Contributor Author

CielAl commented May 9, 2024

@jacksonjacobs1 @nanli-emory @choosehappy

Finally, I finished the Dask implementation of multiprocessing/distributed execution for both CPU and GPU tasks. It basically enables the GPU-accelerated modules/image handles to utilize multiprocessing more efficiently.

A few takeaways:

(1) Dask will be a requirement, but Dask-GPU is optional (just like cuCIM/cupy). Dask-GPU specifically deals with CUDA clusters and handles multiprocessing (and, like Dask, it also handles distributed deployment); it basically solves the race conditions I observed when manually deploying a one-process-per-device multiprocessing task.

(2) It also greatly simplifies the deployment of multiprocessing procedures, hiding most of the lower-level operations/configurations under the hood. For instance, the current HistoQC implementation needs a MultiprocessLoggingManager to launch a thread and fetch all logRecords from worker loggers through a synchronized queue handler. Under Dask, you only need to forward all worker loggers directly to the main process (see the sketch at the end of this comment).

(3) It only slightly alters main.py; most of the other scripts are untouched. For instance, we get a more readable layout for submitting tasks to workers, following the pattern below:

        # Client and as_completed come from dask.distributed; new_cluster is a
        # HistoQC helper (from this PR) that creates the local cluster.
        if args.nprocesses > 0:
            local_cluster = new_cluster('histoqc', is_cuda=is_cuda,
                                        nprocesses=args.nprocesses, gpu_ids=args.gpu_ids)
            with Client(local_cluster) as client:
                # register log forwarding on the worker side
                logging_setup.setup_client(client, forward_name="root")

                # noinspection PyTypeChecker
                futures_list = [client.submit(worker, idx, file_name, **_shared_state)
                                for idx, file_name in enumerate(files)]

                for future in as_completed(futures_list):
                    try:
                        base_img_finished = future.result()
                    except Exception as exc:
                        worker_error(exc, failed)
                    else:
                        worker_success(base_img_finished, result_file=results)

(4) Finally, with Dask and Dask-GPU, HistoQC becomes significantly more scalable to large cohorts, since you can utilize CPUs/GPUs across multiple machines for the downstream QC modules.
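
Regarding (2), here is a minimal sketch of the log-forwarding idea, assuming a recent dask.distributed release that provides Client.forward_logging (HistoQC's own logging_setup.setup_client plays a similar role):

    import logging

    from dask.distributed import Client, LocalCluster

    logging.basicConfig(level=logging.INFO)  # client-side handler that prints forwarded records


    def task(x):
        # Logged on a worker process, re-emitted in the main (client) process.
        logging.getLogger().warning("processing %s on a worker", x)
        return x * 2


    if __name__ == "__main__":
        with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
            # Forward the workers' root logger to the client; no manual
            # queue/listener thread is needed.
            client.forward_logging()
            print(client.submit(task, 21).result())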

@CielAl
Contributor Author

CielAl commented May 9, 2024

All changes are pushed to #290 if you are interested in reviewing the code.

@jacksonjacobs1
Collaborator

@CielAl This is an impressive contribution, thanks for your effort.

The added cuCIM support looks reasonable to me and I'll take a better look at the code soon.

The support for dask may actually conflict with the distributed approach (Ray) we've chosen and implemented in
https://github.com/choosehappy/HistoQC/tree/ray_cluster_functionality

In addition to handling distributed processing via the Ray job scheduler, Ray supports infrastructure-as-code for provisioning clusters. Ray clusters also facilitate model serving and training, which we plan to use for QuickAnnotator & PatchSorter.

Based on your work, did you observe any advantages that Dask might have over Ray for multi-machine multiprocessing?

@CielAl
Contributor Author

CielAl commented May 14, 2024

@jacksonjacobs1 Hi, appreciate the compliment.

My previous experience is mostly with PySpark (which I despise) and Dask, so my opinion could be biased. Below are my personal opinions.

Now for your question, in one word: simplicity.

First, let's wrap up what both Ray and Dask can do:

  • Avoid lower-level multiprocessing/distributed framework implementation and simplify the main workflow in main.py with very few changes.
  • Both support IaC to deploy the clusters (e.g., AWS).
  • Both support model training/deployment (e.g., sklearn, pytorch, etc.).

Key difference:

  • Ray has a much more decentralized scheduler and therefore higher scalability (at the cost of simplicity for a consistent/reproducible internal data state, as well as debugging), provided that you break down all individual QC array operations to scale them up in a distributed system (e.g., via lazy evaluation). Note that if you merely distribute the task inputs (e.g., WSI files) across different nodes and each node just runs the whole pipeline to process a few files, the difference in efficiency is largely a non-concern.

Advantage of Dask:

  • Better affinity to Nvidia's RAPIDS ecosystem (cuCIM, cuML, etc.), and with Dask-CUDA there is more comprehensive native GPU/VRAM memory and worker management, especially if multiple GPUs are involved in each cluster. Dask-CUDA is RAPIDS' choice for distributed deployment (e.g., RMM pooling and spilling).
  • Minimal code modification due to better integration with most data science libraries. Should you ever plan to turn the QC pipelines into lazily evaluated task graphs in the future, it's much easier to do in Dask, as Dask and its extensions pretty much mirror the APIs of the libraries you need, e.g., dask.array for np.array (or cupy's), dask_ml for sklearn, etc.
  • In the case of PyTorch models, converting the single-machine version of both training and prediction to a distributed version is much more straightforward (e.g., via skorch or dask-pytorch-ddp).
  • Overall user-friendliness: simplicity and debugging.

In conclusion:

  • Dask and Ray greatly overlap in what they can achieve (e.g., Ray also supports PyTorch), but Dask wins on simplicity in terms of learning curve, debugging, the amount of code modification required, and tight integration with Python's data science ecosystem.
  • Dask might also be an "easier" choice for GPU-related tasks (RAPIDS/Dask-CUDA).
  • Dask also scales down to a single machine quite nicely (e.g., when there is no need to perform distributed processing at all).
  • Ray would be a much better choice, due to its scalability, in the scenario where you scale up the individual array operations inside the QC modules (e.g., chunking/lazy evaluation). As mentioned earlier, this would require an overhaul of most modules, and Ray would introduce more code changes. If your parallelism is only task-level (e.g., each worker runs a subset of images), then Ray might not have significant added value but could be annoying to develop.

@jacksonjacobs1
Collaborator

Thanks for the overview. FYI, Ray ships a scheduler for Dask to enable running Dask workloads on Ray clusters:
https://docs.ray.io/en/latest/ray-more-libs/dask-on-ray.html

Ray would be a much better choice, due to its scalability, in the scenario where you scale up the individual array operations inside the QC modules (e.g., chunking/lazy evaluation). As mentioned earlier, this would require an overhaul of most modules, and Ray would introduce more code changes. If your parallelism is only task-level (e.g., each worker runs a subset of images), then Ray might not have significant added value but could be annoying to develop.

I'm not sure I understand this point. We implemented HistoQC + Ray at the task level, which has substantial added value in addition to being conceptually simple:

$\text{processing time} \propto \dfrac{\#\,\text{images}}{\#\,\text{CPU cores}}$

The user can add CPUs to their Ray cluster to meet their processing-time needs, and nodes can even be added at run time.

Thoughts?

@CielAl
Contributor Author

CielAl commented May 16, 2024

I appreciate the feedback, and my apologies for the lack of clarity.

I'm not sure I understand this point. We implemented HistoQC + Ray at the task level, which has substantial added value in addition to being conceptually simple.

Where distributed frameworks such as Ray, Dask, or Spark shine is in parallelizing the individual array operations within the tasks (not that different from MapReduce). For instance, if you attempt to apply a certain rank filter to a huge image that is way too large to fit into physical memory, these frameworks break the array into small chunks and each worker handles a few chunks before the results are aggregated. All of this is managed by a scheduler for optimized and balanced execution (other techniques such as lazy evaluation might also be involved). Therefore, they are scalable when dealing with big data that can't fit into an individual machine.
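
For example (a hedged, illustrative sketch only, not HistoQC code), this is roughly how dask.array expresses such chunk-level parallelism, using scipy's median_filter as a stand-in rank filter:

    import dask.array as da
    from scipy.ndimage import median_filter

    # A large image represented lazily as a grid of 4096 x 4096 chunks.
    img = da.random.random((20_000, 20_000), chunks=(4096, 4096))

    # Each chunk is filtered independently on whichever worker holds it;
    # map_overlap adds a halo so neighborhood filters stay correct at chunk borders.
    filtered = img.map_overlap(median_filter, size=5, depth=2, boundary="reflect")

    # Nothing is computed until a result is requested (lazy evaluation);
    # the scheduler then distributes the chunk tasks and aggregates the reduction.
    print(filtered.mean().compute())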

And particularly in this scenario, Ray outperforms Dask thanks to its decentralized scheduling algorithm and offers better scalability (Dask's scheduler is mostly centralized).

However, if we only use Ray/Dask/... as a wrapper to deploy task-level parallelized jobs across single/multiple nodes, then in terms of scalability none of them has much added value over the others, so scalability won't be why we choose a particular framework here, and we need to look at other factors for the decision.

FYI, Ray ships a scheduler for Dask to enable running Dask workloads on Ray clusters

That's actually a case of the aforementioned fine-grained parallelization (e.g., dask.array). It would only pay off if (1) we intend to make HistoQC work at high magnification so that the overhead is negligible, and (2) we rewrite all individual modules to explicitly implement the parallelization.

If the scalability bonus of Ray is not significant (which I doubt, because after all, in most cases individual HistoQC tasks fit into a single node) or not applicable (e.g., if we only aim for task-level parallelization), then it falls into the where-Ray-does-not-outperform-others scenario, and we can just use the Dask API directly rather than dealing with a Dask wrapper inside Ray.

and nodes can even be added at run time

Both Ray and Dask can achieve that quite handily.

So in the end, I think that in terms of implementing the distributed framework itself, both Ray and Dask suffice, and the real question is the benefit of using Ray/Dask at development time and which features we might deploy in the future.

We could perhaps add a checklist to compare the advantages/disadvantages later. My (very much biased) vote for Dask, besides its ease of use, is because of its integration with Nvidia's RAPIDS ecosystem through the Dask-CUDA extension. Dask-CUDA handles GPU clusters a lot better and more easily than Ray, with both device and memory management implemented on the RAPIDS side (a small sketch follows below). Note that RAPIDS not only speeds up the I/O of big images but also implements GPU-accelerated APIs mirroring skimage (cuCIM) and sklearn (cuML).
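
A hedged sketch of what that looks like with Dask-CUDA (the parameter values are arbitrary examples; this mirrors Dask-CUDA's documented options rather than HistoQC code):

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # One worker per listed GPU, with an RMM memory pool pre-allocated per device
    # and spilling from device memory to host memory beyond the given limit.
    cluster = LocalCUDACluster(
        CUDA_VISIBLE_DEVICES="0,1",
        rmm_pool_size="12GB",
        device_memory_limit="14GB",
    )
    client = Client(cluster)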

My second concern is the nuisance of code changes for deep learning model training/deployment. Both Ray and Dask can achieve this (Ray uses ray.train; Dask uses skorch or dask-pytorch-ddp), but in general Dask's interface is closer to the single-machine/single-process counterpart, and with Dask-CUDA even more so.

Finally, errors and logs are much easier to trace and debug in Dask than in Ray.

If your roadmap involves the integration of more and more GPU-related tasks (e.g., model training/deployment), then Dask could be an easier choice with less nuisance, while Ray remains an available option.
