-
Notifications
You must be signed in to change notification settings - Fork 0
/
bda_final_project_code (2).txt
418 lines (317 loc) · 18.9 KB
/
bda_final_project_code (2).txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
"""# Load the Dataset from Github"""
import os
import requests
# URL of the data file hosted on GitHub
data_url = 'https://raw.githubusercontent.com/tesla07/BDA_Project/main/stock_data_medium.csv'
# Fetch the data from GitHub. A timeout is passed explicitly because requests
# otherwise waits without limit, which would hang the notebook on a stalled
# connection.
res = requests.get(data_url, timeout=30)
if res.status_code == 200:
    # Persist the raw response bytes under ./data so later cells can read the
    # file back from disk.
    data_dir = 'data'
    os.makedirs(data_dir, exist_ok=True)
    fpath = os.path.join(data_dir, 'stock_data_medium.csv')
    with open(fpath, 'wb') as f:
        f.write(res.content)
    print(f'Download success file saved at {fpath}')
else:
    print('Error downloading the file.')
import pandas as pd
# Load the downloaded CSV into a pandas DataFrame.
fin_sentiments = pd.read_csv('data/stock_data_medium.csv')
# Bare expression with no effect in a script; presumably left over from
# notebook cell output.
fin_sentiments
# NOTE(review): `spark` is not defined in the visible source — presumably the
# SparkSession injected by the notebook runtime; confirm.
fin_sentiments_df = spark.createDataFrame(fin_sentiments)
fin_sentiments_df
fin_sentiments_df.show()
from nltk.corpus import stopwords
# English stop words from NLTK; data_preprocessing drops every token found here.
eng_stopwords = stopwords.words('english')
import re
import string
def data_preprocessing(stock_news):
    """Normalize one news string before tokenization.

    Lower-cases the text, removes bracketed spans, punctuation and every
    word that contains a digit, then drops tokens found in
    ``eng_stopwords``.

    Args:
        stock_news: Raw text of a single news item.

    Returns:
        The cleaned text, with the surviving tokens joined by single spaces.
    """
    stock_news = stock_news.lower()
    # Raw strings keep the regex escapes (\[, \w, \d) from being read as
    # invalid string escapes, which newer Python versions warn about.
    stock_news = re.sub(r'\[.*?\]', '', stock_news)
    stock_news = re.sub('[%s]' % re.escape(string.punctuation), '', stock_news)
    stock_news = re.sub(r'\w*\d\w*', '', stock_news)
    # Remove the stop words from the news
    stock_news = ' '.join([tkn for tkn in stock_news.split()
                           if tkn not in eng_stopwords])
    return stock_news
# Sample of cleaned text: print one raw item, then clean the same item.
print(fin_sentiments['Text'].iloc[2])
data_preprocessing(fin_sentiments['Text'].iloc[2])
# Add a new column to store the cleaned text
fin_sentiments['cleaned_text']=fin_sentiments['Text'].map(data_preprocessing)
# Convert sentiment labels to numerical values
# NOTE(review): this assumes the Sentiment column holds the strings
# "positive" / "negative"; any other value has no entry in the mapping —
# verify against the CSV.
sentiment_map = {"positive": 1, "negative": 0}
fin_sentiments["label"] = fin_sentiments["Sentiment"].map(sentiment_map)
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
from collections import defaultdict
from pyspark.sql.functions import col, size, split
from pyspark.sql.functions import max as max_
# Recreate the Spark DataFrame from the pandas frame, which by now also has
# the cleaned_text and label columns.
fin_sentiments_df = spark.createDataFrame(fin_sentiments)
# Tokenize text data: reads cleaned_text and writes the result to a new
# "tokens" column.
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="tokens")
tokenized_df = tokenizer.transform(fin_sentiments_df)
# Vocabulary: maps each token to a positive integer id, assigned on first
# lookup. Ids start at 1 so that 0 stays reserved for padding; previously the
# first token seen also received id 0 and could not be told apart from padding.
# NOTE(review): when the function below runs inside a Spark UDF, each Python
# worker presumably fills its own copy of this dict, so ids may differ between
# workers — confirm, or build the vocabulary once on the driver.
token_to_index = defaultdict(lambda: len(token_to_index) + 1)


