process_data.py
"""This module includes the code used for the ETL pipeline."""
import logging
import re
import sys
import pandas as pd
from sqlalchemy import create_engine
# configure logging so progress messages from the pipeline are emitted
logging.basicConfig(level=logging.DEBUG)


def load_data(messages_path: str, categories_path: str) -> pd.DataFrame:
"""This utility function is used to load messages and categories dataframes
and merge them together.
Args:
messages_path: Path of the messages file.
categories_path: Path of the file containing annotations.
Returns:
data: Dataframe containing messages and categories.
"""
    try:
        messages = pd.read_csv(messages_path)
        categories = pd.read_csv(categories_path)
        data = pd.merge(messages, categories, on="id", how="inner")
        return data
    except FileNotFoundError:
        # log and re-raise so the pipeline stops instead of silently
        # returning None
        logging.error("Check the provided filepaths")
        raise


def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the merged dataframe.

    Expands the single "categories" string into one binary column per
    category and drops duplicate rows.

    Args:
        df: The merged dataframe returned by load_data.

    Returns:
        data: The cleaned dataframe.
    """
categories_df = df[["id", "categories"]]
# create a dataframe of the 36 individual category columns
categories_df = categories_df["categories"].str.split(";", expand=True)
# extract the column names from the first row
row = categories_df.loc[0]
category_colnames = list(row.apply(lambda x: re.sub(r"-\d", "", x)))
# set the new dataframe columns
categories_df.columns = category_colnames
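    # e.g. a raw cell like "related-1;request-0;offer-0" (illustrative
    # category names) now sits under columns "related", "request", "offer",
    # with each cell still carrying the full "name-digit" string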
# extract the binary values from each column
for column in categories_df:
# set each value to be the last character of the string
categories_df[column] = categories_df[column].astype(str).str[-1]
        # convert the categories to binary values (the raw annotations can
        # contain a 2, which is mapped to 1)
        categories_df[column] = categories_df[column].str.replace("2", "1")
# convert column from string to numeric
categories_df[column] = pd.to_numeric(categories_df[column])
# drop the original categories column from `df`
df = df.drop(columns=["categories"])
# concatenate the original dataframe with the new `categories` dataframe
data = pd.concat([df, categories_df], axis=1)
    # remove the duplicates
    number_of_duplicates = data.duplicated(keep="first").sum()
    logging.info(f"Number of duplicated elements: {number_of_duplicates}")
    logging.info("Removing duplicates")
    data = data.drop_duplicates()
    # check the number of remaining duplicates
    duplicated_elements = data.duplicated(keep="first").sum()
    logging.info(f"Remaining duplicated elements: {duplicated_elements}")
    return data
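
# A small sanity check after cleaning (df being the merged frame from
# load_data):
#     cleaned = clean_data(df)
#     assert cleaned.duplicated().sum() == 0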


def save_data(df: pd.DataFrame, database_filename: str) -> None:
    """Save the cleaned data into a SQLite database.

    Args:
        df: The cleaned dataframe.
        database_filename: Path of the SQLite database file to write.
    """
    engine = create_engine(f"sqlite:///{database_filename}")
    df.to_sql("messages", engine, index=False, if_exists="replace")


def main():
    """Execute the ETL pipeline from the command line."""
    if len(sys.argv) == 4:
        messages_path, categories_path, database_filepath = sys.argv[1:]
logging.info(
"Loading data...\n MESSAGES: {}\n CATEGORIES: {}".format(
messages_path, categories_path
)
)
df = load_data(messages_path, categories_path)
logging.info("Cleaning data...")
df = clean_data(df)
logging.info("Saving data...\n DATABASE: {}".format(database_filepath))
save_data(df, database_filepath)
logging.info("Cleaned data saved to database!")
else:
logging.error(
"Please provide the filepaths of the messages and categories "
"datasets as the first and second argument respectively, as "
"well as the filepath of the database to save the cleaned data "
"to as the third argument. \n\nExample: python process_data.py "
"disaster_messages.csv disaster_categories.csv "
"DisasterResponse.db"
)
if __name__ == "__main__":
# execute the ETL pipeline to extract, transform and load data
# into a database
main()
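
# Example invocation, matching the usage message in main():
#     python process_data.py disaster_messages.csv disaster_categories.csv \
#         DisasterResponse.db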