From 1bc9c834c92151246c109d78562d6226e6b91630 Mon Sep 17 00:00:00 2001
From: Madhu Kadiri
Date: Thu, 12 Dec 2024 09:36:27 +0000
Subject: [PATCH] long running gluejob in prod - fix - 1212 - 1

---
 ...ds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py | 22 ++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py
index 54f947e6464..69e4b1cb67e 100644
--- a/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py
+++ b/terraform/environments/electronic-monitoring-data/glue-job/etl_rds_tbl_rows_hashvalue_to_s3_prq_yyyy_mm.py
@@ -384,30 +384,32 @@ def write_rds_df_to_s3_parquet(df_rds_write: DataFrame,
     coalesce_int = int(args.get('coalesce_int', 0))
     if coalesce_int != 0:
         LOGGER.warn(f"""WARNING ! >> Given coalesce_int = {coalesce_int}""")
-        rds_hashed_rows_df_write = rds_hashed_rows_df.coalesce(coalesce_int).cache()
+        rds_hashed_rows_df_write = rds_hashed_rows_df.coalesce(coalesce_int)
     else:
-        rds_hashed_rows_df_write = rds_hashed_rows_df.alias("rds_hashed_rows_df_write").cache()
+        rds_hashed_rows_df_write = rds_hashed_rows_df.alias("rds_hashed_rows_df_write")
     # ----------------------------------------------------------
 
-    unique_partitions_df = rds_hashed_rows_df_write\
-        .select(*yyyy_mm_partition_by_cols)\
-        .distinct()\
-        .orderBy(yyyy_mm_partition_by_cols, ascending=True)
+    # rds_hashed_rows_df_write = rds_hashed_rows_df_write.cache()
+    # unique_partitions_df = rds_hashed_rows_df_write\
+    #     .select(*yyyy_mm_partition_by_cols)\
+    #     .distinct()\
+    #     .orderBy(yyyy_mm_partition_by_cols, ascending=True)
 
-    for row in unique_partitions_df.toLocalIterator():
-        LOGGER.info(f"""year: {row[yyyy_mm_partition_by_cols[0]]},
-                        month: {row[yyyy_mm_partition_by_cols[1]]}""")
+    # for row in unique_partitions_df.toLocalIterator():
+    #     LOGGER.info(f"""year: {row[yyyy_mm_partition_by_cols[0]]},
+    #                     month: {row[yyyy_mm_partition_by_cols[1]]}""")
 
     # write_rds_df_to_s3_parquet_v2(rds_hashed_rows_df_write,
     #                               yyyy_mm_partition_by_cols,
     #                               prq_table_folder_path)
+    LOGGER.info(f"""write_rds_df_to_s3_parquet() - function called.""")
     write_rds_df_to_s3_parquet(rds_hashed_rows_df_write,
                                yyyy_mm_partition_by_cols,
                                prq_table_folder_path)
 
     LOGGER.info(f"""'{prq_table_folder_path}' writing completed.""")
-    rds_hashed_rows_df_write.unpersist()
+    # rds_hashed_rows_df_write.unpersist()
 
     total_files, total_size = S3Methods.get_s3_folder_info(HASHED_OUTPUT_S3_BUCKET_NAME,
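
Note on the change (hedged interpretation): the patch drops the .cache()/.unpersist() pair around rds_hashed_rows_df_write and comments out the toLocalIterator() loop that listed the distinct year/month partitions purely for logging. That loop was an extra Spark action, so together with the cache it forced the full hashed DataFrame to be materialised before the Parquet write even started; removing it leaves the write as the only action, which is a plausible fix for the long-running production job named in the subject line. The sketch below illustrates that simplified write path only; write_hashed_rows_to_parquet, its parameters, and the overwrite mode are illustrative assumptions, not the repo's actual write_rds_df_to_s3_parquet implementation.

# Minimal sketch (assumed names) of the write path after this change:
# one optional coalesce, then a single partitioned Parquet write, with
# no caching because the DataFrame is consumed by exactly one action.
from pyspark.sql import DataFrame, SparkSession


def write_hashed_rows_to_parquet(df: DataFrame,
                                 partition_cols: list,
                                 s3_path: str,
                                 coalesce_int: int = 0) -> None:
    # Optionally reduce the number of output files.
    df_write = df.coalesce(coalesce_int) if coalesce_int else df

    # Single Spark action: write Parquet partitioned by the yyyy/mm columns.
    (df_write.write
             .mode("overwrite")
             .partitionBy(*partition_cols)
             .parquet(s3_path))


if __name__ == "__main__":
    # Tiny local usage example with a stand-in DataFrame and output path.
    spark = SparkSession.builder.appName("hashed-rows-write-sketch").getOrCreate()
    demo_df = spark.createDataFrame(
        [(1, "abc123", 2024, 12)], ["id", "row_hash", "year", "month"])
    write_hashed_rows_to_parquet(demo_df, ["year", "month"], "/tmp/hashed_rows_prq")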