def map_tokens_to_sequences_and_pad(tokens, max_length):
    """Convert tokens to integer ids and force the result to ``max_length``.

    Args:
        tokens: Iterable of hashable tokens. Tokens not seen before are
            added to ``token_to_index`` as a side effect.
        max_length: Length of the returned list.

    Returns:
        A list of ``max_length`` ints: the token ids, truncated on the right
        if there are too many and right-padded with 0 if there are too few.
    """
    # Convert to numerical sequence
    numerical_sequence = [token_to_index[token] for token in tokens]
    # Padding: a negative count yields an empty list, so over-long sequences
    # are only truncated.
    padding = [0] * (max_length - len(numerical_sequence))
    return numerical_sequence[:max_length] + padding
# Calculate max sequence length: the largest number of whitespace-separated
# pieces in any cleaned_text value. The pattern is a raw string so that "\s"
# is not read as an (invalid) string escape.
max_sequence_length = (
    tokenized_df
    .select(size(split(col("cleaned_text"), r"\s+")).alias("size"))
    .agg({"size": "max"})
    .collect()[0][0]
)
# Register the function as a UDF; max_sequence_length is captured by the lambda
map_tokens_to_sequences_and_pad_udf = udf(
    lambda tokens: map_tokens_to_sequences_and_pad(tokens, max_sequence_length),
    ArrayType(IntegerType()),
)
# Apply the mapping function to the tokenized text data
padded_df = tokenized_df.withColumn("padded_sequence", map_tokens_to_sequences_and_pad_udf("tokens"))
# Show the resulting DataFrame
padded_df.show()
dataset = padded_df.select("padded_sequence", "label")
dataset.show()
print("Number of rows = ", dataset.count())
# Train test split (80/20, fixed seed)
train_df, test_df = padded_df.randomSplit([0.8, 0.2], seed=42)
max_sequence_length
"""# Hyperparameter Tuning
For hyperparameter tuning we will use a Keras LSTM model. The best parameters will then be used to train our from-scratch model.
"""
# Tokenization
# NOTE(review): the only `Tokenizer` imported in the visible source is
# pyspark.ml.feature.Tokenizer, while fit_on_texts / texts_to_sequences look
# like the Keras tokenizer API. No import for that tokenizer, pad_sequences,
# LabelEncoder or train_test_split is visible here — confirm they are
# imported elsewhere before this cell runs.
keras_tk = Tokenizer()
keras_tk.fit_on_texts(fin_sentiments['cleaned_text'])
seq_data = keras_tk.texts_to_sequences(fin_sentiments['cleaned_text'])
# Longest sequence; passed as maxlen to pad_sequences below.
mlen = max(len(seq) for seq in seq_data)
padded_data = pad_sequences(seq_data, maxlen=mlen)
# Encoding sentiment labels (overwrites the "label" column set earlier)
lbl = LabelEncoder()
fin_sentiments['label'] = lbl.fit_transform(fin_sentiments['Sentiment'])
# Splitting the dataset into train and test sets
train_feat, test_feat, train_lbl, test_lbl = train_test_split(padded_data, fin_sentiments['label'], test_size=0.2, random_state=42)
# Define LSTM model
def create_lstm_model(units=64, epochs=5, learning_rate=0.001, embedding_dim=64):
    """Build and compile the Keras reference model used for tuning.

    The network is Embedding -> LSTM(units) -> Dense(1, sigmoid), compiled
    with Adam and binary cross-entropy.

    Args:
        units: Width of the LSTM layer.
        epochs: Accepted but not used here; the caller passes its own value
            to ``fit``.
        learning_rate: Learning rate given to the Adam optimizer.
        embedding_dim: Output dimension of the embedding layer.

    Returns:
        The compiled model.
    """
    # One extra slot on top of the tokenizer's word index.
    vocabulary_size = len(keras_tk.word_index) + 1
    network = Sequential()
    for layer in (
        Embedding(input_dim=vocabulary_size, output_dim=embedding_dim),
        LSTM(units),
        Dense(1, activation='sigmoid'),
    ):
        network.add(layer)
    network.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return network
