
Commit

Added Logs for better understanding
kshitijrajsharma committed Dec 25, 2023
1 parent 5268cc5 commit 6fd2afd
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions src/app.py
@@ -1335,10 +1335,14 @@ def query_to_file(self, query, category_name, feature_type, export_formats):
self.default_export_path, category_name, feature_type
)
resources = []
start_export_formats_time = time.time()

def process_export_format(export_format):
export_format_path = os.path.join(file_export_path, export_format.suffix)
os.makedirs(export_format_path, exist_ok=True)
logging.info(
"Processing %s:%s", category_name.lower(), export_format.suffix
)

export_filename = f"""{self.params.dataset.dataset_prefix}_{category_name}_{feature_type}_{export_format.suffix}"""
export_file_path = os.path.join(
@@ -1364,10 +1368,13 @@ def process_export_format(export_format):
resource["zip_path"] = zip_path
resource["format_suffix"] = export_format.suffix
resource["format_description"] = export_format.driver_name

logging.info("Done %s:%s", category_name.lower(), export_format.suffix)
return resource

if self.parallel_process_state is False and len(export_formats) > 1:
logging.info(
"Using Parallel Processing for %s Export formats", category_name.lower()
)
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
@@ -1383,7 +1390,13 @@ def process_export_format(export_format):
for exf in export_formats:
resource = process_export_format(exf)
resources.append(resource)

logging.info(
"Processing Done of all Export formats for %s in %s ",
category_name.lower(),
humanize.naturaldelta(
timedelta(seconds=(time.time() - start_export_formats_time))
),
)
return resources

def process_category_result(self, category_result):
@@ -1418,7 +1431,8 @@ def process_category(self, category):
- List of resource dictionaries containing export information.
"""
category_name, category_data = list(category.items())[0]
logging.info(f"Started Processing {category_name}")
category_start_time = time.time()
logging.info("Started Processing %s", category_name)
all_uploaded_resources = []
for feature_type in category_data.types:
extract_query = extract_features_duckdb(
@@ -1432,7 +1446,13 @@ def process_category(self, category):
)
uploaded_resources = self.zip_to_s3(resources)
all_uploaded_resources.extend(uploaded_resources)
logging.info(f"Done Processing {category_name}")
logging.info(
"Done Processing %s in %s ",
category_name,
humanize.naturaldelta(
timedelta(seconds=(time.time() - category_start_time))
),
)
return all_uploaded_resources

def resource_to_response(self, uploaded_resources, category):
@@ -1556,6 +1576,7 @@ def process_hdx_tags(self):
dataset_results = []
if len(self.params.categories) > 1:
self.parallel_process_state = True
logging.info("Starting to Use Parallel Processes")
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
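The main pattern this commit introduces is elapsed-time logging: capture a start timestamp with time.time(), do the work, then report a human-readable duration by wrapping the difference in a timedelta and passing it to humanize.naturaldelta. A minimal standalone sketch of that pattern, assuming humanize is installed; the run_exports function and the sample category and formats are illustrative, not part of the repository:

import logging
import time
from datetime import timedelta

import humanize

logging.basicConfig(level=logging.INFO)


def run_exports(category_name, export_formats):
    # Record when this batch of exports starts so the total duration can be logged at the end.
    start_time = time.time()
    for export_format in export_formats:
        # Lazy %-style formatting defers string interpolation until the record is actually emitted.
        logging.info("Processing %s:%s", category_name.lower(), export_format)
    logging.info(
        "Processing Done of all Export formats for %s in %s",
        category_name.lower(),
        # naturaldelta renders a timedelta as text such as "a moment" or "2 minutes".
        humanize.naturaldelta(timedelta(seconds=time.time() - start_time)),
    )


run_exports("buildings", ["geojson", "shp"])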

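The new "Using Parallel Processing" log line sits on the branch that fans per-format work out over a thread pool when the category itself is not already being processed in parallel. A simplified sketch of that branch, using executor.map as a stand-in for the submit-and-collect code elided in the hunk above; process_export_format here is a stub, not the real implementation:

import concurrent.futures
import logging
import os


def process_export_format(export_format):
    # Stub for the real per-format export work in src/app.py.
    logging.info("Processing %s", export_format)
    return {"format_suffix": export_format}


def export_all(export_formats, parallel_process_state=False):
    resources = []
    if parallel_process_state is False and len(export_formats) > 1:
        logging.info("Using Parallel Processing for %s export formats", len(export_formats))
        with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            # Each export format is converted on its own worker thread.
            resources = list(executor.map(process_export_format, export_formats))
    else:
        # Fall back to sequential processing, e.g. when categories are already running in parallel.
        for exf in export_formats:
            resources.append(process_export_format(exf))
    return resources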