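"""
nrds_cloud_run.py

Batch scoring loop for the Neuro-Realistic Detection System (NRDS):
  1. pull HTTP log documents from the local Elasticsearch index 'nlp-log-1',
  2. build 'url' and 'feature' text fields and tokenize them with Keras tokenizers
     fitted on the project's training CSVs,
  3. send the tokenized sequences to two Vertex AI endpoints (malicious-URL and
     network-traffic classifiers) for prediction,
  4. write the labelled records back to Elasticsearch as the 'nlp_output' index.

Assumes an Elasticsearch node on localhost:9200 and Google Cloud credentials
with access to the referenced Vertex AI endpoints.
"""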
from typing import List

import json
import time

import numpy as np
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from google.cloud import aiplatform
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# create a client instance of the library
elastic_client = Elasticsearch()
print('************************************')
# load the training CSVs from GitHub and fit the word tokenizers
# URL dataset
url = "https://raw.githubusercontent.com/Nawod/Neuro-Realistic-Detection-System/master/archive/url_combined_train.csv"
url_data = pd.read_csv(url)
# url_data = pd.read_csv('archive/url_combined_train.csv')
url_tokenizer = Tokenizer(num_words=50000, split=' ')
url_tokenizer.fit_on_texts(url_data['url'].values)
# traffic dataset
url_t = "https://raw.githubusercontent.com/Nawod/Neuro-Realistic-Detection-System/master/archive/traffic_combine_train.csv"
traffic_data = pd.read_csv(url_t)
# traffic_data = pd.read_csv('archive/traffic_combine_train.csv')
traffic_tokenizer = Tokenizer(num_words=5000, split=' ')
traffic_tokenizer.fit_on_texts(traffic_data['feature'].values)
print('Tokenizer Loaded')
print('************************************')
# retrieve log documents from Elasticsearch
def get_elk_nlp():
    response = elastic_client.search(
        index='nlp-log-1',  # Elasticsearch index
        body={},
    )
    # the documents are nested inside the API response object
    elastic_docs_nlp = response["hits"]["hits"]

    # collect the fields of interest into per-field column arrays
    fields_of_interest = ["@timestamp", "method", "id_resp_p", "version", "host",
                          "uri", "user_agent", "status_msg", "response_body_len"]
    nlp_traffic = {}
    for doc in elastic_docs_nlp:
        traffic = doc["_source"]
        for key, value in traffic.items():
            if key in fields_of_interest:
                try:
                    nlp_traffic[key] = np.append(nlp_traffic[key], value)
                except KeyError:
                    nlp_traffic[key] = np.array([value])
    return nlp_traffic
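# the returned dict maps each field name to a NumPy array with one entry per hit,
# e.g. {"@timestamp": array([...]), "method": array(['GET', ...]), ...}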
# text cleaning: strip special characters from the text columns
def url_clean_text(df):
    spec_chars = ["!", '"', "#", "%", "&", "'", "(", ")",
                  "*", "+", ",", "-", ".", "/", ":", ";", "<",
                  ">", "?", "@", "[", "\\", "]", "^", "_",
                  "`", "{", "|", "}", "~", "–"]
    for char in spec_chars:
        # regex=False: replace each character literally (e.g. '*' is not a regex)
        df['url'] = df['url'].str.replace(char, ' ', regex=False)
    return df

def traffic_clean_text(df):
    spec_chars = ["!", '"', "#", "%", "&", "'", "(", ")",
                  "*", "+", ",", "-", ".", "/", ":", ";", "<",
                  ">", "?", "@", "[", "\\", "]", "^", "_",
                  "`", "{", "|", "}", "~", "–"]
    for char in spec_chars:
        df['feature'] = df['feature'].str.replace(char, ' ', regex=False)
    return df
# tokenize inputs and pad them to the fixed sequence lengths the models expect
def url_token(text):
    X = url_tokenizer.texts_to_sequences(pd.Series(text).values)
    Y = pad_sequences(X, maxlen=150)
    return Y

def traffic_token(text):
    X = traffic_tokenizer.texts_to_sequences(pd.Series(text).values)
    Y = pad_sequences(X, maxlen=160)
    return Y
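# example usage (with made-up, already-cleaned strings):
#   url_token(["example com login php"])      -> int array of shape (1, 150)
#   traffic_token(["GET 80 1 1 example com"]) -> int array of shape (1, 160)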
# Vertex AI API calls to get predictions from the deployed endpoints
def predict_mal_url_api(
    project: str = "511747508882",
    endpoint_id: str = "1302648600030871552",
    location: str = "us-central1",
    instances: List = None,
):
    aiplatform.init(project=project, location=location)
    endpoint = aiplatform.Endpoint(endpoint_id)
    prediction = endpoint.predict(instances=instances)
    return prediction

def predict_net_traffic_api(
    project: str = "511747508882",
    endpoint_id: str = "3266218037564407808",
    location: str = "us-central1",
    instances: List = None,
):
    aiplatform.init(project=project, location=location)
    endpoint = aiplatform.Endpoint(endpoint_id)
    prediction = endpoint.predict(instances=instances)
    return prediction
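# endpoint.predict() returns an object whose .predictions attribute holds one
# entry per instance; the callers below read a single score via predictions[0][0]
# and threshold it at 0.5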
# malicious URL classification
def url_predict(body):
    # retrieve the data from the JSON request body
    input_data = json.loads(body)['data']
    embedded_text = url_token(input_data)  # tokenize the data
    instances = embedded_text.tolist()
    response = predict_mal_url_api(instances=instances)  # call the Vertex AI API
    prediction = response.predictions[0][0]  # retrieve the score
    # print('url prediction: ', prediction)
    # map the score to the appropriate label
    if prediction < 0.5:
        t_sentiment = 'Malicious URL'
    else:
        t_sentiment = 'good'
    return {  # return the dictionary for the endpoint
        "Label": t_sentiment
    }
# network traffic classification
def traffic_predict(body):
    # retrieve the data from the JSON request body
    input_data = json.loads(body)['data']
    embedded_text = traffic_token(input_data)  # tokenize the data
    instances = embedded_text.tolist()
    response = predict_net_traffic_api(instances=instances)  # call the Vertex AI API
    prediction = response.predictions[0][0]  # retrieve the score
    # print('traffic prediction: ', prediction)
    # map the score to the appropriate label
    if prediction < 0.5:
        t_sentiment = 'bad traffic'
    else:
        t_sentiment = 'good'
    return {  # return the dictionary for the endpoint
        "Label": t_sentiment
    }
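# both predict functions expect a JSON-encoded body of the form
#   b'{"data": ["<cleaned url or traffic string>"]}'
# which is exactly what nlp_model() below builds for each row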
# NLP models prediction
def nlp_model(df):
    print('Network traffic classifying_#####')
    # text pre-processing: build the 'url' and 'feature' text columns
    new_df = df
    new_df['url'] = new_df['host'].astype(str).values + new_df['uri'].astype(str).values
    new_df['feature'] = (new_df['method'].astype(str).values + ' '
                         + new_df['id_resp_p'].astype(str).values + ' '
                         + new_df['version'].astype(str).values + ' '
                         + new_df['host'].astype(str).values + ' '
                         + new_df['uri'].astype(str).values + ' '
                         + new_df['user_agent'].astype(str).values + ' '
                         + new_df['status_msg'].astype(str).values + ' '
                         + new_df['response_body_len'].astype(str).values)
    new_df = url_clean_text(new_df)
    new_df = traffic_clean_text(new_df)
    # convert the text columns into arrays
    url_array = new_df[['url']].to_numpy()
    traffic_array = new_df[['feature']].to_numpy()
    # blank series to collect the predicted labels
    url_label_array = pd.Series([], dtype=object)
    traffic_label_array = pd.Series([], dtype=object)
    for i in range(url_array.shape[0]):
        # create JSON requests
        url_lists = url_array[i].tolist()  # for URLs
        url_payload = {'data': url_lists}
        url_body = str.encode(json.dumps(url_payload))
        traffic_lists = traffic_array[i].tolist()  # for network traffic
        traffic_payload = {'data': traffic_lists}
        traffic_body = str.encode(json.dumps(traffic_payload))
        # call the classification functions
        pred_url = url_predict(url_body)
        pred_traffic = traffic_predict(traffic_body)
        # retrieve the output labels
        url_output = str.encode(json.dumps(pred_url))
        url_label = json.loads(url_output)['Label']
        traffic_output = str.encode(json.dumps(pred_traffic))
        traffic_label = json.loads(traffic_output)['Label']
        # insert labels into the series
        url_label_array[i] = url_label
        traffic_label_array[i] = traffic_label
    # insert new columns with the labels
    df.insert(1, "url_label", url_label_array)
    df.insert(2, "traffic_label", traffic_label_array)
    return df
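# nlp_model() returns the input frame with the derived 'url' and 'feature' text
# columns plus the predicted 'url_label' and 'traffic_label' columns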
# index keys to keep in the exported output documents
nlp_keys = ["@timestamp", "ID", "method", "id_resp_p", "version", "host", "uri",
            "user_agent", "status_msg", "response_body_len", "url_label", "traffic_label"]

def nlpFilterKeys(document):
    return {key: document[key] for key in nlp_keys}
# es_client = Elasticsearch(http_compress=True)
es_client = Elasticsearch([{'host': 'localhost', 'port': 9200}])
def nlp_doc_generator(df):
    df_iter = df.iterrows()
    for index, document in df_iter:
        yield {
            "_index": 'nlp_output',
            "_type": "_doc",
            "_id": f"{document['ID']}",
            "_source": nlpFilterKeys(document),
        }
    # raise StopIteration
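# each yielded bulk action looks roughly like (values illustrative only):
#   {"_index": "nlp_output", "_type": "_doc", "_id": "1",
#    "_source": {"@timestamp": "...", "ID": 1, "url_label": "good", "traffic_label": "good", ...}}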
# main loop
def main():
    # u = url_data.sample(n = 10).reset_index(drop=True)
    # t = traffic_data.sample(n = 10).reset_index(drop=True)
    # print(u.head())
    # print(t.head())
    count = 1
    while True:
        print('Batch :', count)
        # start_time = time.time()  # to calculate response time
        # retrieve data and convert it to a dataframe
        print('Retrieving the data batch from ELK_#####')
        nlp_traffic = get_elk_nlp()
        elk_df_nlp = pd.DataFrame(nlp_traffic)
        # elk_df_nlp = u
        # elk_df_nlp['feature'] = pd.Series(t['feature'])
        # print(elk_df_nlp.head())
        # NLP prediction
        nlp_df = nlp_model(elk_df_nlp)
        nlp_df.insert(0, 'ID', range(count, count + len(nlp_df)))
        # print(nlp_df.head())
        # print(nlp_df.shape)
        # export the labelled records to Elasticsearch
        helpers.bulk(es_client, nlp_doc_generator(nlp_df))
        # print("response time %s seconds ---" % (time.time() - start_time))
        print('Batch', count, 'exported to ELK')
        print('************************************')
        count = count + len(elk_df_nlp)
        # get new records every 10 seconds
        time.sleep(10)

if __name__ == '__main__':
    main()