# Define hyperparameters to test
units_list = [32, 64, 128]
learning_rate = [0.01, 0.001, 0.00001]
epochs_list = [5, 10, 15]
# Best test accuracy seen so far and the combination that produced it.
best_acc = 0
best_hyperparameters = {}
# Initialize variables to store performance metrics (one entry per combination)
train_accuracy_list = []
test_accuracy_list = []
train_rmse_list = []
test_rmse_list = []
train_pre_list = []
test_pre_list = []
# Iterate over hyperparameter combinations (full grid: units x lr x epochs)
for units in units_list:
    for lr in learning_rate:
        for epochs in epochs_list:
            print(f"Training model with units={units}, learning_rate={lr}, epochs={epochs}...")
            # A fresh model is built for every combination.
            model = create_lstm_model(units=units, learning_rate=lr, epochs=epochs)
            model.fit(train_feat, train_lbl, epochs=epochs, batch_size=32, verbose=0)
            # Predictions: threshold the model output at 0.5 to get 0/1 labels
            train_prediction = (model.predict(train_feat) > 0.5).astype("int32")
            test_prediction = (model.predict(test_feat) > 0.5).astype("int32")
            # Metrics calculation
            # NOTE(review): `np` is only imported further down in the visible
            # source — confirm it is in scope when this cell runs.
            train_acc = accuracy_score(train_lbl, train_prediction)
            test_acc = accuracy_score(test_lbl, test_prediction)
            train_rmse = np.sqrt(mean_squared_error(train_lbl, train_prediction))
            test_rmse = np.sqrt(mean_squared_error(test_lbl, test_prediction))
            train_prec = precision_score(train_lbl, train_prediction)
            test_prec = precision_score(test_lbl, test_prediction)
            # Append to lists
            train_accuracy_list.append(train_acc)
            test_accuracy_list.append(test_acc)
            train_rmse_list.append(train_rmse)
            test_rmse_list.append(test_rmse)
            train_pre_list.append(train_prec)
            test_pre_list.append(test_prec)
            print(f"Training Accuracy: {train_acc}")
            print(f"Test Accuracy: {test_acc}")
            print(f"Training RMSE: {train_rmse}")
            print(f"Test RMSE: {test_rmse}")
            print(f"Training Precision: {train_prec}")
            print(f"Test Precision: {test_prec}")
            # NOTE(review): the winner is chosen on test accuracy, so the test
            # split also acts as the validation set here.
            if test_acc > best_acc:
                best_acc = test_acc
                best_hyperparameters = {'units': units, 'learning_rate': lr, 'epochs': epochs}
