-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathopenflow_thread.py
576 lines (527 loc) · 25.6 KB
/
openflow_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##
# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
# This file is part of openmano
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: [email protected]
##
'''
This thread interacts with a openflow floodligth controller to create dataplane connections
'''
__author__="Pablo Montes, Alfonso Tierno"
__date__ ="17-jul-2015"
#import json
import threading
import time
import Queue
import requests
import logging
class FlowBadFormat(Exception):
'''raise when a bad format of flow is found'''
def change_of2db(flow):
'''Change 'flow' dictionary from openflow format to database format
Basically the change consist of changing 'flow[actions] from a list of
double tuple to a string
from [(A,B),(C,D),..] to "A=B,C=D" '''
action_str_list=[]
if type(flow)!=dict or "actions" not in flow:
raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
try:
for action in flow['actions']:
action_str_list.append( action[0] + "=" + str(action[1]) )
flow['actions'] = ",".join(action_str_list)
except:
raise FlowBadFormat("Unexpected format at 'actions'")
def change_db2of(flow):
'''Change 'flow' dictionary from database format to openflow format
Basically the change consist of changing 'flow[actions]' from a string to
a double tuple list
from "A=B,C=D,..." to [(A,B),(C,D),..]
raise FlowBadFormat '''
actions=[]
if type(flow)!=dict or "actions" not in flow or type(flow["actions"])!=str:
raise FlowBadFormat("Bad input parameters, expect dictionary with 'actions' as key")
action_list = flow['actions'].split(",")
for action_item in action_list:
action_tuple = action_item.split("=")
if len(action_tuple) != 2:
raise FlowBadFormat("Expected key=value format at 'actions'")
if action_tuple[0].strip().lower()=="vlan":
if action_tuple[1].strip().lower() in ("none", "strip"):
actions.append( ("vlan",None) )
else:
try:
actions.append( ("vlan", int(action_tuple[1])) )
except:
raise FlowBadFormat("Expected integer after vlan= at 'actions'")
elif action_tuple[0].strip().lower()=="out":
actions.append( ("out", str(action_tuple[1])) )
else:
raise FlowBadFormat("Unexpected '%s' at 'actions'"%action_tuple[0])
flow['actions'] = actions
class of_test_connector():
'''This is a fake openflow connector for testing.
It does nothing and it is used for running openvim without an openflow controller
'''
def __init__(self, params):
self.name = "ofc_test"
self.rules={}
self.logger = logging.getLogger('vim.OF.TEST')
self.logger.setLevel( getattr(logging, params.get("of_debug", "ERROR") ) )
def get_of_switches(self):
return 0, ()
def obtain_port_correspondence(self):
return 0, ()
def del_flow(self, flow_name):
if flow_name in self.rules:
self.logger.debug("del_flow OK")
del self.rules[flow_name]
return 0, None
else:
self.logger.warning("del_flow not found")
return -1, "flow %s not found"
def new_flow(self, data):
self.rules[ data["name"] ] = data
self.logger.debug("new_flow OK")
return 0, None
def get_of_rules(self, translate_of_ports=True):
return 0, self.rules
def clear_all_flows(self):
self.logger.debug("clear_all_flows OK")
self.rules={}
return 0, None
class openflow_thread(threading.Thread):
def __init__(self, OF_connector, db, db_lock, of_test, pmp_with_same_vlan, debug='ERROR'):
threading.Thread.__init__(self)
self.db = db
self.pmp_with_same_vlan = pmp_with_same_vlan
self.name = "openflow"
self.test = of_test
self.db_lock = db_lock
self.OF_connector = OF_connector
self.logger = logging.getLogger('vim.OF')
self.logger.setLevel( getattr(logging, debug) )
self.queueLock = threading.Lock()
self.taskQueue = Queue.Queue(2000)
def insert_task(self, task, *aditional):
try:
self.queueLock.acquire()
task = self.taskQueue.put( (task,) + aditional, timeout=5)
self.queueLock.release()
return 1, None
except Queue.Full:
return -1, "timeout inserting a task over openflow thread " + self.name
def run(self):
while True:
self.queueLock.acquire()
if not self.taskQueue.empty():
task = self.taskQueue.get()
else:
task = None
self.queueLock.release()
if task is None:
time.sleep(1)
continue
if task[0] == 'update-net':
r,c = self.update_of_flows(task[1])
#update database status
self.db_lock.acquire()
if r<0:
UPDATE={'status':'ERROR', 'last_error': str(c)}
self.logger.error("processing task 'update-net' %s: %s", str(task[1]), c)
else:
UPDATE={'status':'ACTIVE', 'last_error': None}
self.logger.debug("processing task 'update-net' %s: OK", str(task[1]))
self.db.update_rows('nets', UPDATE, WHERE={'uuid':task[1]})
self.db_lock.release()
elif task[0] == 'clear-all':
r,c = self.clear_all_flows()
if r<0:
self.logger.error("processing task 'clear-all': %s", c)
else:
self.logger.debug("processing task 'clear-all': OK")
elif task[0] == 'exit':
self.logger.debug("exit from openflow_thread")
self.terminate()
return 0
else:
self.logger.error("unknown task %s", str(task))
def terminate(self):
pass
#print self.name, ": exit from openflow_thread"
def update_of_flows(self, net_id):
ports=()
self.db_lock.acquire()
select_= ('type','admin_state_up', 'vlan', 'provider', 'bind_net','bind_type','uuid')
result, nets = self.db.get_table(FROM='nets', SELECT=select_, WHERE={'uuid':net_id} )
#get all the networks binding to this
if result > 0:
if nets[0]['bind_net']:
bind_id = nets[0]['bind_net']
else:
bind_id = net_id
#get our net and all bind_nets
result, nets = self.db.get_table(FROM='nets', SELECT=select_,
WHERE_OR={'bind_net':bind_id, 'uuid':bind_id} )
self.db_lock.release()
if result < 0:
return -1, "DB error getting net: " + nets
#elif result==0:
#net has been deleted
ifaces_nb = 0
database_flows = []
for net in nets:
net_id = net["uuid"]
if net['admin_state_up'] == 'false':
net['ports'] = ()
else:
self.db_lock.acquire()
nb_ports, net_ports = self.db.get_table(
FROM='ports',
SELECT=('switch_port','vlan','uuid','mac','type','model'),
WHERE={'net_id':net_id, 'admin_state_up':'true', 'status':'ACTIVE'} )
self.db_lock.release()
if nb_ports < 0:
#print self.name, ": update_of_flows() ERROR getting ports", ports
return -1, "DB error getting ports from net '%s': %s" % (net_id, net_ports)
#add the binding as an external port
if net['provider'] and net['provider'][:9]=="openflow:":
external_port={"type":"external","mac":None}
external_port['uuid'] = net_id + ".1" #fake uuid
if net['provider'][-5:]==":vlan":
external_port["vlan"] = net["vlan"]
external_port["switch_port"] = net['provider'][9:-5]
else:
external_port["vlan"] = None
external_port["switch_port"] = net['provider'][9:]
net_ports = net_ports + (external_port,)
nb_ports += 1
net['ports'] = net_ports
ifaces_nb += nb_ports
# Get the name of flows that will be affected by this NET
self.db_lock.acquire()
result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':net_id})
self.db_lock.release()
if result < 0:
#print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
return -1, "DB error getting flows from net '%s': %s" %(net_id, database_net_flows)
database_flows += database_net_flows
# Get the name of flows where net_id==NULL that means net deleted (At DB foreign key: On delete set null)
self.db_lock.acquire()
result, database_net_flows = self.db.get_table(FROM='of_flows', WHERE={'net_id':None})
self.db_lock.release()
if result < 0:
#print self.name, ": update_of_flows() ERROR getting flows from database", database_flows
return -1, "DB error getting flows from net 'null': %s" %(database_net_flows)
database_flows += database_net_flows
#Get the existing flows at openflow controller
result, of_flows = self.OF_connector.get_of_rules()
if result < 0:
#print self.name, ": update_of_flows() ERROR getting flows from controller", of_flows
return -1, "OF error getting flows: " + of_flows
if ifaces_nb < 2:
pass
elif net['type'] == 'ptp':
if ifaces_nb > 2:
#print self.name, 'Error, network '+str(net_id)+' has been defined as ptp but it has '+\
# str(ifaces_nb)+' interfaces.'
return -1, "'ptp' type network cannot connect %d interfaces, only 2" % ifaces_nb
elif net['type'] == 'data':
if ifaces_nb > 2 and self.pmp_with_same_vlan:
# check all ports are VLAN (tagged) or none
vlan_tag = None
for port in ports:
if port["type"]=="external":
if port["vlan"] != None:
if port["vlan"]!=net["vlan"]:
text="External port vlan-tag and net vlan-tag must be the same when flag 'of_controller_nets_with_same_vlan' is True"
#print self.name, "Error", text
return -1, text
if vlan_tag == None:
vlan_tag=True
elif vlan_tag==False:
text="Passthrough and external port vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
#print self.name, "Error", text
return -1, text
else:
if vlan_tag == None:
vlan_tag=False
elif vlan_tag == True:
text="SR-IOV and external port not vlan-tagged can not be connected when flag 'of_controller_nets_with_same_vlan' is True"
#print self.name, "Error", text
return -1, text
elif port["model"]=="PF" or port["model"]=="VFnotShared":
if vlan_tag == None:
vlan_tag=False
elif vlan_tag==True:
text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
#print self.name, "Error", text
return -1, text
elif port["model"] == "VF":
if vlan_tag == None:
vlan_tag=True
elif vlan_tag==False:
text="Passthrough and SR-IOV ports cannot be connected when flag 'of_controller_nets_with_same_vlan' is True"
#print self.name, "Error", text
return -1, text
else:
return -1, 'Only ptp and data networks are supported for openflow'
# calculate new flows to be inserted
result, new_flows = self._compute_net_flows(nets)
if result < 0:
return result, new_flows
#modify database flows format and get the used names
used_names=[]
for flow in database_flows:
try:
change_db2of(flow)
except FlowBadFormat as e:
self.logger.error("Exception FlowBadFormat: '%s', flow: '%s'",str(e), str(flow))
continue
used_names.append(flow['name'])
name_index=0
#insert at database the new flows, change actions to human text
for flow in new_flows:
#1 check if an equal flow is already present
index = self._check_flow_already_present(flow, database_flows)
if index>=0:
database_flows[index]["not delete"]=True
self.logger.debug("Skipping already present flow %s", str(flow))
continue
#2 look for a non used name
flow_name=flow["net_id"]+"."+str(name_index)
while flow_name in used_names or flow_name in of_flows:
name_index += 1
flow_name=flow["net_id"]+"."+str(name_index)
used_names.append(flow_name)
flow['name'] = flow_name
#3 insert at openflow
result, content = self.OF_connector.new_flow(flow)
if result < 0:
#print self.name, ": Error '%s' at flow insertion" % c, flow
return -1, content
#4 insert at database
try:
change_of2db(flow)
except FlowBadFormat as e:
#print self.name, ": Error Exception FlowBadFormat '%s'" % str(e), flow
return -1, str(e)
self.db_lock.acquire()
result, content = self.db.new_row('of_flows', flow)
self.db_lock.release()
if result < 0:
#print self.name, ": Error '%s' at database insertion" % content, flow
return -1, content
#delete not needed old flows from openflow and from DDBB,
#check that the needed flows at DDBB are present in controller or insert them otherwise
for flow in database_flows:
if "not delete" in flow:
if flow["name"] not in of_flows:
#not in controller, insert it
result, content = self.OF_connector.new_flow(flow)
if result < 0:
#print self.name, ": Error '%s' at flow insertion" % c, flow
return -1, content
continue
#Delete flow
if flow["name"] in of_flows:
result, content = self.OF_connector.del_flow(flow['name'])
if result<0:
self.logger.error("cannot delete flow '%s' from OF: %s", flow['name'], content )
continue #skip deletion from database
#delete from database
self.db_lock.acquire()
result, content = self.db.delete_row_by_key('of_flows', 'id', flow['id'])
self.db_lock.release()
if result<0:
self.logger.error("cannot delete flow '%s' from DB: %s", flow['name'], content )
return 0, 'Success'
def clear_all_flows(self):
try:
if not self.test:
self.OF_connector.clear_all_flows()
#remove from database
self.db_lock.acquire()
self.db.delete_row_by_key('of_flows', None, None) #this will delete all lines
self.db_lock.release()
return 0, None
except requests.exceptions.RequestException as e:
#print self.name, ": clear_all_flows Exception:", str(e)
return -1, str(e)
flow_fields=('priority', 'vlan', 'ingress_port', 'actions', 'dst_mac', 'src_mac', 'net_id')
def _check_flow_already_present(self, new_flow, flow_list):
'''check if the same flow is already present in the flow list
The flow is repeated if all the fields, apart from name, are equal
Return the index of matching flow, -1 if not match'''
index=0
for flow in flow_list:
equal=True
for f in self.flow_fields:
if flow.get(f) != new_flow.get(f):
equal=False
break
if equal:
return index
index += 1
return -1
def _compute_net_flows(self, nets):
new_flows=[]
new_broadcast_flows={}
nb_ports = 0
# Check switch_port information is right
self.logger.debug("_compute_net_flows nets: %s", str(nets))
for net in nets:
for port in net['ports']:
nb_ports += 1
if not self.test and str(port['switch_port']) not in self.OF_connector.pp2ofi:
error_text= "switch port name '%s' is not valid for the openflow controller" % str(port['switch_port'])
#print self.name, ": ERROR " + error_text
return -1, error_text
for net_src in nets:
net_id = net_src["uuid"]
for net_dst in nets:
vlan_net_in = None
vlan_net_out = None
if net_src == net_dst:
#intra net rules
priority = 1000
elif net_src['bind_net'] == net_dst['uuid']:
if net_src.get('bind_type') and net_src['bind_type'][0:5] == "vlan:":
vlan_net_out = int(net_src['bind_type'][5:])
priority = 1100
elif net_dst['bind_net'] == net_src['uuid']:
if net_dst.get('bind_type') and net_dst['bind_type'][0:5] == "vlan:":
vlan_net_in = int(net_dst['bind_type'][5:])
priority = 1100
else:
#nets not binding
continue
for src_port in net_src['ports']:
vlan_in = vlan_net_in
if vlan_in == None and src_port['vlan'] != None:
vlan_in = src_port['vlan']
elif vlan_in != None and src_port['vlan'] != None:
#TODO this is something that we can not do. It requires a double VLAN check
#outer VLAN should be src_port['vlan'] and inner VLAN should be vlan_in
continue
# BROADCAST:
broadcast_key = src_port['uuid'] + "." + str(vlan_in)
if broadcast_key in new_broadcast_flows:
flow_broadcast = new_broadcast_flows[broadcast_key]
else:
flow_broadcast = {'priority': priority,
'net_id': net_id,
'dst_mac': 'ff:ff:ff:ff:ff:ff',
"ingress_port": str(src_port['switch_port']),
'actions': []
}
new_broadcast_flows[broadcast_key] = flow_broadcast
if vlan_in is not None:
flow_broadcast['vlan_id'] = str(vlan_in)
for dst_port in net_dst['ports']:
vlan_out = vlan_net_out
if vlan_out == None and dst_port['vlan'] != None:
vlan_out = dst_port['vlan']
elif vlan_out != None and dst_port['vlan'] != None:
#TODO this is something that we can not do. It requires a double VLAN set
#outer VLAN should be dst_port['vlan'] and inner VLAN should be vlan_out
continue
#if src_port == dst_port:
# continue
if src_port['switch_port'] == dst_port['switch_port'] and vlan_in == vlan_out:
continue
flow = {
"priority": priority,
'net_id': net_id,
"ingress_port": str(src_port['switch_port']),
'actions': []
}
if vlan_in is not None:
flow['vlan_id'] = str(vlan_in)
# allow that one port have no mac
if dst_port['mac'] is None or nb_ports==2: # point to point or nets with 2 elements
flow['priority'] = priority-5 # less priority
else:
flow['dst_mac'] = str(dst_port['mac'])
if vlan_out == None:
if vlan_in != None:
flow['actions'].append( ('vlan',None) )
else:
flow['actions'].append( ('vlan', vlan_out ) )
flow['actions'].append( ('out', str(dst_port['switch_port'])) )
if self._check_flow_already_present(flow, new_flows) >= 0:
self.logger.debug("Skipping repeated flow '%s'", str(flow))
continue
new_flows.append(flow)
# BROADCAST:
if nb_ports <= 2: # point to multipoint or nets with more than 2 elements
continue
out = (vlan_out, str(dst_port['switch_port']))
if out not in flow_broadcast['actions']:
flow_broadcast['actions'].append( out )
#BROADCAST
for flow_broadcast in new_broadcast_flows.values():
if len(flow_broadcast['actions'])==0:
continue #nothing to do, skip
flow_broadcast['actions'].sort()
if 'vlan_id' in flow_broadcast:
previous_vlan = 0 # indicates that a packet contains a vlan, and the vlan
else:
previous_vlan = None
final_actions=[]
action_number = 0
for action in flow_broadcast['actions']:
if action[0] != previous_vlan:
final_actions.append( ('vlan', action[0]) )
previous_vlan = action[0]
if self.pmp_with_same_vlan and action_number:
return -1, "Can not interconnect different vlan tags in a network when flag 'of_controller_nets_with_same_vlan' is True."
action_number += 1
final_actions.append( ('out', action[1]) )
flow_broadcast['actions'] = final_actions
if self._check_flow_already_present(flow_broadcast, new_flows) >= 0:
self.logger.debug("Skipping repeated flow '%s'", str(flow_broadcast))
continue
new_flows.append(flow_broadcast)
#UNIFY openflow rules with the same input port and vlan and the same output actions
#These flows differ at the dst_mac; and they are unified by not filtering by dst_mac
#this can happen if there is only two ports. It is converted to a point to point connection
flow_dict={} # use as key vlan_id+ingress_port and as value the list of flows matching these values
for flow in new_flows:
key = str(flow.get("vlan_id"))+":"+flow["ingress_port"]
if key in flow_dict:
flow_dict[key].append(flow)
else:
flow_dict[key]=[ flow ]
new_flows2=[]
for flow_list in flow_dict.values():
convert2ptp=False
if len (flow_list)>=2:
convert2ptp=True
for f in flow_list:
if f['actions'] != flow_list[0]['actions']:
convert2ptp=False
break
if convert2ptp: # add only one unified rule without dst_mac
self.logger.debug("Convert flow rules to NON mac dst_address " + str(flow_list) )
flow_list[0].pop('dst_mac')
flow_list[0]["priority"] -= 5
new_flows2.append(flow_list[0])
else: # add all the rules
new_flows2 += flow_list
return 0, new_flows2