Skip to content

Commit

Permalink
optional dashboard for distributed
Browse files Browse the repository at this point in the history
  • Loading branch information
cpelley committed Oct 8, 2024
1 parent 22ffa8d commit ea8b261
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions dagrunner/runner/schedulers/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def __init__(self, num_workers, profiler_filepath=None, **kwargs):
self._profiler_output = profiler_filepath
self._kwargs = kwargs
self._cluster = None
self._client = None
self._dashboard_address = None

def __enter__(self):
"""Create a local cluster and connect a client to it."""
Expand All @@ -98,7 +100,9 @@ def __enter__(self):
threads_per_worker=1,
**self._kwargs,
)
Client(self._cluster)
self._client = Client(self._cluster, dashboard_address=self._dashboard_address)
if self._dashboard_address:
print(f"dashboard link: {self._client.dashboard_link}")
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
Expand Down Expand Up @@ -182,6 +186,7 @@ def run(self, dask_graph, verbose=False):
scheduler=self._scheduler,
num_workers=self._num_workers,
chunksize=1,
**self._kwargs,
)
visualize(
[prof, rprof, cprof],
Expand All @@ -193,7 +198,10 @@ def run(self, dask_graph, verbose=False):
print(f"{max([res.mem for res in rprof.results])}MB total memory used")
else:
res = self._dask_container.compute(
scheduler=self._scheduler, num_workers=self._num_workers, chunksize=1
scheduler=self._scheduler,
num_workers=self._num_workers,
chunksize=1,
**self._kwargs,
)
return res

Expand Down

0 comments on commit ea8b261

Please sign in to comment.