print("Best hyperparameters:", best_hyperparameters)
print("Best accuracy:", best_acc)
"""# LSTM from scratch"""
import numpy as np
class LSTM:
    """Single-layer LSTM binary classifier written with NumPy.

    A sample is a sequence of ``num_cells`` scalar inputs. At every step the
    previous hidden state (``num_neurons`` x 1) is stacked on top of the
    current scalar input and fed to the forget, input, output and candidate
    gates. The last hidden state goes through a sigmoid output unit that
    yields the probability of the positive class.

    The activations of the most recent forward pass are kept in
    ``forget_state``, ``input_state``, ``candidate_state``, ``cell_state``,
    ``output_state`` and ``hidden_state`` (one ``(num_neurons, 1)`` array per
    step) so that ``backward_propagation`` can reuse them.

    NOTE: defining this class rebinds the module-level name ``LSTM``; code
    that needs another ``LSTM`` (for example a Keras layer) must run before
    this definition or use a qualified name.
    """

    def __init__(self, num_neurons, num_cells, learning_rate):
        """Create the model with uniformly initialized parameters.

        Args:
            num_neurons: Size of the hidden and cell state.
            num_cells: Number of time steps, i.e. the sequence length.
            learning_rate: Step size used by ``fit``.
        """
        self.accuracies = []
        self.num_neurons = num_neurons
        self.num_cells = num_cells
        self.learning_rate = learning_rate

        # Every activation history needs its own list. A single chained
        # assignment used to bind all six names to one shared list, so each
        # write overwrote the values of the other gates.
        self.forget_state = self._zero_states()
        self.input_state = self._zero_states()
        self.candidate_state = self._zero_states()
        self.cell_state = self._zero_states()
        self.output_state = self._zero_states()
        self.hidden_state = self._zero_states()

        lower_weight = -(1.0 / np.sqrt(num_cells))
        upper_weight = 1.0 / np.sqrt(num_cells)
        span = upper_weight - lower_weight

        def uniform(shape):
            # Uniform sample in [lower_weight, upper_weight).
            return lower_weight + np.random.random(shape) * span

        gate_shape = (num_neurons, 1 + num_neurons)
        bias_shape = (num_neurons, 1)
        # The draw order is kept stable so that seeding NumPy reproduces the
        # same initial parameters.
        self.weight_forget = uniform(gate_shape)
        self.weight_input = uniform(gate_shape)
        self.weight_cell = uniform(gate_shape)
        self.weight_output = uniform(gate_shape)
        self.bias_forget = uniform(bias_shape)
        self.bias_input = uniform(bias_shape)
        self.bias_cell = uniform(bias_shape)
        self.bias_output = uniform(bias_shape)
        self.output_weight = uniform((1, num_neurons))
        self.output_bias = uniform((1, 1))

    def _zero_states(self):
        """Return a fresh list of ``num_cells`` zero column vectors."""
        return [np.zeros((self.num_neurons, 1)) for _ in range(self.num_cells)]

    def _step_input(self, previous_hidden, value):
        """Stack the previous hidden state on top of the current scalar input."""
        current = np.asarray(value, dtype=np.float64).reshape(-1, 1)
        return np.concatenate((previous_hidden, current), axis=0)

    def _apply_gradients(self, gradients):
        """Take one step of size ``learning_rate`` along ``gradients``.

        ``gradients`` is the tuple returned by ``backward_propagation``.
        Parameters are rebound rather than updated in place so arrays handed
        in through ``set_weights`` are never mutated.
        """
        (grad_out_w, grad_out_b, grad_wf, grad_bf, grad_wi, grad_bi,
         grad_wo, grad_bo, grad_wc, grad_bc) = gradients
        step = self.learning_rate
        self.output_weight = self.output_weight + step * grad_out_w
        self.output_bias = self.output_bias + step * grad_out_b
        self.weight_forget = self.weight_forget + step * grad_wf
        self.bias_forget = self.bias_forget + step * grad_bf
        self.weight_input = self.weight_input + step * grad_wi
        self.bias_input = self.bias_input + step * grad_bi
        self.weight_output = self.weight_output + step * grad_wo
        self.bias_output = self.bias_output + step * grad_bo
        self.weight_cell = self.weight_cell + step * grad_wc
        self.bias_cell = self.bias_cell + step * grad_bc

    def get_weights(self):
        """Return all parameters as a list of arrays.

        Order: forget W, forget b, input W, input b, cell W, cell b,
        output-gate W, output-gate b, output-layer W, output-layer b.
        This is NOT the order of the ``backward_propagation`` result.
        """
        weights = [self.weight_forget, self.bias_forget, self.weight_input, self.bias_input,
                   self.weight_cell, self.bias_cell, self.weight_output, self.bias_output,
                   self.output_weight, self.output_bias]
        return weights

    def set_weights(self, weights):
        """Set all parameters from a list laid out like ``get_weights``."""
        (self.weight_forget, self.bias_forget, self.weight_input, self.bias_input,
         self.weight_cell, self.bias_cell, self.weight_output, self.bias_output,
         self.output_weight, self.output_bias) = weights

    def forward_propagation(self, X_test):
        """Run one sequence through the network.

        Args:
            X_test: Sequence with at least ``num_cells`` scalar entries.

        Returns:
            A ``(1, 1)`` array holding the predicted probability.
        """
        previous_hidden = np.zeros((self.num_neurons, 1))
        previous_cell = np.zeros((self.num_neurons, 1))
        # Start at step 0: the loop used to begin at 1 and never consumed the
        # first input.
        for curr_cell in range(self.num_cells):
            helper = self._step_input(previous_hidden, X_test[curr_cell])
            forget_gate = self.sigmoid(self.weight_forget @ helper + self.bias_forget)
            input_gate = self.sigmoid(self.weight_input @ helper + self.bias_input)
            output_gate = self.sigmoid(self.weight_output @ helper + self.bias_output)
            candidate_state = self.tanh(self.weight_cell @ helper + self.bias_cell)
            internal_state = previous_cell * forget_gate + input_gate * candidate_state
            hidden_state = self.tanh(internal_state) * output_gate
            self.forget_state[curr_cell] = forget_gate
            self.input_state[curr_cell] = input_gate
            self.candidate_state[curr_cell] = candidate_state
            self.cell_state[curr_cell] = internal_state
            self.output_state[curr_cell] = output_gate
            self.hidden_state[curr_cell] = hidden_state
            previous_hidden, previous_cell = hidden_state, internal_state
        Y_pred = self.sigmoid(self.output_weight @ previous_hidden + self.output_bias)
        return Y_pred

    def backward_propagation(self, Y_true, Y_pred, X_test):
        """Back-propagate through time for the sample of the last forward pass.

        Must be called right after ``forward_propagation(X_test)`` because it
        reads the stored per-step activations.

        Args:
            Y_true: Target label (0 or 1).
            Y_pred: Output of ``forward_propagation`` for ``X_test``.
            X_test: The same input sequence that produced ``Y_pred``.

        Returns:
            A 10-tuple of derivatives of the NEGATIVE cross-entropy loss, so
            callers update with ``param += learning_rate * gradient``. Order:
            output-layer W, output-layer b, forget W, forget b, input W,
            input b, output-gate W, output-gate b, cell W, cell b.
        """
        size = self.num_neurons
        gradient_weight_forget = np.zeros_like(self.weight_forget)
        gradient_weight_input = np.zeros_like(self.weight_input)
        gradient_weight_output = np.zeros_like(self.weight_output)
        gradient_weight_cell = np.zeros_like(self.weight_cell)
        gradient_bias_forget = np.zeros_like(self.bias_forget)
        gradient_bias_input = np.zeros_like(self.bias_input)
        gradient_bias_output = np.zeros_like(self.bias_output)
        gradient_bias_cell = np.zeros_like(self.bias_cell)

        # For a sigmoid output with cross-entropy, the derivative of the
        # negative loss w.r.t. the output pre-activation is (target - prediction).
        prediction_difference = np.asarray(Y_true - Y_pred, dtype=np.float64).reshape(1, 1)
        gradient_bias_final = prediction_difference
        gradient_weight_output_layer = prediction_difference * self.hidden_state[-1].T

        # Only the last hidden state feeds the output unit; earlier steps
        # receive their signal through the recurrence alone.
        hidden_delta = self.output_weight.T @ prediction_difference
        cell_carry = np.zeros((size, 1))
        for curr_index in reversed(range(self.num_cells)):
            if curr_index > 0:
                previous_hidden = self.hidden_state[curr_index - 1]
                previous_cell = self.cell_state[curr_index - 1]
            else:
                # Step 0 starts from zero state (index -1 would wrap around
                # to the last step).
                previous_hidden = np.zeros((size, 1))
                previous_cell = np.zeros((size, 1))

            forget_gate = self.forget_state[curr_index]
            input_gate = self.input_state[curr_index]
            output_gate = self.output_state[curr_index]
            candidate = self.candidate_state[curr_index]
            squashed_cell = self.tanh(self.cell_state[curr_index])

            cell_delta = hidden_delta * output_gate * (1 - squashed_cell ** 2) + cell_carry
            output_gate_delta = hidden_delta * squashed_cell * output_gate * (1 - output_gate)
            forget_gate_delta = cell_delta * previous_cell * forget_gate * (1 - forget_gate)
            input_gate_delta = cell_delta * candidate * input_gate * (1 - input_gate)
            candidate_delta = cell_delta * input_gate * (1 - candidate ** 2)

            x_t = self._step_input(previous_hidden, X_test[curr_index])
            gradient_weight_forget = gradient_weight_forget + forget_gate_delta @ x_t.T
            gradient_weight_input = gradient_weight_input + input_gate_delta @ x_t.T
            gradient_weight_output = gradient_weight_output + output_gate_delta @ x_t.T
            gradient_weight_cell = gradient_weight_cell + candidate_delta @ x_t.T
            gradient_bias_forget = gradient_bias_forget + forget_gate_delta
            gradient_bias_input = gradient_bias_input + input_gate_delta
            gradient_bias_output = gradient_bias_output + output_gate_delta
            gradient_bias_cell = gradient_bias_cell + candidate_delta

            # Signal for the previous step: the first `size` rows of the
            # stacked gate input belong to the previous hidden state.
            stacked_delta = (self.weight_forget.T @ forget_gate_delta
                             + self.weight_input.T @ input_gate_delta
                             + self.weight_output.T @ output_gate_delta
                             + self.weight_cell.T @ candidate_delta)
            hidden_delta = stacked_delta[:size]
            cell_carry = cell_delta * forget_gate
        return (gradient_weight_output_layer, gradient_bias_final,
                gradient_weight_forget, gradient_bias_forget,
                gradient_weight_input, gradient_bias_input,
                gradient_weight_output, gradient_bias_output,
                gradient_weight_cell, gradient_bias_cell)

    def fit(self, epochs, X_test, Y_actual):
        """Train with per-sample gradient steps and print the mean loss per epoch.

        Args:
            epochs: Number of passes over the data.
            X_test: Training sequences (the name is kept for compatibility).
            Y_actual: Target labels, aligned with ``X_test``.
        """
        for epoch in range(epochs):
            training_loss_per_epoch = 0.0
            for sample, target in zip(X_test, Y_actual):
                Y_pred = self.forward_propagation(sample)
                loss = self.binary_cross_entropy_loss(target, Y_pred)
                # The gradients used to be computed and thrown away, so the
                # parameters never changed; apply them here.
                self._apply_gradients(self.backward_propagation(target, Y_pred, sample))
                training_loss_per_epoch += float(np.sum(loss))
            print("Epoch:", epoch, "Training loss:", training_loss_per_epoch / len(X_test))

    def predict(self, test_x):
        """Return a list of 0/1 labels, one per sequence in ``test_x``."""
        # Predicting binary labels based on the probability
        return [1 if self.forward_propagation(sample).item() > 0.5 else 0
                for sample in test_x]

    def sigmoid(self, X_test):
        """Element-wise logistic function.

        Computed as ``0.5 * (1 + tanh(x / 2))``, which is the same function
        but does not overflow for large negative inputs.
        """
        values = np.asarray(X_test, dtype=np.float64)
        return 0.5 * (1.0 + np.tanh(0.5 * values))

    def tanh(self, X_test):
        """Element-wise hyperbolic tangent."""
        return np.tanh(np.asarray(X_test, dtype=np.float64))

    def binary_cross_entropy_loss(self, Y_true, Y_pred):
        """Binary cross-entropy of ``Y_pred`` against ``Y_true``."""
        epsilon = 1e-15  # Small constant to avoid log(0)
        loss = - (Y_true * np.log(Y_pred + epsilon) + (1 - Y_true) * np.log(1 - Y_pred + epsilon))
        return loss
