Skip to content

Commit

Permalink
cleaning out muck in scheduler imports.
Browse files Browse the repository at this point in the history
  • Loading branch information
tskluzac committed Mar 25, 2022
1 parent 32ba75c commit a41a717
Showing 1 changed file with 16 additions and 26 deletions.
42 changes: 16 additions & 26 deletions scheddy/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@

# Absolute imports
import os
import json
import time
import boto3
import logging
import threading

# Native relative imports
from random import shuffle
from queue import Queue, PriorityQueue
from xtract_sdk.packagers.family_batch import FamilyBatch

# External requirement imports
from funcx import FuncXClient
from flask import current_app
from utils.pg_utils import pg_conn
from globus_sdk import AccessTokenAuthorizer

# Xtract SDK imports
from xtract_sdk.packagers.family import Family
from xtract_sdk.packagers.family_batch import FamilyBatch

# Internal imports
from status_checks import get_crawl_status
from scheddy.endpoint_strategies.rand_n_families import RandNFamiliesStrategy
from scheddy.endpoint_strategies.nothing_moves import NothingMovesStrategy
from scheddy.maps.name_to_extractor_map import extractor_map

from scheddy.endpoint_strategies.nothing_moves import NothingMovesStrategy
from scheddy.extractor_strategies.extension_map import ExtensionMapStrategy
from funcx import FuncXClient
from globus_sdk import AccessTokenAuthorizer
from flask import current_app

import logging
from scheddy.endpoint_strategies.rand_n_families import RandNFamiliesStrategy

log = logging.getLogger(f"{__name__}.sub")

Expand All @@ -37,7 +43,6 @@ def __lt__(self, other):

def get_all_extractors(fx_ep_ls):

from utils.pg_utils import pg_conn
cur = pg_conn().cursor() # don't need connection object; don't need to commit.

# Should be {extractor_name: {'funcx_id": uuid}}
Expand All @@ -47,32 +52,22 @@ def get_all_extractors(fx_ep_ls):
get_query = f"""SELECT ext_name, func_uuid from extractors WHERE fx_eid='{fx_ep}';"""
cur.execute(get_query)

# logger.info('We just executed query to get containers!')
log.error('we are here.')
log.error('We just executed query to get containers!')
log.error('Executing query to get containers...')
for item in cur.fetchall():
ext_name, func_uuid = item
if ext_name not in all_extractors:
all_extractors[ext_name] = dict()
all_extractors[ext_name][fx_ep] = func_uuid

# current_app.logger.info(f'These are all of the extractors we found: {all_extractors}')
log.error(f'These are all of the extractors we found: {all_extractors}')

return all_extractors


def get_fx_client(headers):
tokens = headers
print(f"HEADERS IN HERE: {tokens}")
fx_auth = AccessTokenAuthorizer(tokens['Authorization'].replace('Bearer ', ''))
search_auth = AccessTokenAuthorizer(tokens['Search'])
openid_auth = AccessTokenAuthorizer(tokens['Openid'])
print(f"TRYING TO CREATE FUNCX CLIENT")

print(f"fx_auth: {fx_auth}")
print(f"search_auth: {search_auth}")
print(f"openid_auth: {openid_auth}")

fxc = FuncXClient(fx_authorizer=fx_auth,
search_authorizer=search_auth,
Expand Down Expand Up @@ -229,7 +224,6 @@ def schedule(self, tiebreaker='random'):
packed_pri_obj = full_pq.get()
fam = packed_pri_obj.data

# print(f"Fetched: {fam}")
fam['first_extractor'] = extractor_name
self.to_xtract_q.put(fam)
self.counters['cumu_scheduled'] += 1
Expand All @@ -242,9 +236,6 @@ def schedule(self, tiebreaker='random'):
def orch_thread(self, headers):
to_terminate = False

# TYLER: Getting rid of imported function_ids
# from scheddy.maps.function_ids import functions
# TODO: GET ALL OF THE FUNCTIONS RIGHT HERE FROM DB.
print(f"ENDPOINTS TO CHECK: {self.fx_eps_to_check}")
all_extractors = get_all_extractors(self.fx_eps_to_check)
print(f"Fetched all extractors... {all_extractors}")
Expand Down Expand Up @@ -290,7 +281,6 @@ def orch_thread(self, headers):
self.counters['flagged_unknown'] += 1
continue

# *** Spaghetti code zone ***
# We should not need to repack and add an empty base_url
fam_batch = FamilyBatch()
packed_family = Family()
Expand Down

0 comments on commit a41a717

Please sign in to comment.