From ea8b2619f7768e1916b6dce3bdf2829db76c891b Mon Sep 17 00:00:00 2001 From: cpelley Date: Tue, 8 Oct 2024 07:16:49 +0100 Subject: [PATCH] optional dashboard for distributed --- dagrunner/runner/schedulers/dask.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/dagrunner/runner/schedulers/dask.py b/dagrunner/runner/schedulers/dask.py index 1e82745..768cc6c 100644 --- a/dagrunner/runner/schedulers/dask.py +++ b/dagrunner/runner/schedulers/dask.py @@ -89,6 +89,8 @@ def __init__(self, num_workers, profiler_filepath=None, **kwargs): self._profiler_output = profiler_filepath self._kwargs = kwargs self._cluster = None + self._client = None + self._dashboard_address = None def __enter__(self): """Create a local cluster and connect a client to it.""" @@ -98,7 +100,9 @@ def __enter__(self): threads_per_worker=1, **self._kwargs, ) - Client(self._cluster) + self._client = Client(self._cluster, dashboard_address=self._dashboard_address) + if self._dashboard_address: + print(f"dashboard link: {self._client.dashboard_link}") return self def __exit__(self, exc_type, exc_value, exc_traceback): @@ -182,6 +186,7 @@ def run(self, dask_graph, verbose=False): scheduler=self._scheduler, num_workers=self._num_workers, chunksize=1, + **self._kwargs, ) visualize( [prof, rprof, cprof], @@ -193,7 +198,10 @@ def run(self, dask_graph, verbose=False): print(f"{max([res.mem for res in rprof.results])}MB total memory used") else: res = self._dask_container.compute( - scheduler=self._scheduler, num_workers=self._num_workers, chunksize=1 + scheduler=self._scheduler, + num_workers=self._num_workers, + chunksize=1, + **self._kwargs, ) return res