"""# Run LSTM code without Map Reduce"""
import time
# Pull the training split to the driver: flatMap unwraps each single-column
# row, so collect() returns one entry per sample.
train_sequences = train_df.select("padded_sequence").rdd.flatMap(lambda row: row).collect()
X_train = np.array(train_sequences)
train_labels = train_df.select("label").rdd.flatMap(lambda row: row).collect()
y_train = np.array(train_labels)

# Time the single-process training run.
beginning = time.time()
# Creating and training the LSTM model using the fit function
lstm_model = LSTM(
    num_neurons=62,
    num_cells=max_sequence_length,
    learning_rate=0.0001,
)
lstm_model.fit(epochs=10, X_test=X_train, Y_actual=y_train)
end = time.time()

total_time = end - beginning
print("Train time without map reduce:", total_time, "seconds")
"""## Evaluate the model
"""
def evaluate_model(model, test_x, test_y):
    """Return the fraction of samples that ``model`` labels correctly.

    Args:
        model: Object whose ``predict(test_x)`` returns one label per sample.
        test_x: Input samples.
        test_y: True labels, aligned with ``test_x``.

    Returns:
        Number of matching predictions divided by ``len(test_y)``.
    """
    hits = 0
    for guess, expected in zip(model.predict(test_x), test_y):
        if guess == expected:
            hits += 1
    return hits / len(test_y)
