Define "GPU" as a worker resource #1401

Open · wants to merge 2 commits into base: branch-24.12
12 changes: 8 additions & 4 deletions dask_cuda/cuda_worker.py
@@ -6,7 +6,7 @@
import os
import warnings

-from toolz import valmap
+from toolz import itemfilter, valmap

import dask
from distributed import Nanny
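
For reference, a quick illustration (not part of this diff) of the two toolz helpers imported above: valmap applies a function to every value of a mapping, and itemfilter keeps the entries whose (key, value) pair satisfies a predicate. The sample dictionary is made up for demonstration.

from toolz import itemfilter, valmap

d = {"GPU": "2", "MEMORY": "100e9"}

# valmap maps a function over the values of a dict.
print(valmap(float, d))  # {'GPU': 2.0, 'MEMORY': 100000000000.0}

# itemfilter keeps items whose (key, value) tuple satisfies the predicate.
print(itemfilter(lambda kv: kv[0] == "GPU", d))  # {'GPU': '2'}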
@@ -103,11 +103,15 @@ def del_pid_file():
            atexit.register(del_pid_file)

        if resources:
-            resources = resources.replace(",", " ").split()
+            resources = resources.replace(",", " ").replace("'", "").split()
            resources = dict(pair.split("=") for pair in resources)
-            resources = valmap(float, resources)
+            gpu_resources = valmap(int, itemfilter(lambda x: x != "GPU", resources))
+            resources = valmap(float, itemfilter(lambda x: x == "GPU", resources))
Member Author commented:
@jacobtomlinson you wrote this section originally. I presume mapping values to float was done to support a definition such as "MEMORY", but since I don't see any tests or any other explicit mention of it in Dask-CUDA, could you confirm whether that's right, and whether you can think of a more robust way to handle types here than the "GPU" versus not-"GPU" split I wrote above?

Member replied:
Honestly, I don't remember the reasoning. I think Dask handles these values as floats; as you say, it's to support values like MEMORY or other arbitrary quantities.
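
For context, a minimal sketch (not part of this PR) of how Dask's generic worker resources behave: names such as "MEMORY" are arbitrary user-chosen labels, amounts are floats, and a task that requests some of a resource only runs on a worker with enough of it unclaimed. The cluster size and quantities below are illustrative assumptions.

from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # Each worker advertises an abstract, float-valued "MEMORY" resource.
    cluster = LocalCluster(n_workers=2, threads_per_worker=1, resources={"MEMORY": 100e9})
    client = Client(cluster)

    def load_chunk(i):
        return i

    # The scheduler will only place this task on a worker with 70e9 of
    # "MEMORY" not already claimed by other running tasks.
    future = client.submit(load_chunk, 0, resources={"MEMORY": 70e9})
    print(future.result())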

+            resources.update(gpu_resources)
+            if "GPU" not in resources:
+                resources["GPU"] = 1
        else:
-            resources = None
+            resources = {"GPU": 1}

        preload_argv = kwargs.pop("preload_argv", [])
        kwargs = {"worker_port": None, "listen_address": None, **kwargs}
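The intended effect of the parsing above, shown as a hypothetical standalone helper (parse_resources is not part of the PR; it uses plain dict comprehensions keyed on the resource name instead of toolz): "GPU" amounts become ints and default to 1, while any other resource stays a float.

def parse_resources(spec):
    # e.g. spec = "'GPU=2,MEMORY=100e9'" as passed via --resources
    pairs = spec.replace(",", " ").replace("'", "").split()
    raw = dict(pair.split("=") for pair in pairs)
    resources = {k: (int(v) if k == "GPU" else float(v)) for k, v in raw.items()}
    resources.setdefault("GPU", 1)  # ensure every worker exposes a GPU resource
    return resources

print(parse_resources("GPU=2,MEMORY=100e9"))  # {'GPU': 2, 'MEMORY': 100000000000.0}
print(parse_resources("MEMORY=100e9"))        # {'MEMORY': 100000000000.0, 'GPU': 1}
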
24 changes: 15 additions & 9 deletions dask_cuda/local_cuda_cluster.py
@@ -23,17 +23,17 @@


class LoggedWorker(Worker):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
+    def __init__(self, *args, **worker_kwargs):
+        super().__init__(*args, **worker_kwargs)

    async def start(self):
        await super().start()
        self.data.set_address(self.address)


class LoggedNanny(Nanny):
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, worker_class=LoggedWorker, **kwargs)
+    def __init__(self, *args, **worker_kwargs):
+        super().__init__(*args, worker_class=LoggedWorker, **worker_kwargs)


class LocalCUDACluster(LocalCluster):
@@ -244,7 +244,7 @@ def __init__(
        log_spilling=False,
        worker_class=None,
        pre_import=None,
-        **kwargs,
+        **worker_kwargs,
    ):
        # Required by RAPIDS libraries (e.g., cuDF) to ensure no context
        # initialization happens before we can set CUDA_VISIBLE_DEVICES
@@ -326,7 +326,7 @@
        self.rmm_log_directory = rmm_log_directory
        self.rmm_track_allocations = rmm_track_allocations

-        if not kwargs.pop("processes", True):
+        if not worker_kwargs.pop("processes", True):
            raise ValueError(
                "Processes are necessary in order to use multiple GPUs with Dask"
            )
@@ -337,7 +337,7 @@

        if jit_unspill is None:
            jit_unspill = dask.config.get("jit-unspill", default=False)
-        data = kwargs.pop("data", None)
+        data = worker_kwargs.pop("data", None)
        if data is None:
            if device_memory_limit is None and memory_limit is None:
                data = {}
@@ -375,7 +375,7 @@
"protocol='ucxx'"
)

self.host = kwargs.get("host", None)
self.host = worker_kwargs.get("host", None)

initialize(
create_cuda_context=False,
@@ -399,6 +399,12 @@

        self.pre_import = pre_import

+        if "resources" in worker_kwargs:
+            if "GPU" not in worker_kwargs["resources"]:
+                worker_kwargs["GPU"] = 1
+        else:
+            worker_kwargs["resources"] = {"GPU": 1}
rjzamora marked this conversation as resolved.

        super().__init__(
            n_workers=0,
            threads_per_worker=threads_per_worker,
@@ -416,7 +422,7 @@
                    enable_rdmacm=enable_rdmacm,
                )
            },
-            **kwargs,
+            **worker_kwargs,
        )

        self.new_spec["options"]["preload"] = self.new_spec["options"].get(
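A minimal usage sketch of what the new default enables (not taken from the PR or its tests; the task function and counts are assumptions): because every Dask-CUDA worker now advertises resources={"GPU": 1}, a task that requests one "GPU" is limited to one concurrent instance per worker, regardless of the worker's thread count.

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

if __name__ == "__main__":
    cluster = LocalCUDACluster()  # workers default to resources={"GPU": 1}
    client = Client(cluster)

    def gpu_task(x):
        # stand-in for work that needs exclusive use of the GPU
        return x * 2

    # At most one of these runs on a given worker at a time, because each
    # instance claims the worker's single "GPU" resource unit.
    futures = [client.submit(gpu_task, i, resources={"GPU": 1}) for i in range(4)]
    print(client.gather(futures))
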
42 changes: 42 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -594,3 +594,45 @@ def test_worker_cudf_spill_warning(enable_cudf_spill_warning): # noqa: F811
assert b"UserWarning: cuDF spilling is enabled" in ret.stderr
else:
assert b"UserWarning: cuDF spilling is enabled" not in ret.stderr


def test_worker_gpu_resource(loop): # noqa: F811
with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
with popen(
[
"dask",
"cuda",
"worker",
"127.0.0.1:9369",
"--no-dashboard",
]
):
with Client("127.0.0.1:9369", loop=loop) as client:
assert wait_workers(client, n_gpus=get_n_gpus())

workers = client.scheduler_info()["workers"]
for v in workers.values():
assert "GPU" in v["resources"]
assert v["resources"]["GPU"] == 1


def test_worker_gpu_resource_user_defined(loop): # noqa: F811
with popen(["dask", "scheduler", "--port", "9369", "--no-dashboard"]):
with popen(
[
"dask",
"cuda",
"worker",
"127.0.0.1:9369",
"--resources",
"'GPU=55'",
"--no-dashboard",
]
):
with Client("127.0.0.1:9369", loop=loop) as client:
assert wait_workers(client, n_gpus=get_n_gpus())

workers = client.scheduler_info()["workers"]
for v in workers.values():
assert "GPU" in v["resources"]
assert v["resources"]["GPU"] == 55
20 changes: 20 additions & 0 deletions dask_cuda/tests/test_local_cuda_cluster.py
@@ -217,6 +217,26 @@ async def test_all_to_all():
        assert all(all_data.count(i) == n_workers for i in all_data)


@gen_test(timeout=20)
async def test_worker_gpu_resource():
    async with LocalCUDACluster(asynchronous=True) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            workers = client.scheduler_info()["workers"]
            for v in workers.values():
                assert "GPU" in v["resources"]
                assert v["resources"]["GPU"] == 1


@gen_test(timeout=20)
async def test_worker_gpu_resource_user_defined():
    async with LocalCUDACluster(asynchronous=True, resources={"GPU": 55}) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            workers = client.scheduler_info()["workers"]
            for v in workers.values():
                assert "GPU" in v["resources"]
                assert v["resources"]["GPU"] == 55


@gen_test(timeout=20)
async def test_rmm_pool():
    rmm = pytest.importorskip("rmm")