Refactor one_month_data to support batch processing; clean up unused variables and improve logging in model training and data storage methods

petya-vasileva committed Dec 13, 2024
1 parent e358e5a commit 55086f3
Showing 5 changed files with 40 additions and 22 deletions.
13 changes: 9 additions & 4 deletions src/ml/packet_loss_one_month_onehot.py
@@ -1,18 +1,23 @@
 import pandas as pd
 import datetime

-def one_month_data(plsDf_custom):
+def one_month_data(plsDf_custom, batch_size=10000):
     # Preprocessing
     plsDf_custom = plsDf_custom.drop(['src', 'dest', 'pair', 'src_host', 'dest_host'], axis=1)
     plsDf_custom['dt'] = pd.to_datetime(plsDf_custom['to'], utc=True)

     plsDf_custom['tests_done'] = plsDf_custom['tests_done'].str.rstrip('%').astype('float') / 100.0

-    # ONE HOT encoding
-    plsDf_onehot = pd.get_dummies(plsDf_custom, dtype=int)
+    # ONE HOT encoding in batches
+    plsDf_onehot = pd.DataFrame()
+    for i in range(0, len(plsDf_custom), batch_size):
+        batch = plsDf_custom.iloc[i:i+batch_size]
+        batch_onehot = pd.get_dummies(batch, dtype=int)
+        plsDf_onehot = pd.concat([plsDf_onehot, batch_onehot], ignore_index=True)

     # Determine the number of days in the dataframe
     num_days = (plsDf_custom['dt'].max() - plsDf_custom['dt'].min()).days
+    plsDf_onehot.reset_index(drop=True, inplace=True)
     plsDf_custom = plsDf_custom.sort_values(by='dt')

     if num_days >= 60:
         first_month_n = (plsDf_custom['dt'] < (plsDf_custom['dt'].min() + pd.Timedelta(days=28))).sum()
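A note on the batched encoding introduced above (an editorial sketch, not part of the commit): pd.get_dummies creates columns only for the categories present in each batch, so concatenating per-batch results leaves NaN-filled float columns whenever a category is missing from a batch, and growing the result inside the loop with pd.concat is quadratic. Collecting the batches in a list and reindexing each onto the union of columns avoids both; the column names below are hypothetical:

import pandas as pd

# Two "batches" whose categorical values differ
b1 = pd.get_dummies(pd.DataFrame({'site': ['A', 'B']}), dtype=int)
b2 = pd.get_dummies(pd.DataFrame({'site': ['C']}), dtype=int)

naive = pd.concat([b1, b2], ignore_index=True)
print(naive.dtypes)  # site_A/site_B become float because of the NaN fill

# Reindex every batch onto the union of columns, then concatenate once
cols = sorted(set(b1.columns) | set(b2.columns))
aligned = pd.concat(
    [b.reindex(columns=cols, fill_value=0) for b in (b1, b2)],
    ignore_index=True,
)
print(aligned.dtypes)  # consistent all-int frame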
3 changes: 3 additions & 0 deletions src/ml/packet_loss_preprocess_data.py
@@ -25,6 +25,9 @@ def packet_loss_preprocess(plsDf_custom_x, model):

     print('plsDf_onehot_plot', plsDf_onehot_plot.shape)

+    del plsDf_custom_y
+    del y
+
     return df_to_plot, plsDf_onehot_plot

15 changes: 8 additions & 7 deletions src/ml/thrpt_dataset_model_train.py
@@ -26,6 +26,7 @@ def preprocess(rawDf_custom):
     return rawDf_onehot

 def trainMLmodel(rawDf):
+    print('Starting trainMLmodel')
     #taking the index of the first 50 days for further training
     rawDf['dt'] = pd.to_datetime(rawDf['dt'].astype(str))
     date_s = list(rawDf['dt'][:1])[0]
@@ -39,6 +40,7 @@ def trainMLmodel(rawDf):

     #preprocessing the dataset
     rawDf_onehot = preprocess(rawDf)
+    print('Preprocessed dataset')

     # preparing the datasets for ml training
     rawDf_custom_y = rawDf_onehot['alarm_created']
@@ -56,14 +58,13 @@ def trainMLmodel(rawDf):
     y_pred = model.predict(X_test)

     # evaluation metrics of the model
-    print("Accuracy of the XGBClassifier classifier:", round(accuracy_score(y_test, y_pred)*100,2), "%")
-    print("F1 score of the XGB Classifier:", f1_score(y_test, y_pred), "\n")
+    print('Trained XGBClassifier model')
+    print(f"Accuracy of the XGBClassifier classifier: {round(accuracy_score(y_test, y_pred)*100, 2)} %")
+    print(f"F1 score of the XGB Classifier: {f1_score(y_test, y_pred)}")
     print(classification_report(y_test, y_pred))
-    confusion_matrix_data = confusion_matrix(y_test, y_pred, labels = model.classes_)
-    print(confusion_matrix_data, "\n")
-    # disp = ConfusionMatrixDisplay(confusion_matrix = confusion_matrix_data, display_labels = model.classes_)
-    # disp = disp.plot(cmap=plt.cm.YlGnBu,values_format='g')
-    # plt.show()
+    print(confusion_matrix(y_test, y_pred))
+
+    del X_train, X_test, y_train, y_test

     return rawDf_onehot, model

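For reference, the evaluation block above can be run end to end in isolation; the sketch below substitutes synthetic data and default XGBClassifier settings, which are illustrative rather than taken from this repo:

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix
from xgboost import XGBClassifier

# stand-in for the one-hot encoded training set built by preprocess()
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

model = XGBClassifier()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

print('Trained XGBClassifier model')
print(f"Accuracy of the XGBClassifier classifier: {round(accuracy_score(y_test, y_pred) * 100, 2)} %")
print(f"F1 score of the XGB Classifier: {f1_score(y_test, y_pred)}")
print(classification_report(y_test, y_pred))
print(confusion_matrix(y_test, y_pred))

# free the training splits, as the diff does
del X_train, X_test, y_train, y_test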
20 changes: 16 additions & 4 deletions src/model/Updater.py
@@ -1,3 +1,4 @@
+import gc
 import os
 import os.path
 import time
@@ -20,6 +21,7 @@
 import os
 from datetime import datetime, timedelta

+
 @timer
 class ParquetUpdater(object):

@@ -262,6 +264,7 @@ def createLocation(self, required_folders):

     @timer
     def storeThroughputDataAndModel(self):
+        print('Starting storeThroughputDataAndModel')
         now = hp.defaultTimeRange(days=90, datesOnly=True)
         start_date = now[0]
         end_date = now[1]
@@ -274,16 +277,22 @@ def storeThroughputDataAndModel(self):
         # train the ML model on the loaded dataset
         rawDf_onehot, model = trainMLmodel(rawDf)
         del rawDf
+        print('Trained ML model')

         self.pq.writeToFile(rawDf_onehot, f'{self.location}ml-datasets/throughput_onehot_Df.parquet')
         # save the classification model as a pickle file
         model_pkl_file = f'{self.location}ml-datasets/XGB_Classifier_model_throughput.pkl'
         with open(model_pkl_file, 'wb') as file:
             pickle.dump(model, file)
+        print('Saved XGB_Classifier_model_throughput.pkl')
+        del rawDf_onehot, model
+        gc.collect()


     @timer
     def storePacketLossDataAndModel(self):
+        start_time = time.time()
+        print("Starting storePacketLossDataAndModel")
         now = hp.defaultTimeRange(days=60, datesOnly=True)
         start_date = now[0]
         end_date = now[1]
@@ -292,20 +301,23 @@ def storePacketLossDataAndModel(self):
         plsDf = createPcktDataset(start_date, end_date)
         self.pq.writeToFile(plsDf, f'{self.location}ml-datasets/packet_loss_Df.parquet')

-        # onehot encode the whole dataset and leave only one month for further ML training
+        print("One-hot encoding the dataset")
         plsDf_onehot_month, plsDf_onehot = one_month_data(plsDf)
         self.pq.writeToFile(plsDf_onehot, f'{self.location}ml-datasets/packet_loss_onehot_Df.parquet')
+        del plsDf_onehot

-        # train the model on one month data
+        print("Training the model on one month data")
         model = packet_loss_train_model(plsDf_onehot_month)
         del plsDf_onehot_month

-        # save the classification model as a pickle file
+        print("Saving the classification model as a pickle file")
         model_pkl_file = f'{self.location}ml-datasets/XGB_Classifier_model_packet_loss.pkl'
         with open(model_pkl_file, 'wb') as file:
             pickle.dump(model, file)

-        del plsDf_onehot, model
+        gc.collect()
+        end_time = time.time()
+        print(f"Finished storePacketLossDataAndModel in {end_time - start_time} seconds")


 class Scheduler(object):
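The pattern both storage methods now follow (print a start marker, persist the large frame, drop the reference with del, force a gc.collect(), and print the elapsed wall-clock time) can be seen in isolation in this sketch; the function name and path are illustrative, not code from Updater.py:

import gc
import os
import time

import pandas as pd

def store_dataset(location='/tmp/ml-datasets/'):
    start_time = time.time()
    print('Starting store_dataset')

    os.makedirs(location, exist_ok=True)

    # stand-in for an expensive load/train step producing a large frame
    big_df = pd.DataFrame({'x': range(1_000_000)})
    big_df.to_parquet(f'{location}example_Df.parquet')  # needs pyarrow or fastparquet

    # once the data is on disk, drop the reference and reclaim memory eagerly
    del big_df
    gc.collect()

    end_time = time.time()
    print(f'Finished store_dataset in {end_time - start_time} seconds')

store_dataset()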
11 changes: 4 additions & 7 deletions src/pages/throughput-ml.py
@@ -278,15 +278,12 @@ def colorMap(eventTypes):
     ],
     State("sites-dropdown-thrpt", "value"))
 def update_output(start_date, end_date, sensitivity, sitesState):
-    print('11111 start_date', start_date, 'end_date', end_date)
     if not start_date and not end_date:
         start_date, end_date = hp.defaultTimeRange(days=90)

-    print('22222 start_date', start_date, 'end_date', end_date)
     # check if the date range is default
     start_date_check, end_date_check = hp.defaultTimeRange(days=90)
     # start_date_check, end_date_check = [f'{start_date_check}T00:01:00.000Z', f'{end_date_check}T23:59:59.000Z']
-    print('start_date_check', start_date_check, 'end_date_check', end_date_check)
     # query for the dataset
     if (start_date, end_date) == (start_date_check, end_date_check):
         pq = Parquet()
@@ -306,7 +303,7 @@ def update_output(start_date, end_date, sensitivity, sitesState):
     # predict the data on the model and return the dataset with original alarms and the ML alarms
     global rawDf_onehot_plot, df_to_plot
     rawDf_onehot_plot, df_to_plot = predictData(rawDf_onehot, model)
-    del rawDf_onehot
+    del rawDf_onehot, model

     # create a list with all sites as sources
     src_sites = rawDf_onehot_plot.loc[:, rawDf_onehot_plot.columns.str.startswith("src_site")].columns.values.tolist()
@@ -339,7 +336,7 @@ def update_output(start_date, end_date, sensitivity, sitesState):

     for date, alarm_num in alarm_nums.items():
         # use the sensitivity chosen by user (default 5)
-        if alarm_num > alarm_nums_mean * sensitivity:
+        if (alarm_num > alarm_nums_mean * sensitivity):
             # print(site_name, 'alarm mean:', alarm_nums_mean)
             # print(alarm_num, 'alarms on', site_name, date)
             in_a_row += 1
@@ -359,8 +356,8 @@ def update_output(start_date, end_date, sensitivity, sitesState):
         except:
             j += 1

-    print("\nnumber of successful occurrences of host being both a src and dest:", i)
-    print("number of unsuccessful occurrences of host being both a src and dest:", j)
+    print(f'number of successful occurrences of host being both a src and dest: {i}')
+    print(f'number of unsuccessful occurrences of host being both a src and dest: {j}')

     # making a pretty df and preparing it for converting to a plotly DataTable
     data = pd.DataFrame(alarms_list).fillna(value='-')
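The sensitivity rule in the hunk above treats a day as anomalous when its alarm count exceeds sensitivity times the site's mean daily count, and in_a_row tracks consecutive anomalous days. A standalone sketch of that rule (function name and data are hypothetical):

def consecutive_alarm_days(alarm_nums, sensitivity=5):
    """Length of the longest run of days whose alarm count exceeds
    sensitivity times the mean daily alarm count."""
    if not alarm_nums:
        return 0
    alarm_nums_mean = sum(alarm_nums.values()) / len(alarm_nums)
    longest = in_a_row = 0
    for date in sorted(alarm_nums):  # ISO dates sort chronologically
        if alarm_nums[date] > alarm_nums_mean * sensitivity:
            in_a_row += 1
            longest = max(longest, in_a_row)
        else:
            in_a_row = 0
    return longest

# two consecutive spiky days among ten
counts = dict(zip((f'2024-12-{d:02d}' for d in range(1, 11)),
                  [1, 1, 1, 100, 120, 1, 1, 1, 1, 1]))
print(consecutive_alarm_days(counts, sensitivity=3))  # -> 2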
