Passing over most recent branch, continuing aws testing
VMess1 committed Nov 15, 2023
1 parent 3c411d9 commit 06e9770
Showing 6 changed files with 10 additions and 6 deletions.
Binary file modified aws_utils/layer_code2.zip
Binary file modified aws_utils/layer_code3.zip
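(These two archives are presumably the rebuilt dependency bundles behind the Lambda layers wired up in Terraform, such as layer_dependencies_3 below; that link is inferred from the file paths, not stated in the commit.)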
src/processing/fact_table_transformation.py (2 changes: 1 addition & 1 deletion)
@@ -7,7 +7,7 @@ def fact_sales_order_tf(sale_order):
     returning fact_sales_order format dataframe
     '''
     new_dataframe = sale_order
-    new_dataframe['sales_record_id'] = new_dataframe.reset_index().index
+    # new_dataframe['sales_record_id'] = new_dataframe.reset_index().index
     new_dataframe = new_dataframe.rename(
         {'staff_id': 'sales_staff_id'}, axis='columns')
     new_dataframe['created_date'] = pd.DatetimeIndex(
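For context on the line that was commented out (an aside, not part of the diff): reset_index().index is a pandas idiom for generating a fresh 0-based sequential ID, independent of the frame's existing index. A minimal sketch with made-up column names and values:

    import pandas as pd

    # Toy frame standing in for a sales_order dataframe; contents are illustrative.
    df = pd.DataFrame({'staff_id': [7, 8]}, index=[101, 102])

    # reset_index() returns a copy whose index is a fresh RangeIndex (0, 1, ...);
    # taking .index of that copy yields sequential integers regardless of the
    # original, possibly non-contiguous, index.
    df['sales_record_id'] = df.reset_index().index
    print(df['sales_record_id'].tolist())  # [0, 1]

Presumably the column is being disabled while the warehouse load is debugged; the commit itself does not say why.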
src/storage/read_parquet.py (8 changes: 5 additions & 3 deletions)
@@ -12,19 +12,20 @@ def get_file_list(client, target_bucket, table_name, last_timestamp):
     def extract_timestamp(filepath):
         timestamp = re.findall(r'\d{14,}', filepath)[0]
         return int(timestamp)
-    logger.info("testing2")
     response = client.list_objects(Bucket=target_bucket,
                                    Prefix=f'{table_name}/')
-    logger.info(f"Objects listed")
+    logger.info("Objects listed")
+    contents = response.get('Contents', [])
+    logger.info(response)
     if contents:
         file_list = [obj['Key'] for obj in response['Contents']]
         file_list.sort(key=extract_timestamp)
         logger.info(f"Full file list: {file_list}")
         last_timestamp_int = int(last_timestamp.replace('-', '')
                                  .replace(':', '').replace(' ', ''))
         newest_files = [file for file in file_list if
                         extract_timestamp(file) > last_timestamp_int]
-        logger.info(newest_files)
+        logger.info(f"Filtered file list {newest_files}")
         return newest_files
     else:
         return []
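A quick worked example of the filtering arithmetic above, with made-up keys (the naming scheme is assumed from the \d{14,} regex in extract_timestamp):

    import re

    file_list = ['sales_order/20231115120000.parquet',
                 'sales_order/20231115140000.parquet']
    last_timestamp = '2023-11-15 12:30:00'

    # Stripping '-', ':' and spaces flattens the timestamp string into the
    # same 14-digit shape as the stamps embedded in the object keys.
    last_timestamp_int = int(last_timestamp.replace('-', '')
                             .replace(':', '').replace(' ', ''))  # 20231115123000

    # Keep only objects whose embedded stamp is strictly newer.
    newest = [f for f in file_list
              if int(re.findall(r'\d{14,}', f)[0]) > last_timestamp_int]
    print(newest)  # ['sales_order/20231115140000.parquet']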
@@ -43,6 +44,7 @@ def compile_parquet_data(client, target_bucket, table_name, timestamp):
     '''Compiles parquet files, if there are new ones, into a dataframe
     or returns an empty list if no new files'''
     file_list = get_file_list(client, target_bucket, table_name, timestamp)
+    logger.info(f"File list: {file_list}")
     data_rows = []
     if file_list:
         for filepath in file_list:
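The loop body is collapsed in this view. As a rough sketch, under the assumption that the function downloads each key with boto3 and parses it with pandas (the repository's actual implementation is not shown here):

    import io
    import pandas as pd

    def compile_parquet_sketch(client, target_bucket, file_list):
        # Download each object and parse it as parquet; pd.read_parquet needs
        # a path or file-like object, hence the BytesIO wrapper.
        frames = []
        for filepath in file_list:
            obj = client.get_object(Bucket=target_bucket, Key=filepath)
            frames.append(pd.read_parquet(io.BytesIO(obj['Body'].read())))
        # One frame per table; an empty file list yields an empty dataframe,
        # which the caller's dataframe.empty check then skips.
        return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()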
src/storage/storage_handler.py (5 changes: 3 additions & 2 deletions)
@@ -43,10 +43,11 @@ def main(event, context):
     logger.info(timestamp)
     target_bucket = 'nc-group3-transformation-bucket'
     for table_name in table_list:
-        logger.info('hello')
+        logger.info(f'{table_name}')
         dataframe = compile_parquet_data(
             s3client, target_bucket, table_name, timestamp)
-        logger.info('test')
+        tester = dataframe.head(5)
+        logger.info(tester)
         if not dataframe.empty:
             run_insert_query(con_warehouse, table_name, dataframe)
             logger.info(f'Updated {table_name}')
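A note on the tester change: dataframe.head(5) returns just the first five rows (or fewer, if the frame is smaller), so the CloudWatch log gets a small sample of each table's data instead of placeholder strings like 'hello' and 'test'.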
terraform/lambda3.tf (1 change: 1 addition & 0 deletions)
@@ -8,6 +8,7 @@ resource "aws_lambda_function" "lambda_warehouse" {
   layers = [aws_lambda_layer_version.layer_dependencies_3.arn,
             "arn:aws:lambda:eu-west-2:336392948345:layer:AWSSDKPandas-Python311:2"]
   timeout = 120
+  memory_size = 512
 }
 
 # lambda3 dependencies
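General AWS Lambda behavior, for context: memory_size is in MB and defaults to 128, and Lambda allocates CPU in proportion to memory, so raising it to 512 both gives the pandas work more headroom and speeds up CPU-bound transformation steps.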
