- Usage
- Project Components
  - Python
  - Terraform
  - CICD
- You must have AWS credentials set up in your terminal (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION).
- You must have the credentials for a database with populated tables matching this schema: https://dbdiagram.io/d/SampleDB-6332fecf7b3d2034ffcaaa92
- You must have the credentials for a data warehouse with unpopulated tables matching this schema: https://dbdiagram.io/d/RevisedDW-63a19c5399cb1f3b55a27eca
- An email address for logging problems.
Prepare credentials for the project so that the code can access the necessary databases and the logging email. The credentials needed are the original database credentials, the warehouse credentials, and the email address used for logging.
- Rename `db_credentials.json.example` to `db_credentials.json`
- Modify the values that say `CHANGE_ME` to your own details
- Rename `warehouse_credentials.json.example` to `warehouse_credentials.json`
- Modify the values that say `CHANGE_ME` to your own details
- Rename `email.txt.example` to `email.txt`
- Modify the text from `[email protected]` to your own email of choice
Run `make install-requirements`. This will set up your environment and create a dependencies folder to be deployed as a lambda layer.
Next, test the code to make sure it all runs correctly, is safe, and conforms to PEP 8.
- Run `make install-dev-tools`
- Run `make security-checks`
- Run `make check-pep8-compliance`
- Run `make run-pytest`
Make sure all these tests are successful before continuing.
Navigate to the immutable-terraform folder by running `cd immutable-terraform` in the root directory. Then run `terraform init` followed by `terraform plan`. If `terraform plan` is successful, run `terraform apply -auto-approve`. Finally, navigate back to the root directory with `cd ..`
Navigate to the main-terraform folder by running `cd main-terraform` in the root directory. Then run `terraform init` followed by `terraform plan`. If `terraform plan` is successful, run `terraform apply -auto-approve`. Finally, navigate back to the root directory with `cd ..`
The ingest process aims to:
- Copy data from the ToteSys database
- Convert this data into Parquet format
- Save this data into an S3 bucket with an identifiable file name
lambda_handler
- Gets the start_time from `get_last_ingest_time`
- end_time is the time at the start of the run
- Generates a SQL query for each table name
- Queries the database
- Converts the data to parquet
- Adds it to the s3 bucket
- `event`: Mandatory, no default. Structure of event: `{"tables_to_query": ["table_name", ...]}`
- `context`: Mandatory, no default. Metadata about the lambda handler.
- Returns a dictionary containing the table names as keys and file_key as value: `{'table_name_1': 'table_name/yyyy/mm/dd/hhmmssmmmmmm.parquet', 'table_name_2': 'table_name/yyyy/mm/dd/hhmmssmmmmmm.parquet', ...}`
- INFO when each function is successful
- CRITICAL when each function fails fatally
- WARNING when query_db returns an empty list
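As a rough illustration, a local invocation of the handler might look like the sketch below; the module path `src.ingest` is an assumption, not something stated in this README.

```python
# Hypothetical local invocation of the ingest handler; the module path is assumed.
from src.ingest import lambda_handler

event = {"tables_to_query": ["sales_order", "staff"]}
result = lambda_handler(event, None)
# Expected shape of the return value, per the description above:
# {"sales_order": "sales_order/2024/01/31/120000000000.parquet",
#  "staff": "staff/2024/01/31/120000000000.parquet"}
```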
This package provides the ability to connect to the Terrific Totes database and to close the connection. It also manages the secret retrieval needed to connect.
db_connections_get_secret
- Returns a dictionary containing the requested secret.
- `client`: Mandatory, no default. Takes a boto3.client secretsmanager object.
- `secret_name`: Mandatory, no default. Takes a string representing the name of the secret to be retrieved.
- `secret`: Returns a dictionary containing the secret retrieved.
- Currently none
connect_to_db
- Returns an authenticated pg8000 connection object for the totesysDB
- Parameters: None
- `connection`: Returns a pg8000 native connection object.
- Currently none
close_db_connection
- Closes an existing database connection.
- `conn`: Mandatory, no default. Takes a pg8000 native connection object.
- Returns None.
- Currently None
Returns a date string in the agreed format.
format_time
- `date_time`: Mandatory, no default. A datetime object to convert.
- `formatted_time`: A string in the format YYYY-MM-DD HH:MM:SS.sss
- Currently None
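One way the described behaviour could be implemented (a sketch, not necessarily the project's implementation):

```python
from datetime import datetime

def format_time(date_time):
    # "YYYY-MM-DD HH:MM:SS.sss" - truncate microseconds to milliseconds
    return date_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

print(format_time(datetime(2024, 1, 31, 12, 30, 5, 123456)))  # 2024-01-31 12:30:05.123
```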
Generates file names based on table name, time, and file extension.
generate_file_key
- `table_name`: Mandatory, no default. String representing a database table name.
- `end_time`: Mandatory, no default. Datetime object representing the end time for the file name.
- `extension`: Optional, default "parquet". String representing the file extension. Omit the '.'
- Currently None
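A sketch of the behaviour, assuming the key format shown elsewhere in this README (`table_name/yyyy/mm/dd/hhmmssmmmmmm.parquet`); not necessarily the project's implementation:

```python
from datetime import datetime

def generate_file_key(table_name, end_time, extension="parquet"):
    # e.g. "sales_order/2024/01/31/123005123456.parquet"
    return f"{table_name}/{end_time.strftime('%Y/%m/%d/%H%M%S%f')}.{extension}"

print(generate_file_key("sales_order", datetime(2024, 1, 31, 12, 30, 5, 123456)))
```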
Return a valid PostgreSQL query string for rows which were modified in the given table between the start_time and end_time. Will raise DateFormatError if start_time or end_time are in an invalid format.
generate_new_entry_query
- `table_name`: Mandatory, no default. String representing the table name in the database.
- `start_time`: Mandatory, no default. A format_time string representing the start of the time range (inclusive).
- `end_time`: Mandatory, no default. A format_time string representing the end of the time range (exclusive).
- `query`: A string containing a SQL query for extracting from the database.
- Currently None
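For illustration, the generated SQL is likely of the following shape; the `last_updated` column name is an assumption based on the linked ToteSys schema:

```python
table_name = "sales_order"
start_time = "2024-01-30 00:00:00.000"   # inclusive
end_time = "2024-01-31 00:00:00.000"     # exclusive

query = (
    f"SELECT * FROM {table_name} "
    f"WHERE last_updated >= '{start_time}' AND last_updated < '{end_time}';"
)
```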
Inspect the ingested files for the time of the last successful ingest.
get_latest_filename
- Return the file in the bucket bucket_name with the prefix table_name which has the "biggest" name, i.e. the name containing the latest timestamp.
- `s3_client`: Mandatory, no default. boto3 s3 client connection object.
- `bucket_name`: Mandatory, no default. String containing the name of the s3 bucket.
- `table_name`: Mandatory, no default. String containing the table name.
- `most_recent_filename`: String, name of the most recent file for the table name and bucket combination.
- Currently None
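A sketch of how this lookup could work with boto3, relying on the timestamped file keys sorting lexicographically (assumes fewer than 1000 objects per prefix; not necessarily the project's implementation):

```python
def get_latest_filename(s3_client, bucket_name, table_name):
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=f"{table_name}/")
    keys = [obj["Key"] for obj in response.get("Contents", [])]
    return max(keys) if keys else None  # timestamped keys sort lexicographically
```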
Return a datetime object corresponding to the key of the most-recently-modified file in bucket bucket_name with the prefix table_name.
get_last_ingest_time
- `bucket_name`: Mandatory, no default. String representing the s3 bucket name.
- `table_name`: Mandatory, no default. String representing the db table name.
- `datetime`: Returns a datetime object representing the latest extraction time for the bucket/table name combination.
- Currently None
Takes dictionaries and converts them to bytes in parquet format
parquet_data
- Takes a dictionary and turns it into a parquet formatted byte stream
- `py_dict`: Mandatory, no default. Python dictionary to be converted.
- `out_buffer`: BytesIO object containing the parquet data.
- Currently None
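A sketch of the conversion described above (assumes a column-oriented dictionary and that pyarrow or fastparquet is installed; not necessarily the project's implementation):

```python
import io
import pandas as pd

def parquet_data(py_dict):
    out_buffer = io.BytesIO()
    pd.DataFrame(py_dict).to_parquet(out_buffer)  # column-oriented dict -> parquet bytes
    return out_buffer
```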
Take an sql query string and return the result of the query as a dictionary formatted like a json object with table names etc.
query_db
- Takes an SQL query string and returns the result of the query as a dictionary formatted like a json object with table names etc.
- If `dict_name = ""`, only return the first row of the query response as a dictionary with column keys. Otherwise, return a dictionary containing a list of dictionaries, where each dictionary contains a row of the query response.
- `sql_string`: Mandatory, no default. String containing a valid PostgreSQL query.
- `connect_to_db`: Mandatory, no default. Function which returns a connection to a database.
- `close_db_connection`: Mandatory, no default. Function which closes the database connection.
- `dict_name`: Optional, default "response". String name used in the key of the response dictionary.
- `kwargs`: Optional, no default. Keys and values passed into the SQL query using :-syntax.
- `dict`: Dictionary containing the response, keyed depending on dict_name.
- Currently None
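A hedged usage sketch; the table and column names are placeholders, and the module path is assumed:

```python
# from <this package> import query_db, connect_to_db, close_db_connection

query = "SELECT * FROM staff WHERE last_updated >= :start_time;"

result = query_db(
    query,
    connect_to_db,
    close_db_connection,
    dict_name="staff",
    start_time="2024-01-30 00:00:00.000",  # passed into the :start_time placeholder
)
# result -> {"staff": [{"staff_id": 1, "first_name": "...", ...}, ...]}
```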
Writes the provided data to s3.
write_to_s3
- `s3_client`: Mandatory, no default. Boto3.client("s3") object.
- `bucket_name`: Mandatory, no default. String containing the name of the bucket (must already exist).
- `file_key`: Mandatory, no default. String, name of the file in the bucket, including directory and file extension.
- `data`: Contents of the file.
- Returns None.
- Currently None
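A sketch of the upload described above (a thin wrapper around `put_object`; not necessarily the project's implementation, and the bucket and key names are placeholders):

```python
import boto3

def write_to_s3(s3_client, bucket_name, file_key, data):
    s3_client.put_object(Bucket=bucket_name, Key=file_key, Body=data)

s3_client = boto3.client("s3")
# write_to_s3(s3_client, "ingest-bucket-name", "staff/2024/01/31/123005123456.parquet", parquet_bytes)
```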
The transform process aims to:
- Read the parquet files from the ingest process
- Process and transform data to the correct star schema
- Write parquet files for upload to the data warehouse
lambda_handler
- Expects an event containing a dictionary with the table from totesysDB that has been updated
- Adds the files stored at these paths in the ingest bucket to a Warehouse object
- It then extracts the corresponding warehouse tables and places them in the transform s3 bucket
- Returns a dictionary of the dim tables and fact tables and their corresponding keys that have been added to the s3 transform bucket
- `event`: Mandatory, no default. Structure of event: `{"address": "address/yadayada.parquet", "counterparty": "counterparty/yadayada.parquet", "currency": "currency/yadayada.parquet", "design": "design/yadayada.parquet", ...}`
- `context`: Mandatory, no default. Metadata about the lambda handler.
- Returns a dictionary containing the table names as keys and file_key as value: `{"table_name_1": "path_to_dim/fact_table_1_in_tf_bucket", "dim_counterparty": "pathtolatestdimcounterparty", "fact_payment": "path...", ...}`
- CRITICAL when each function fails fatally
A Warehouse object expects parquet files from the ingest bucket and creates dim and fact tables.
Warning: Only access properties for which you have ingested the relevant dependencies, otherwise a KeyError will be raised.
Warehouse
__init__
- `list_of_filenames`: list of file keys in an s3 bucket
- `bucket_name`: name of the s3 bucket to access ingest files from
Each produces a pandas dataframe representing a dimension or fact table for the data warehouse. A usage sketch follows the property list below.
- `dim_design` (depends on design)
  - retains: design_id, design_name, file_location, file_name
- `dim_transation` (depends on transaction)
  - retains: transaction_id, transaction_type, sales_order_id, purchase_order_id
- `dim_counterparty` (depends on counterparty and address)
  - retains (from address): address_id, address_line_1, address_line_2, district, city, postal_code, country, phone
  - remaps: address_line_1 -> counterparty_legal_address_line_1, address_line_2 -> counterparty_legal_address_line_2, district -> counterparty_legal_district, city -> counterparty_legal_city, postal_code -> counterparty_legal_postal_code, country -> counterparty_legal_country, phone -> counterparty_legal_phone_number
  - retains (from counterparty): counterparty_id, counterparty_legal_name
  - joined on address.address_id = counterparty.legal_address_id
- `dim_currency` (depends on currency)
  - creates a fixed mapping from currency_code to currency name
  - retains: currency_id, currency_code
- `dim_payment_type` (depends on payment_type)
  - retains: payment_type_id, payment_type_name
- `dim_location` (depends on address)
  - retains: address_id, address_line_1, address_line_2, district, city, postal_code, country, phone
  - remaps: address_id -> location_id
- `dim_staff` (depends on staff and department)
  - retains (from staff): staff_id, first_name, last_name, email_address, department_id
  - retains (from department): department_id, department_name, location
  - joined on staff.department = department.department_id
- `fact_sales_order` (depends on sales_order)
  - retains: sales_order_id, created_at, last_updated_at, design_id, staff_id, counterparty_id, units_sold, unit_price, currency_id, agreed_delivery_date, agreed_payment_date, agreed_delivery_location_id
  - remaps: staff_id -> sales_staff_id, created_at -> created_date and created_time, last_updated_at -> last_updated_date and last_updated_time
- `fact_payment` (depends on payment)
  - retains: payment_id, created_at, last_updated_at, transaction_id, counterparty_id, payment_amount, currency_id, payment_type_id, paid, payment_date
  - remaps: created_at -> created_date and created_time, last_updated_at -> last_updated_date and last_updated_time
- `fact_purchase_order` (depends on purchase_order)
  - retains: purchase_order_id, created_at, last_updated_at, staff_id, counterparty_id, item_code, item_quantity, item_unit_price, currency_id, agreed_delivery_date, agreed_payment_date, agreed_delivery_location_id
  - remaps: created_at -> created_date and created_time, last_updated_at -> last_updated_date and last_updated_time
- `format_date_for_db`
  - takes a Series as a parameter, which is a column in datetime.date format, and casts it to a string in the format %Y-%m-%d
- `format_time_for_db`
  - takes a Series as a parameter, which is a column in datetime.time format, and casts it to a string in the format %H:%M:%S.%f
- `format_str_to_int`
  - takes a Series as a parameter, which is a column holding an id reference; NaN values are replaced with NULL, and float values are cast to int before being cast to string
- `none_to_NULL`
  - takes the whole DataFrame and replaces any None value with NULL
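A usage sketch of the Warehouse class; the file keys and bucket name are placeholders, and the module path is assumed:

```python
# from <this package> import Warehouse  (module path assumed)

filenames = [
    "design/2024/01/31/120000000000.parquet",
    "address/2024/01/31/120000000000.parquet",
]
warehouse = Warehouse(filenames, "ingest-bucket-name")

dim_design_df = warehouse.dim_design      # fine: design was ingested
dim_location_df = warehouse.dim_location  # fine: depends only on address
# warehouse.dim_staff would raise a KeyError here, because staff and department
# were not included in list_of_filenames.
```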
Returns the DataFrame of the parquet data from the corresponding s3 bucket and key.
get_df_from_parquet
- Gets the parquet data from the corresponding table name and file key.
- Returns the DataFrame from the parquet data
- `s3_client`: Mandatory, no default. Takes a boto3.client s3 object.
- `bucket_name`: Mandatory, no default. Takes a string representing the name of the bucket.
- `filename`: Mandatory, no default. Takes a string representing the key of the file that is going to be accessed.
- `read_parquet(buffer)`: Returns a DataFrame of the parquet data accessed by the s3 client.
- Currently none
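A sketch of how this could be done with boto3 and pandas (assumes pyarrow or fastparquet is installed; not necessarily the project's implementation):

```python
import io
import pandas as pd

def get_df_from_parquet(s3_client, bucket_name, filename):
    obj = s3_client.get_object(Bucket=bucket_name, Key=filename)
    buffer = io.BytesIO(obj["Body"].read())
    return pd.read_parquet(buffer)
```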
Returns the parquet data from the corresponding DataFrame.
generate_parquet_of_df
- Takes the DataFrame from the corresponding table
- Converts it to parquet data ready to be uploaded to an s3 bucket.
- `df`: Mandatory, no default. Takes the DataFrame that will be converted.
- `out_buffer.getvalue()`: Returns the parquet data from the corresponding DataFrame.
- Currently none
Creates a DataFrame of dates with information ready to be analysed.
dim_date
- Creates dim_date DataFrame between two specified years which contains the following columns:
- date_id
- year
- month
- day
- day_of_week
- day_name
- month_name
- quarter
- `start_year`: Mandatory, no default. Takes an int for the year to start the data.
- `end_year`: Mandatory, no default. Takes an int for the year to end the data.
- `dim_date`: Returns a DataFrame containing the breakdown of all dates between the two years given.
- Currently none
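A sketch of how such a date dimension can be built with pandas; the column names follow the list above, and the exact implementation is assumed:

```python
import pandas as pd

def dim_date(start_year, end_year):
    df = pd.DataFrame({"date_id": pd.date_range(f"{start_year}-01-01", f"{end_year}-12-31")})
    df["year"] = df["date_id"].dt.year
    df["month"] = df["date_id"].dt.month
    df["day"] = df["date_id"].dt.day
    df["day_of_week"] = df["date_id"].dt.dayofweek
    df["day_name"] = df["date_id"].dt.day_name()
    df["month_name"] = df["date_id"].dt.month_name()
    df["quarter"] = df["date_id"].dt.quarter
    return df
```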
format_date_for_db
- Takes a pandas Series as a parameter, which is a column in datetime.date format, and casts it to a string in the format %Y-%m-%d
- `series`: Mandatory, no default. Takes a pandas Series of dates.
- Returns the Series in the correct format.
- Currently none
The load process aims to:
- Copy parquet data from the s3 transform bucket
- Upload the data to the warehouse database
lambda_handler
- Sorts the tables so that the dim tables will be updated before the fact tables
- Generates queries for the corresponding tables that need to be updated
- Uploads the data to the warehouse database
- `event`: Mandatory, no default. Takes the output from transform.py as the event: `{"table_name_1": "path_to_dim/fact_table_1_in_tf_bucket", "dim_counterparty": "pathtolatestdimcounterparty", "fact_payment": "path...", ...}`
- `context`: Mandatory, no default. Metadata about the lambda handler.
- Returns None.
- CRITICAL when each function fails fatally
Creates the query that will insert the updated values to the dim tables.
create_dim_query
- Generates a query to delete all entries in a table and insert values from a parquet file.
- `table_name`: Mandatory, no default. Takes the table name of the file to be accessed from the s3 transform bucket.
- `table_path`: Mandatory, no default. Takes the key of the file that needs to be accessed from the s3 transform bucket.
- `s3_client`: Mandatory, no default. Boto3.client("s3") object.
- Returns a string of the query to be run, which appends/inserts values to the dim tables in the data warehouse.
- Currently None
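For illustration, the generated query is likely of the following shape (delete everything, then re-insert from the parquet data); the exact SQL and example values are assumptions:

```python
query = (
    "DELETE FROM dim_currency; "
    "INSERT INTO dim_currency (currency_id, currency_code, currency_name) VALUES "
    "(1, 'GBP', 'British pound'), (2, 'USD', 'US dollar');"
)
```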
generate_insert_into_statement
- Appends an INSERT INTO statement to the query with corresponding values from the DataFrame
- `table_name`: Mandatory, no default. Takes the name of the dim table that will be used to query the warehouse database.
- `columns`: Mandatory, no default. Takes a list of names from the columns of the DataFrame.
- `df`: Mandatory, no default. Takes the DataFrame of the corresponding dim table.
- `output`: The string query of the INSERT statement.
- Currently None
format_value
- Checks if the given value is a None or NULL value and returns the string "NULL". Otherwise it returns the string of the value.
- `value`: Mandatory, no default. Takes any value and formats it.
- Returns the string of the value or "NULL".
- Currently None
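A sketch of the behaviour described above (not necessarily the project's implementation):

```python
def format_value(value):
    if value is None or value == "NULL":
        return "NULL"
    return str(value)

print(format_value(None), format_value(42), format_value("GBP"))  # NULL 42 GBP
```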
Creates the query that will insert the updated values to the fact tables.
create_fact_query
- Generates a query by fetching the DataFrame from the corresponding table name and key.
- `table_name`: Mandatory, no default. Takes the table name of the file to be accessed from the s3 transform bucket.
- `table_path`: Mandatory, no default. Takes the key of the file that needs to be accessed from the s3 transform bucket.
- `s3_client`: Mandatory, no default. Boto3.client("s3") object.
- Returns a string of the query to be run, which appends/inserts values to the fact tables in the data warehouse.
- Currently None
Returns the database query string for updating the table table_name.
create_fact_query
- First it checks whether the table name is a dim table or a fact table, then builds the correct type of query for the corresponding table.
- `table_name`: Mandatory, no default. Takes the table name accessed from the s3 transform bucket.
- `table_path`: Mandatory, no default. Takes the key of the file that needs to be accessed from the s3 transform bucket.
- `s3_client`: Mandatory, no default. Boto3.client("s3") object.
- Returns a string of the query to be run, which appends/inserts values to a table in the data warehouse.
- Currently None
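A sketch of the dim/fact dispatch described above. The README lists this dispatcher under the same name as the fact-query builder, so the sketch uses a hypothetical name `create_query` to keep the two apart:

```python
def create_query(table_name, table_path, s3_client):
    # Dim tables are rebuilt with create_dim_query, fact tables with create_fact_query.
    if table_name.startswith("dim_"):
        return create_dim_query(table_name, table_path, s3_client)
    return create_fact_query(table_name, table_path, s3_client)
```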
Ensures that each time changes are made and deployed, all the data in each of the fact tables and then the dim tables is deleted. This keeps the data inside the warehouse accurate and up to date.
The immutable-terraform configuration performs the initial AWS setup. It uploads the database credentials as well as the warehouse credentials to AWS Secrets Manager, creates the ingest and transform buckets, and sets up AWS Simple Notification Service (SNS).