-
Notifications
You must be signed in to change notification settings - Fork 0
/
dsml_controller.py
140 lines (122 loc) · 5.52 KB
/
dsml_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from pox.core import core
import pox.openflow.libopenflow_01 as of
import pox.lib.packet as pkt
from pox.lib.addresses import IPAddr
import pox.misc.arp_responder as arp_responder
from pox.entry_operation import *
from scapy.all import *
import sys
import test_dsm as dsm
import threading
import time
class dsml_controller (object):
    '''Per-switch controller that drives the DSM engine.

    Every PacketIn received on the connection, and a periodic timeout tick
    produced by a background thread, is passed to ``engine.handle_packet``.
    The engine's answer is a list whose first element is a status (``True``
    or the string ``"Exit"``); any further elements are rule objects that
    are handed to ``write_entry``.
    '''

    # The timing averages stop being updated once this many samples were
    # recorded (the log output in cleanup() refers to the same limit).
    _MAX_TIMED_SAMPLES = 5000

    def __init__ (self, connection):
        '''Create the engine, hook into the connection and start the waiter.

        connection -- switch connection object; this instance registers
                      itself on it via ``addListeners``.
        '''
        self.current_timeout_duration = 0
        self.current_timeout_time = time.time()
        self.timeout_lock = threading.Lock()
        self.TIME_GRANULARITY = 5  # Seconds
        self.DEBUG_TIME = True  # ENABLE THIS FOR DEBUG TIMING INFO
        self.packet_average = 0
        self.packets_processed = 0
        self.timeout_average = 0
        self.timeouts_processed = 0
        self.connection = connection

        #clear all previous rules
        #core.openflow.clear_flows_of_connect = 1
        self.engine = dsm.Engine()
        starting_of_rules = self.engine.get_initial_rules()

        # Debug output: show the first two initial rules. Iterating instead
        # of indexing [0] and [1] avoids an IndexError when the engine
        # returns fewer than two rules.
        for position, rule in enumerate(starting_of_rules):
            if position >= 2:
                break
            print(rule)

        # NOTE(review): the initial rules are fetched but never installed on
        # the switch. The previous loop here only converted the ports of
        # "add" rules and discarded the result. If they are meant to be
        # installed, call self.write_entry(starting_of_rules) -- confirm.

        # Register for events only once self.engine exists, because the
        # handlers use it.
        self.connection.addListeners(self)

        self.waiter_thread = threading.Thread(target=self.timeout_waiter, name="Waiter")
        self.waiter_thread.daemon = True
        self.waiter_thread.start()

    @staticmethod
    def _port_or_none(port):
        '''Return ``port`` converted to int, or None when it is None.'''
        return None if port is None else int(port)

    @staticmethod
    def _running_mean(previous_mean, sample_count, sample):
        '''Fold ``sample`` into a mean that now covers ``sample_count`` values.'''
        return previous_mean * (float(sample_count - 1) / float(sample_count)) + \
            sample * (1.0 / float(sample_count))

    def convert_to_scapy_packet(self, pox_packet):
        '''Convert a POX-style packet to a Scapy-style
        packet for use in the DSM.'''
        return Ether(pox_packet.raw)

    def debug_wrapper_handle_packet(self, pkt, timeout=0):
        '''Call ``engine.handle_packet`` and, if DEBUG_TIME is set, time it.

        pkt     -- packet to process, or None for a pure timeout tick.
        timeout -- accumulated timeout in seconds (0 for packet calls).

        Returns whatever ``engine.handle_packet(pkt, timeout)`` returns.

        With DEBUG_TIME enabled the call duration is folded into
        ``packet_average`` (truthy ``pkt``) or ``timeout_average``
        (``timeout > 0``), each for at most _MAX_TIMED_SAMPLES samples.
        '''
        if not self.DEBUG_TIME:
            return self.engine.handle_packet(pkt, timeout)

        time_start = time.time()
        return_list = self.engine.handle_packet(pkt, timeout)
        diff_time = time.time() - time_start

        if pkt and self.packets_processed < self._MAX_TIMED_SAMPLES:
            self.packets_processed += 1
            self.packet_average = self._running_mean(
                self.packet_average, self.packets_processed, diff_time)
        elif timeout > 0 and self.timeouts_processed < self._MAX_TIMED_SAMPLES:
            self.timeouts_processed += 1
            self.timeout_average = self._running_mean(
                self.timeout_average, self.timeouts_processed, diff_time)
        return return_list

    def _apply_engine_result(self, return_list):
        '''Act on an engine answer (shared by the packet and timeout paths).

        A first element equal to True resets the timeout counter and passes
        any remaining elements to write_entry(); a first element equal to
        "Exit" triggers cleanup(). Anything else is ignored.
        '''
        if not return_list:
            # Nothing to act on; also keeps the waiter thread alive if the
            # engine ever answers with None or an empty list.
            return
        status = return_list[0]
        # Deliberately "== True" and not plain truthiness: the string
        # "Exit" is truthy as well and must reach the elif below.
        if status == True:  # noqa: E712
            with self.timeout_lock:
                self.current_timeout_duration = 0
            if len(return_list) > 1:
                self.write_entry(return_list[1:])
        elif status == "Exit":
            self.cleanup()

    def timeout_waiter(self):
        '''Calls handle_packet periodically.

        Runs forever (it is the target of the daemon "Waiter" thread): every
        TIME_GRANULARITY seconds the timeout counter is advanced and the
        engine is called with no packet and the accumulated timeout.
        '''
        while True:
            time.sleep(self.TIME_GRANULARITY)
            with self.timeout_lock:
                self.current_timeout_duration += self.TIME_GRANULARITY
                # Snapshot under the lock so a concurrent reset from the
                # packet path cannot be observed half-way.
                elapsed = self.current_timeout_duration
            self._apply_engine_result(
                self.debug_wrapper_handle_packet(None, elapsed))

    def handle_PacketIn(self, event):
        '''Feed the packet carried by a PacketIn event to the engine.

        event -- event object whose ``parsed`` attribute is the POX packet.
        '''
        scapy_packet = self.convert_to_scapy_packet(event.parsed)
        self._apply_engine_result(
            self.debug_wrapper_handle_packet(scapy_packet))

    # POX's addListeners() binds methods named "_handle_<EventName>"; without
    # this alias the handler above would not be registered for PacketIn.
    # The public name is kept for existing callers.
    _handle_PacketIn = handle_PacketIn

    def cleanup(self):
        '''Append timing statistics to controller_log.txt (if DEBUG_TIME) and exit.

        NOTE(review): sys.exit() raises SystemExit in the calling thread
        only; when this is reached from the "Waiter" thread the process as
        a whole keeps running -- confirm that this is the intended shutdown.
        '''
        # TODO: Reset to default L2 learning switch rules.
        # Otherwise the switch will require a manual reload.
        if self.DEBUG_TIME:
            with open("controller_log.txt", "a") as f:
                # repr() is the portable spelling of the old backtick syntax.
                f.write("DEBUG information @ " + repr(time.time()) + "\n")
                f.write("Packets Processed (max 5000): " + repr(self.packets_processed) + "\n")
                f.write("Average Packet Time: " + repr(self.packet_average) + "\n")
                f.write("Timeouts Processed (max 5000): " + repr(self.timeouts_processed) + "\n")
                f.write("Average Timeout Time: " + repr(self.timeout_average) + "\n\n")
        sys.exit(0)

    def write_entry(self, rules):
        '''Install a sniff entry on the switch for every "add" rule.

        rules -- iterable of rule objects exposing ``command``,
                 ``destination_ip``, ``source_ip``, ``destination_port`` and
                 ``source_port``. Rules whose command is not "add" are
                 skipped; a port of None is passed through as None.
        '''
        for item in rules:
            if item.command != "add":
                continue
            tp_dst = self._port_or_none(item.destination_port)
            tp_src = self._port_or_none(item.source_port)
            add_sniff(self.connection, nw_dst=item.destination_ip,
                      nw_src=item.source_ip, tp_dst=tp_dst, tp_src=tp_src)
def launch ():
    '''POX entry point for this component.

    Launches the ARP responder with learning disabled and arranges for a
    dsml_controller to be created for each connection reported by a
    "ConnectionUp" event.
    '''
    def _on_connection_up (event):
        # One controller instance per connected switch.
        dsml_controller(event.connection)

    arp_responder.launch(no_learn = True)
    core.openflow.addListenerByName("ConnectionUp", _on_connection_up)