From a41a717d0e94df8c9b082402ad079cae8cc6c214 Mon Sep 17 00:00:00 2001 From: tskluzac Date: Thu, 24 Mar 2022 21:31:09 -0500 Subject: [PATCH] cleaning out muck in scheduler imports. --- scheddy/scheduler.py | 42 ++++++++++++++++-------------------------- 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/scheddy/scheduler.py b/scheddy/scheduler.py index df2570b..57c40db 100644 --- a/scheddy/scheduler.py +++ b/scheddy/scheduler.py @@ -1,26 +1,32 @@ +# Absolute imports import os import json import time import boto3 +import logging import threading +# Native relative imports from random import shuffle from queue import Queue, PriorityQueue -from xtract_sdk.packagers.family_batch import FamilyBatch + +# External requirement imports +from funcx import FuncXClient +from flask import current_app +from utils.pg_utils import pg_conn +from globus_sdk import AccessTokenAuthorizer + +# Xtract SDK imports from xtract_sdk.packagers.family import Family +from xtract_sdk.packagers.family_batch import FamilyBatch +# Internal imports from status_checks import get_crawl_status -from scheddy.endpoint_strategies.rand_n_families import RandNFamiliesStrategy -from scheddy.endpoint_strategies.nothing_moves import NothingMovesStrategy from scheddy.maps.name_to_extractor_map import extractor_map - +from scheddy.endpoint_strategies.nothing_moves import NothingMovesStrategy from scheddy.extractor_strategies.extension_map import ExtensionMapStrategy -from funcx import FuncXClient -from globus_sdk import AccessTokenAuthorizer -from flask import current_app - -import logging +from scheddy.endpoint_strategies.rand_n_families import RandNFamiliesStrategy log = logging.getLogger(f"{__name__}.sub") @@ -37,7 +43,6 @@ def __lt__(self, other): def get_all_extractors(fx_ep_ls): - from utils.pg_utils import pg_conn cur = pg_conn().cursor() # don't need connection object; don't need to commit. # Should be {extractor_name: {'funcx_id": uuid}} @@ -47,16 +52,12 @@ def get_all_extractors(fx_ep_ls): get_query = f"""SELECT ext_name, func_uuid from extractors WHERE fx_eid='{fx_ep}';""" cur.execute(get_query) - # logger.info('We just executed query to get containers!') - log.error('we are here.') - log.error('We just executed query to get containers!') + log.error('Executing query to get containers...') for item in cur.fetchall(): ext_name, func_uuid = item if ext_name not in all_extractors: all_extractors[ext_name] = dict() all_extractors[ext_name][fx_ep] = func_uuid - - # current_app.logger.info(f'These are all of the extractors we found: {all_extractors}') log.error(f'These are all of the extractors we found: {all_extractors}') return all_extractors @@ -64,15 +65,9 @@ def get_all_extractors(fx_ep_ls): def get_fx_client(headers): tokens = headers - print(f"HEADERS IN HERE: {tokens}") fx_auth = AccessTokenAuthorizer(tokens['Authorization'].replace('Bearer ', '')) search_auth = AccessTokenAuthorizer(tokens['Search']) openid_auth = AccessTokenAuthorizer(tokens['Openid']) - print(f"TRYING TO CREATE FUNCX CLIENT") - - print(f"fx_auth: {fx_auth}") - print(f"search_auth: {search_auth}") - print(f"openid_auth: {openid_auth}") fxc = FuncXClient(fx_authorizer=fx_auth, search_authorizer=search_auth, @@ -229,7 +224,6 @@ def schedule(self, tiebreaker='random'): packed_pri_obj = full_pq.get() fam = packed_pri_obj.data - # print(f"Fetched: {fam}") fam['first_extractor'] = extractor_name self.to_xtract_q.put(fam) self.counters['cumu_scheduled'] += 1 @@ -242,9 +236,6 @@ def schedule(self, tiebreaker='random'): def orch_thread(self, headers): to_terminate = False - # TYLER: Getting rid of imported function_ids - # from scheddy.maps.function_ids import functions - # TODO: GET ALL OF THE FUNCTIONS RIGHT HERE FROM DB. print(f"ENDPOINTS TO CHECK: {self.fx_eps_to_check}") all_extractors = get_all_extractors(self.fx_eps_to_check) print(f"Fetched all extractors... {all_extractors}") @@ -290,7 +281,6 @@ def orch_thread(self, headers): self.counters['flagged_unknown'] += 1 continue - # *** Spaghetti code zone *** # We should not need to repack and add an empty base_url fam_batch = FamilyBatch() packed_family = Family()