# Accuracy on the data the model was trained on.
accuracy = evaluate_model(lstm_model, X_train, y_train)
print("Train Accuracy:", accuracy)
# Pull the held-out split to the driver as NumPy arrays.
X_test = np.array(test_df.select("padded_sequence").rdd.flatMap(lambda x: x).collect())
y_test = np.array(test_df.select("label").rdd.flatMap(lambda x: x).collect())
# Score the held-out split. This used to re-score X_train / y_train, so the
# reported "Test Accuracy" was simply the train accuracy again.
accuracy = evaluate_model(lstm_model, X_test, y_test)
print("Test Accuracy:", accuracy)
"""# Run LSTM with Map Reduce"""
from pyspark import SparkContext
import numpy as np
# Define your fit function using PySpark map-reduce paradigm
def fit_spark(X_train, Y_train, epochs, num_neurons, num_cells, learning_rate):
    """Train an LSTM by parameter averaging over Spark partitions.

    The data is split into partitions; every non-empty partition trains its
    own replica for ``epochs`` passes over its samples, all starting from the
    same initial parameters. The replicas' parameters are then summed in a
    reduce step and averaged.

    Previously the training function was applied with ``map``, i.e. once per
    sample, and the sum over all samples was divided by the partition count,
    which scaled the result by (samples / partitions).

    NOTE(review): ``sc`` is not defined in the visible source — presumably
    the SparkContext provided by the notebook runtime; confirm.

    Args:
        X_train: Training sequences.
        Y_train: Target labels, aligned with ``X_train``.
        epochs: Passes each replica makes over its partition.
        num_neurons: Hidden size passed to ``LSTM``.
        num_cells: Sequence length passed to ``LSTM``.
        learning_rate: Step size for the per-sample updates.

    Returns:
        ``(model, summed_weights)``: an ``LSTM`` holding the averaged
        parameters, and the element-wise sum of the replicas' parameters in
        ``get_weights`` order.
    """
    # Broadcast model parameters to all nodes
    model_params = sc.broadcast((num_neurons, num_cells, learning_rate))
    # One shared starting point: averaging replicas that began from different
    # random initializations is not meaningful.
    initial_weights = sc.broadcast(LSTM(*model_params.value).get_weights())

    # Partition the training data
    partitions = 10  # You can adjust this based on your cluster size and data size
    partitioned_data = sc.parallelize(list(zip(X_train, Y_train)), partitions)

    # Map function for training: receives an iterator over one partition
    def train_partition(samples):
        samples = list(samples)
        if not samples:
            # Empty partition: contribute nothing to the average.
            return
        lstm_model = LSTM(*model_params.value)
        # Copy so the in-place updates below never touch the broadcast value.
        lstm_model.set_weights([w.copy() for w in initial_weights.value])
        for _ in range(epochs):
            for x, y in samples:
                # Forward propagation
                y_pred = lstm_model.forward_propagation(x)
                # Backward propagation
                (gradient_weight_output_layer, gradient_bias_final,
                 gradient_weight_forget, gradient_bias_forget,
                 gradient_weight_input, gradient_bias_input,
                 gradient_weight_output, gradient_bias_output,
                 gradient_weight_cell, gradient_bias_cell) = lstm_model.backward_propagation(y, y_pred, x)
                # Update weights
                lstm_model.output_weight += learning_rate * gradient_weight_output_layer
                lstm_model.output_bias += learning_rate * gradient_bias_final
                lstm_model.weight_forget += learning_rate * gradient_weight_forget
                lstm_model.bias_forget += learning_rate * gradient_bias_forget
                lstm_model.weight_input += learning_rate * gradient_weight_input
                lstm_model.bias_input += learning_rate * gradient_bias_input
                lstm_model.weight_output += learning_rate * gradient_weight_output
                lstm_model.bias_output += learning_rate * gradient_bias_output
                lstm_model.weight_cell += learning_rate * gradient_weight_cell
                lstm_model.bias_cell += learning_rate * gradient_bias_cell
        # The 1 lets the reduce step count how many replicas were summed.
        yield lstm_model.get_weights(), 1

    def merge(left, right):
        # Element-wise sum of two weight lists plus the replica counts.
        return [a + b for a, b in zip(left[0], right[0])], left[1] + right[1]

    # Map-reduce for training
    updated_weights, trained_replicas = partitioned_data.mapPartitions(train_partition).reduce(merge)
    # Create a new LSTM model
    lstm_model = LSTM(*model_params.value)
    # Set updated weights: average over the replicas that actually trained
    lstm_model.set_weights([w / trained_replicas for w in updated_weights])
    return lstm_model, updated_weights  # Return trained model
import time
# Time the Spark map-reduce training run.
beginning = time.time()
# Creating and training the LSTM model using PySpark
trained_model, weights = fit_spark(X_train, y_train, epochs=10, num_neurons=62, num_cells=max_sequence_length, learning_rate=1e-5)
end = time.time()
total_time = end - beginning
# This run uses map reduce; the message used to say "without map reduce".
print("Train time with map reduce:", total_time, "seconds")
"""## Evaluate Model"""
# Score the map-reduce trained model on the train split, then on the test
# split; `accuracy` ends up holding the test score.
train_score = evaluate_model(trained_model, X_train, y_train)
print("Train Accuracy:", train_score)
accuracy = evaluate_model(trained_model, X_test, y_test)
print("Test Accuracy:", accuracy)