Skip to content

Commit

Permalink
adding options for Dask cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
gtramonte committed Jul 29, 2024
1 parent 533fdff commit 991f9cf
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions executor/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,27 +231,20 @@ def process(message: Message, compute: bool):
class Executor(metaclass=LoggableMeta):
_LOG = logging.getLogger("geokube.Executor")

def __init__(self, broker, store_path):
def __init__(self, broker, store_path, dask_cluster_opts):
self._store = store_path
broker_conn = pika.BlockingConnection(
pika.ConnectionParameters(host=broker, heartbeat=10),
)
self._conn = broker_conn
self._channel = broker_conn.channel()
self._db = DBManager()
self.dask_cluster_opts = dask_cluster_opts

def create_dask_cluster(self, dask_cluster_opts: dict = None):
if dask_cluster_opts is None:
dask_cluster_opts = {}
dask_cluster_opts["scheduler_port"] = int(
os.getenv("DASK_SCHEDULER_PORT", 8188)
)
dask_cluster_opts["processes"] = True
port = int(os.getenv("DASK_DASHBOARD_PORT", 8787))
dask_cluster_opts["dashboard_address"] = f":{port}"
dask_cluster_opts["n_workers"] = 1
dask_cluster_opts["memory_limit"] = "auto"
dask_cluster_opts['thread_per_worker'] = 8
dask_cluster_opts = self.dask_cluster_opts

self._worker_id = self._db.create_worker(
status="enabled",
dask_scheduler_port=dask_cluster_opts["scheduler_port"],
Expand All @@ -264,13 +257,13 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None):
)
dask_cluster = LocalCluster(
n_workers=dask_cluster_opts['n_workers'],
#scheduler_port=dask_cluster_opts["scheduler_port"],
#dashboard_address=dask_cluster_opts["dashboard_address"],
#memory_limit=dask_cluster_opts["memory_limit"],
scheduler_port=dask_cluster_opts["scheduler_port"],
dashboard_address=dask_cluster_opts["dashboard_address"],
memory_limit=dask_cluster_opts["memory_limit"],
threads_per_worker=dask_cluster_opts['thread_per_worker'],
)
self._LOG.info(
"not creating Dask Client...", extra={"track_id": self._worker_id}
"creating Dask Client...", extra={"track_id": self._worker_id}
)
self._dask_client = Client(dask_cluster)
self._nanny = Nanny(self._dask_client.cluster.scheduler.address)
Expand Down Expand Up @@ -452,11 +445,22 @@ def get_size(self, location_path):
executor_types = os.getenv("EXECUTOR_TYPES", "query").split(",")
store_path = os.getenv("STORE_PATH", ".")

executor = Executor(broker=broker, store_path=store_path)
# Dask cluster options are read from the environment so deployments can tune
# the cluster without code changes. os.getenv returns a str whenever the
# variable is set (the default is only used when it is unset), so every
# numeric option is converted with int() explicitly.
dask_cluster_opts = {}
dask_cluster_opts["scheduler_port"] = int(
    os.getenv("DASK_SCHEDULER_PORT", 8188)
)
dask_cluster_opts["processes"] = True
port = int(os.getenv("DASK_DASHBOARD_PORT", 8787))
dask_cluster_opts["dashboard_address"] = f":{port}"
# Without int(), setting DASK_N_WORKERS=4 would pass the string "4" through
# to LocalCluster(n_workers=...).
dask_cluster_opts["n_workers"] = int(os.getenv("DASK_N_WORKERS", 1))
# Left as a string on purpose: the default is "auto".
# NOTE(review): presumably other values are size strings such as "4GB" that
# Dask parses itself -- confirm against the LocalCluster documentation.
dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto")
# The key is spelled "thread_per_worker" (singular) because
# create_dask_cluster reads it under exactly that name.
dask_cluster_opts["thread_per_worker"] = int(
    os.getenv("DASK_THREADS_PER_WORKER", 8)
)


executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts)
print("channel subscribe")
for etype in executor_types:
if etype == "query":
#TODO: create dask cluster with options
executor.create_dask_cluster()

executor.subscribe(etype)
Expand Down

0 comments on commit 991f9cf

Please sign in to comment.