+        gpu_resources = valmap(int, itemfilter(lambda kv: kv[0] == "GPU", resources))
+        resources = valmap(float, itemfilter(lambda kv: kv[0] != "GPU", resources))
+        resources.update(gpu_resources)
+        if "resources" in worker_kwargs:
+            if "GPU" not in worker_kwargs["resources"]:
+                worker_kwargs["resources"]["GPU"] = 1
+        else:
+            worker_kwargs["resources"] = {"GPU": 1}
a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 049fe85f..c32fa93d 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -594,3 +594,45 @@ def test_worker_cudf_spill_warning(enable_cudf_spill_warning): # noqa: F811 assert b"UserWarning: cuDF spilling is enabled" in ret.stderr else: assert b"UserWarning: cuDF spilling is enabled" not in ret.stderr + + +def test_worker_gpu_resource(loop): # noqa: F811 + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): + with popen( + [ + "dask", + "cuda", + "worker", + "127.0.0.1:9369", + "--no-dashboard", + ] + ): + with Client("127.0.0.1:9369", loop=loop) as client: + assert wait_workers(client, n_gpus=get_n_gpus()) + + workers = client.scheduler_info()["workers"] + for v in workers.values(): + assert "GPU" in v["resources"] + assert v["resources"]["GPU"] == 1 + + +def test_worker_gpu_resource_user_defined(loop): # noqa: F811 + with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]): + with popen( + [ + "dask", + "cuda", + "worker", + "127.0.0.1:9369", + "--resources", + "'GPU=55'", + "--no-dashboard", + ] + ): + with Client("127.0.0.1:9369", loop=loop) as client: + assert wait_workers(client, n_gpus=get_n_gpus()) + + workers = client.scheduler_info()["workers"] + for v in workers.values(): + assert "GPU" in v["resources"] + assert v["resources"]["GPU"] == 55 diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index b144d111..2b15973f 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -217,6 +217,26 @@ async def test_all_to_all(): assert all(all_data.count(i) == n_workers for i in all_data) +@gen_test(timeout=20) +async def test_worker_gpu_resource(): + async with LocalCUDACluster(asynchronous=True) as cluster: + async with Client(cluster, asynchronous=True) as client: + workers = 
client.scheduler_info()["workers"] + for v in workers.values(): + assert "GPU" in v["resources"] + assert v["resources"]["GPU"] == 1 + + +@gen_test(timeout=20) +async def test_worker_gpu_resource_user_defined(): + async with LocalCUDACluster(asynchronous=True, resources={"GPU": 55}) as cluster: + async with Client(cluster, asynchronous=True) as client: + workers = client.scheduler_info()["workers"] + for v in workers.values(): + assert "GPU" in v["resources"] + assert v["resources"]["GPU"] == 55 + + @gen_test(timeout=20) async def test_rmm_pool(): rmm = pytest.importorskip("rmm")