-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransition.go
1010 lines (844 loc) · 32.8 KB
/
transition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package mrnes
// transition.go holds state and code related to the transition of
// traffic between the application layer and the mrnes layer,
// and contains the methods involved in managing the 'flow' representation of traffic
//
import (
"fmt"
"github.com/iti/evt/evtm"
"github.com/iti/evt/vrtime"
"golang.org/x/exp/slices"
"math"
)
// ConnType tags traffic as being represented either as a flow or as
// discrete packets.
type ConnType int

// The kinds of traffic a ConnDesc may name.
const (
	// FlowConn marks traffic that is handled as a flow (value 0).
	FlowConn ConnType = iota
	// DiscreteConn marks traffic that is handled as discrete packets (value 1).
	DiscreteConn
)
// ConnLatency names one of three ways that latency is ascribed to a
// source-to-destination connection:
//
//   - Zero ascribes no latency at all; delivery is instantaneous. It is used
//     when defining major flows, to reserve bandwidth.
//   - Place means that when a message arrives to the network, a latency to its
//     destination is looked up or computed without simulating packet transit
//     across the network.
//   - Simulate means the packet is simulated traversing the route, through
//     every interface.
//
// The values start at 1, so the zero value of ConnLatency is none of the three.
type ConnLatency int

// The latency models, numbered from 1.
const (
	Zero ConnLatency = iota + 1
	Place
	Simulate
)
// rtnRecord saves the event handling function to call when the network simulation
// pushes a message back into the application layer. Characteristics gathered through
// the network traversal are included, and so are available to the application layer.
type rtnRecord struct {
// pckts is a packet count; presumably the number of packets still expected
// for the message — TODO confirm against the code that fills the record
pckts int
// prArrvl is presumably the probability of arrival accumulated over the route — TODO confirm
prArrvl float64
// rtnFunc is the event handler to schedule when the record is acted upon
rtnFunc evtm.EventHandlerFunction
// rtnCxt is the context handed to the event manager together with rtnFunc
rtnCxt any
// rtnID is an identifier kept with the record.
// NOTE(review): which ID space it belongs to is not visible in this part of the file
rtnID int
// rtnLoss presumably points at a loss measure to be reported back — TODO confirm
rtnLoss *float64
// rtnData is presumably opaque data returned with the message — TODO confirm
rtnData any
}
// RprtRate is the structure of a message that is scheduled for delivery
// as part of a 'Report' made when a flow rate changes or a packet is lost.
type RprtRate struct {
// FlowID identifies the flow the report is about
FlowID int
// MbrID is a member identifier.
// NOTE(review): ReportFlowChg leaves it at its zero value; confirm who sets it
MbrID int
// AcceptedRate is the rate accepted for the flow at the time of the report
AcceptedRate float64
// Action is the flow action that triggered the report
Action FlowAction
}
// ConnDesc holds characteristics of a connection: the type (discrete or flow),
// the latency (how delay in delivery is ascribed) and, in the case of a flow,
// the action (start, end, rate change).
type ConnDesc struct {
// Type says whether the traffic is a flow (FlowConn) or discrete packets (DiscreteConn)
Type ConnType
// Latency selects the latency model (Zero, Place, or Simulate) used to deliver messages
Latency ConnLatency
// Action is the flow action being requested; it is consulted for flow traffic
Action FlowAction
}
// RtnDesc holds the context and event handler
// used when scheduling a return.
type RtnDesc struct {
// Cxt is the context to pass when the return event is scheduled
Cxt any
// EvtHdlr is the event handler to run for the return event
EvtHdlr evtm.EventHandlerFunction
}
// RtnDescs holds four RtnDesc structures, for four different use scenarios.
// Bundling them in a struct makes code that uses them all more readable at the function call interface.
type RtnDescs struct {
// Rtn presumably describes the return made when the message completes — TODO confirm
Rtn *RtnDesc
// Src presumably describes where reports destined for the flow source go — TODO confirm
Src *RtnDesc
// Dst presumably describes where reports destined for the flow destination go — TODO confirm
Dst *RtnDesc
// Loss presumably describes the return made when a packet is lost — TODO confirm
Loss *RtnDesc
}
// NetMsgIDs holds four identifiers that may be associated with a message.
// ExecID comes from the application layer and may tie together a number of communications
// that occur moving application layer messages between endpoints. FlowID
// refers to a flow identity, although the specific value given is created at the application layer
// (as are the flows themselves). ClassID is the class identifier carried with the message.
// ConnectID is created at the mrnes layer and describes a single source-to-destination
// message transfer.
type NetMsgIDs struct {
ExecID int // execution id, from application
FlowID int // flow id
ClassID int // class id
ConnectID int // connection id
}
// EnterNetwork is called after the execution from the application layer.
// It creates NetworkMsg structs to represent the message, and
// schedules their arrival to the egress interface of the message source endpt.
//
// Two kinds of traffic may enter the network, Flow and Discrete.
// Entries for a flow may establish a new one, modify an existing one, or delete existing ones.
// Messages that notify destinations of these actions may be delivered instantly, may be delivered using
// an estimate of the cross-network latency which depends on queueing network approximations, or may be
// pushed through the network as individual packets, simulated at each network device.
//
// connDesc.Type is one of {FlowConn, DiscreteConn},
// connDesc.Action is one of {Srt, End, Chg},
// connDesc.Latency is one of {Zero, Place, Simulate}.
//
// We approximate the time required for a packet to pass through an interface or network as a transition constant plus
// the mean time in an M/D/1 queuing system. The arrivals are packets whose length is the frame size,
// the deterministic service time D is time required to serve a packet with a server whose bit service rate
// is the bandwidth available at the interface or network to serve (meaning the total capacity minus the
// bandwidth allocations to other flows at that interface or network), and the arrival rate is the accepted
// rate allocated to the flow.
//
// # Description of possible input parameters
//
// | Message         | connType      | flowAction    | connLatency           | flowID                  |
// | --------------- | ------------- | ------------- | --------------------- | ----------------------- |
// | Discrete Packet | DiscreteConn  | N/A           | Zero, Place, Simulate | >0 => embedded          |
// | Flow            | FlowConn      | Srt, Chg, End | Zero, Place, Simulate | flowID>0                |
//
// Returns the connection ID used for the transfer, a rate, and an acceptance flag.
// For a flow the rate is np.AcceptedRate[flowID] when the flow was accepted and 0.0
// (with false) when it was not; for discrete traffic the rate is requestRate and the
// flag is always true.
//
// Panics when a positive connectID has no Connections entry, when a positive flowID
// (with an action other than Srt) has no rate entries, when either endpoint name is
// unknown, when no route exists, or when a discrete message yields a non-positive frame size.
func (np *NetworkPortal) EnterNetwork(evtMgr *evtm.EventManager, srcDev, dstDev string, msgLen int,
	connDesc *ConnDesc, IDs NetMsgIDs, rtns RtnDescs, requestRate float64, msg any) (int, float64, bool) {

	// pull out the IDs for clarity
	flowID := IDs.FlowID
	connectID := IDs.ConnectID
	classID := IDs.ClassID
	execID := IDs.ExecID

	// if connectID>0 make sure that an entry in np.Connections exists
	_, present := np.Connections[connectID]
	if connectID > 0 && !present {
		panic(fmt.Errorf("non-zero connectID offered to EnterNetwork w/o corresponding Connections entry"))
	}

	// if flowID >0 and flowAction != Srt, make sure that various np data structures that use it for indexing exist
	if flowID > 0 && connDesc.Action != Srt {
		_, present0 := np.RequestRate[flowID]
		_, present1 := np.AcceptedRate[flowID]
		if !(present0 && present1) {
			panic(fmt.Errorf("flowID>0 presented to EnterNetwork without supporting data structures"))
		}
	}

	// is the message about a discrete packet, or a flow?
	isPckt := connDesc.Type == DiscreteConn

	// find the route, which needs the endpoint IDs. An unknown name is reported
	// explicitly rather than surfacing as a nil dereference on DevID
	srcTopo, srcKnown := TopoDevByName[srcDev]
	if !srcKnown {
		panic(fmt.Errorf("unknown source device %s offered to EnterNetwork", srcDev))
	}
	dstTopo, dstKnown := TopoDevByName[dstDev]
	if !dstKnown {
		panic(fmt.Errorf("unknown destination device %s offered to EnterNetwork", dstDev))
	}
	route := findRoute(srcTopo.DevID(), dstTopo.DevID())

	// make sure we have a route to use
	if route == nil || len(*route) == 0 {
		panic(fmt.Errorf("unable to find a route %s -> %s", srcDev, dstDev))
	}

	// take the frame size to be the minimum of the message length
	// and the minimum MTU on interfaces between source and destination
	frameSize := FindFrameSize(flowID, route)
	if isPckt && msgLen < frameSize {
		frameSize = msgLen
	}

	// number of frames for a discrete connection may depend on the message length,
	// all other connections have just one frame reporting the change
	numFrames := 1
	if isPckt {
		// a zero-length message (or a zero frame size) would otherwise
		// fail below with an unexplained integer divide by zero
		if frameSize <= 0 {
			panic(fmt.Errorf("discrete message %s -> %s has non-positive frame size %d (message length %d)",
				srcDev, dstDev, frameSize, msgLen))
		}
		numFrames = msgLen / frameSize
		if msgLen%frameSize > 0 {
			numFrames++
		}
	}

	// only an entry that names a flow has a request rate to remember
	if flowID > 0 {
		np.RequestRate[flowID] = requestRate
	}

	if !(connectID > 0) {
		// tell the portal about the arrival, passing to it a description of the
		// response to be made, and the number of frames of the same message that
		// need to be received before reporting completion
		connectID = np.Arrive(rtns, numFrames)

		// remember the flowIDs, given the connectionID
		if flowID > 0 {
			np.Connections[connectID] = flowID
			np.InvConnection[flowID] = connectID
		}
	}

	// Flows and packets are handled differently
	if connDesc.Type == FlowConn {
		accepted := np.FlowEntry(evtMgr, srcDev, dstDev, msgLen, connDesc,
			flowID, classID, connectID, requestRate, route, msg)
		if !accepted {
			return connectID, 0.0, false
		}
		return connectID, np.AcceptedRate[flowID], true
	}

	// get the interface through which the message passes to get to the network.
	// remember that a route step names the srcIntrfcID as the interface used to get into a network,
	// and the dstIntrfcID as the interface used to ingress the next device
	intrfc := IntrfcByID[(*route)[0].srcIntrfcID]

	// ordinary packet entry; make a message wrapper and push the message at the entry
	// of the endpt's egress interface. Segment the message into frames and push them individually
	delay := float64(0.0)
	for fmNumber := 0; fmNumber < numFrames; fmNumber++ {
		nm := new(NetworkMsg)
		nm.StepIdx = 0
		nm.Route = route
		nm.Rate = 0.0
		nm.PcktRate = math.MaxFloat64 / 4.0
		nm.PrArrvl = 1.0
		nm.StartTime = evtMgr.CurrentSeconds()
		nm.MsgLen = frameSize
		nm.ConnectID = connectID
		nm.FlowID = flowID
		nm.ClassID = classID
		nm.ExecID = execID
		nm.Connection = *connDesc
		nm.PcktIdx = fmNumber
		nm.NumPckts = numFrames
		nm.Msg = msg

		// schedule the message's next destination
		np.SendNetMsg(evtMgr, nm, delay)

		// How long to get through the device to the interface?
		// The delay above should probably measure CPU bandwidth to deliver to the interface,
		// but it is a small number and another parameter to have to deal with, so we just use interface bandwidth
		delay += (float64(frameSize*8) / 1e6) / intrfc.State.Bndwdth
		delay += intrfc.State.Delay
	}
	return connectID, requestRate, true
}
// FlowEntry handles the entry of major flows to the network.
//
// It records requestRate for flowID, builds the flow's infrastructure along route
// when the action is Srt, establishes the flow's accepted rate (which may cause
// other flows' rates to be recomputed), sends a NetworkMsg describing the action
// using the latency model in connDesc, removes the flow when the action is End,
// and reports the change for every other flow whose rate was recomputed.
//
// It returns false, without sending anything, when the rate could not be established.
func (np *NetworkPortal) FlowEntry(evtMgr *evtm.EventManager, srcDev, dstDev string, msgLen int,
	connDesc *ConnDesc, flowID int, classID int, connectID int,
	requestRate float64, route *[]intrfcsToDev, msg any) bool {

	// the flow action requested (Srt, Chg, End)
	flowAction := connDesc.Action

	// revise the requested rate for the major flow
	np.RequestRate[flowID] = requestRate

	// Setting up the Flow on Srt
	if flowAction == Srt {
		// include a new flow into the network infrastructure.
		// the value returned is used to estimate latency when requested
		np.LatencyConsts[flowID] = BuildFlow(flowID, classID, route)
	}

	// change the flow rate for the flowID and take note of all
	// the major flows that were recomputed
	chgFlowIDs, established := np.EstablishFlowRate(evtMgr, flowID, classID, requestRate, route, flowAction)
	if !established {
		return false
	}

	// create the network message to be introduced into the network.
	// ClassID is carried on the message because ComputeFlowLatency selects
	// the class queue from nm.ClassID
	nm := NetworkMsg{Route: route, Rate: np.AcceptedRate[flowID], PcktRate: math.MaxFloat64 / 4.0,
		PrArrvl: 1.0, MsgLen: msgLen, Connection: *connDesc, ConnectID: connectID, FlowID: flowID,
		ClassID: classID, Msg: msg, NumPckts: 1, StartTime: evtMgr.CurrentSeconds()}

	// depending on the connLatency we post a message immediately,
	// after an approximated delay, or through simulation
	latency := np.ComputeFlowLatency(&nm)
	np.SendNetMsg(evtMgr, &nm, 0.0)

	// if this is End, remove the identified flow
	if flowAction == End {
		np.RmFlow(evtMgr, flowID, route, latency)
	}

	// for each changed flow report back the change and the accepted rate, if requested
	for flwID := range chgFlowIDs {
		// probably not needed but cheap protection against changes in EstablishFlowRate
		if flwID == flowID {
			continue
		}
		np.ReportFlowChg(evtMgr, flwID, flowAction, latency)
	}
	return true
}
// ReportFlowChg checks the return-record maps to see whether the named flow
// asked to have changes reported and, if so, schedules the reports. A report is
// requested for the source by the presence of an entry in np.ReportRtnSrc, and
// for the destination by an entry in np.ReportRtnDst. Each report carries the
// flow's accepted rate and the triggering action, and is scheduled 'latency'
// seconds in the future, when the effect of the triggered action is recognized
// at the triggering flow's receiving end.
func (np *NetworkPortal) ReportFlowChg(evtMgr *evtm.EventManager, flowID int,
	action FlowAction, latency float64) {

	rate := np.AcceptedRate[flowID]

	// every recipient gets its own report structure
	newReport := func() *RprtRate {
		return &RprtRate{FlowID: flowID, AcceptedRate: rate, Action: action}
	}

	// notice of the accepted rate to the source, when asked for
	if srcRec, wanted := np.ReportRtnSrc[flowID]; wanted {
		evtMgr.Schedule(srcRec.rtnCxt, newReport(), srcRec.rtnFunc, vrtime.SecondsToTime(latency))
	}

	// notice of the accepted rate to the destination, when asked for
	if dstRec, wanted := np.ReportRtnDst[flowID]; wanted {
		evtMgr.Schedule(dstRec.rtnCxt, newReport(), dstRec.rtnFunc, vrtime.SecondsToTime(latency))
	}
}
// BuildFlow establishes data structures in the interfaces, devices and networks
// crossed by the given route, for a flow having the given flowID and classID.
// No rate information is passed or set, other than initialization.
//
// The value returned is the sum of the constant delays met along the route
// (interface delays plus cable or network latencies); it is remembered by the
// caller for 'Place' latency estimates.
func BuildFlow(flowID int, classID int, route *[]intrfcsToDev) float64 {
	steps := *route

	// running total of the constant latency terms on the path
	constSum := 0.0

	for stepIdx, step := range steps {
		// a step describes a path across a network: srcIntrfcID is the egress
		// interface on the device that holds it, netID is the network it faces,
		// and the device on the other side owns dstIntrfcID
		egress := IntrfcByID[step.srcIntrfcID]
		egress.AddFlow(flowID, classID, false)

		// the constant delay through the interface applies to every frame
		constSum += egress.State.Delay

		// a cabled interface contributes its own latency, otherwise the
		// latency is that of the network the interface faces
		if egress.Cable != nil {
			constSum += egress.State.Latency
		} else {
			constSum += egress.Faces.NetState.Latency
		}

		// past the first step the flow also enters this step's device, through
		// the destination interface of the previous step
		if stepIdx > 0 {
			ingress := IntrfcByID[steps[stepIdx-1].dstIntrfcID]
			ingress.AddFlow(flowID, classID, true)
			constSum += ingress.State.Delay

			// a forward entry ties the interface admitting the flow to the one
			// it leaves by; it is recorded only for steps before the last one
			if stepIdx < len(steps)-1 {
				fwd := intrfcIDPair{prevID: ingress.Number, nextID: step.srcIntrfcID}
				switch dev := ingress.Device; dev.DevType() {
				case RouterCode:
					dev.(*routerDev).addForward(flowID, fwd)
				case SwitchCode:
					dev.(*switchDev).addForward(flowID, fwd)
				}
			}
		}

		// the network remembers the interface pair the flow crosses it by
		across := intrfcIDPair{prevID: step.srcIntrfcID, nextID: step.dstIntrfcID}
		NetworkByID[step.netID].AddFlow(flowID, classID, across)
	}
	return constSum
}
// RmFlow de-establishes data structures in the interfaces, devices and networks
// crossed by the given route, for the flow having ID rmflowID. Once the route has
// been cleaned it reports the End to the flow's source and destination (if they
// asked for reports), 'latency' seconds in the future, and clears the portal maps
// that are indexed by the removed flow.
func (np *NetworkPortal) RmFlow(evtMgr *evtm.EventManager, rmflowID int,
	route *[]intrfcsToDev, latency float64) {

	// save what the removal calls need, then zero the request rate
	// in case of reference before this call completes
	priorRate := np.RequestRate[rmflowID]
	classID := np.Class[rmflowID]
	np.RequestRate[rmflowID] = 0.0

	steps := *route
	for stepIdx, step := range steps {
		// every step has an egress side; take the flow off that interface
		egress := IntrfcByID[step.srcIntrfcID]
		var dev TopoDev = egress.Device
		egress.RmFlow(rmflowID, classID, priorRate, false)

		// adjust the network to the flow departure
		across := intrfcIDPair{prevID: step.srcIntrfcID, nextID: step.dstIntrfcID}
		NetworkByID[step.netID].RmFlow(rmflowID, across)

		// the flow originates at the first step's device, which therefore has
		// neither an ingress interface nor a forward entry for it
		if stepIdx == 0 {
			continue
		}

		ingress := IntrfcByID[steps[stepIdx-1].dstIntrfcID]
		ingress.RmFlow(rmflowID, classID, priorRate, true)

		// remove the flow from the device's forward maps
		switch egress.DevType {
		case RouterCode:
			dev.(*routerDev).rmForward(rmflowID)
		case SwitchCode:
			dev.(*switchDev).rmForward(rmflowID)
		}
	}

	// report the change to src and dst if requested
	np.ReportFlowChg(evtMgr, rmflowID, End, latency)

	// clear up the maps with indices equal to the ID of the removed flow,
	// and maps indexed by connectionID of the removed flow
	np.ClearRmFlow(rmflowID)
}
// EstablishFlowRate is given a major flow ID, a request rate, and a route.
// It first determines what the accepted rate can be given the current state
// of all the major flows (by calling DiscoverFlowRate), then calls SetFlowRate
// to establish that rate through the route for the named flow.
// Because of congestion, setting the rate may force recalculation of the
// rates of other major flows, so SetFlowRate returns a map of flows to be
// revisited together with upper bounds on what their accept rates might be.
// Each of those is handled by a recursive call to EstablishFlowRate.
//
// The map returned holds the ID of every flow whose rate was calculated or
// recalculated. If any rate cannot be established, an empty map and false are returned.
//
// NOTE(review): the rate for an End action is whatever DiscoverFlowRate returns;
// nothing here forces it to zero. Confirm whether an ending flow should be given
// rate 0.0 before its peers are recomputed. Also note that the recursive calls
// reuse this flow's route and action for the other flows.
func (np *NetworkPortal) EstablishFlowRate(evtMgr *evtm.EventManager, flowID int, classID int,
	requestRate float64, route *[]intrfcsToDev, action FlowAction) (map[int]bool, bool) {

	// what rate can be sustained for this major flow?
	acceptRate, found := np.DiscoverFlowRate(flowID, requestRate, route)
	if !found {
		return map[int]bool{}, false
	}

	// set the rate, and get back the ids of major flows whose rates should be recomputed
	revisit := np.SetFlowRate(evtMgr, flowID, classID, acceptRate, route, action)

	// every flow calculated (or recalculated), starting with this one
	touched := map[int]bool{flowID: true}

	// revisit every flow whose converged rate might be affected by the rate setting in flow flowID
	for peerID, bound := range revisit {
		if peerID == flowID {
			continue
		}

		// a peer never gets more than it asked for
		peerRate := math.Min(bound, np.RequestRate[peerID])
		deeper, ok := np.EstablishFlowRate(evtMgr, peerID, np.Class[peerID], peerRate, route, action)
		if !ok {
			return map[int]bool{}, false
		}

		touched[peerID] = true
		for id := range deeper {
			touched[id] = true
		}
	}
	return touched, true
}
// DiscoverFlowRate is called after the infrastructure for the flow with
// ID flowID is set up, to determine what its rate will be.
//
// Every interface on the route is examined (the ingress side of each step after
// the first, and the egress side of every step), and for each egress side the
// network that interface faces is examined as well. An inelastic flow
// (np.Elastic[flowID] false) is refused if some interface or network cannot
// supply requestRate after reservations and other inelastic flows are accounted
// for. An elastic flow may be squeezed: its rate is bounded by its share of the
// bandwidth that is free, where the share is its fraction of the requested load
// among the elastic flows it competes with.
//
// Returns the discovered rate and true, or 0.0 and false when the flow cannot be admitted.
func (np *NetworkPortal) DiscoverFlowRate(flowID int,
	requestRate float64, route *[]intrfcsToDev) (float64, bool) {

	minRate := requestRate

	// is the requestRate a hard ask (inelastic) or best effort
	isElastic := np.Elastic[flowID]

	// visit each step on the route
	for idx := 0; idx < len(*route); idx++ {
		rtStep := (*route)[idx]

		// flag indicating whether we need to analyze the ingress side of the route step.
		// The egress side is always analyzed
		doIngressSide := idx > 0

		// ingress side first, then egress side
		for sideIdx := 0; sideIdx < 2; sideIdx++ {
			ingressSide := sideIdx == 0

			// the analysis looks the same for the ingress and egress sides, so
			// the same code block can be used for it. Skip a side that is not
			// consistent with the route step
			if ingressSide && !doIngressSide {
				continue
			}

			// set up intrfc and intrfcMap depending on which interface side we're analyzing
			var intrfc *intrfcStruct
			var intrfcMap map[int]float64
			if ingressSide {
				// route steps describe interface pairs across a network,
				// so our ingress interface ID is the destination interface ID
				// of the previous routing step
				intrfc = IntrfcByID[(*route)[idx-1].dstIntrfcID]
				intrfcMap = intrfc.State.ToIngress
			} else {
				intrfc = IntrfcByID[rtStep.srcIntrfcID]
				intrfcMap = intrfc.State.ToEgress
			}

			// usedBndwdth will accumulate the rates of all existing flows, plus the reservation
			usedBndwdth := intrfc.State.RsrvdFrac * intrfc.State.Bndwdth

			// fixedBndwdth will accumulate the rates of all inelastic flows, plus the reservation
			fixedBndwdth := intrfc.State.RsrvdFrac * intrfc.State.Bndwdth
			for flwID, rate := range intrfcMap {
				usedBndwdth += rate
				if !np.Elastic[flwID] {
					fixedBndwdth += rate
				}
			}

			// freeBndwdth is what is freely available to any flow
			freeBndwdth := intrfc.State.Bndwdth - usedBndwdth

			// useableBndwdth is what is available to an inelastic flow
			useableBndwdth := intrfc.State.Bndwdth - fixedBndwdth

			// can a request for inelastic bandwidth be satisfied at all?
			if !isElastic && useableBndwdth < requestRate {
				// no
				return 0.0, false
			}

			// can the request on the ingress (non network) side be immediately satisfied?
			if ingressSide && minRate <= freeBndwdth {
				// yes
				continue
			}

			// an inelastic flow can just grab what it wants (and we'll figure out the
			// squeeze later). On the egress side we will need to look at the network.
			// For an elastic flow we may need to squeeze
			if isElastic {
				// flowID goes first so that its share is at index 0
				toMap := []int{flowID}
				for flwID := range intrfcMap {
					// avoid having flowID in more than once
					if flwID == flowID {
						continue
					}
					if np.Elastic[flwID] {
						toMap = append(toMap, flwID)
					}
				}
				loadFracVec := ActivePortal.requestedLoadFracVec(toMap)

				// elastic flowID can get its share of the freely available bandwidth
				minRate = math.Min(minRate, loadFracVec[0]*freeBndwdth)
			}

			// only the egress side goes on to consider the network faced by the interface
			if ingressSide {
				continue
			}
			net := intrfc.Faces

			// get a pointer to the interface on the other side of the network
			nxtIntrfc := IntrfcByID[rtStep.dstIntrfcID]

			// netUsedBndwdth accumulates the bandwidth of all unique flows that
			// leave the egress side or enter the other interface's ingress side
			var netUsedBndwdth float64

			// netFixedBndwdth accumulates the bandwidth of the inelastic flows among them
			var netFixedBndwdth float64

			// netFlows is the set of unique flows that leave the egress side
			// or enter the ingress side
			netFlows := make(map[int]bool)
			for flwID, rate := range intrfcMap {
				netFlows[flwID] = true
				netUsedBndwdth += rate
				if !np.Elastic[flwID] {
					netFixedBndwdth += rate
				}
			}

			// incorporate the flows on the ingress side
			for flwID, rate := range nxtIntrfc.State.ToIngress {
				// skip if already seen on the egress side
				if netFlows[flwID] {
					continue
				}

				// remember the flow, so that an elastic flow which only enters
				// through the far interface still competes in the share computed below
				netFlows[flwID] = true
				netUsedBndwdth += rate
				if !np.Elastic[flwID] {
					netFixedBndwdth += rate
				}
			}

			// netFreeBndwdth is what is freely available to any flow
			netFreeBndwdth := net.NetState.Bndwdth - netUsedBndwdth

			// netUseableBndwdth is what is available to an inelastic flow
			netUseableBndwdth := net.NetState.Bndwdth - netFixedBndwdth
			if netFreeBndwdth <= 0 || netUseableBndwdth <= 0 {
				return 0.0, false
			}

			// admit a flow if its request rate is no greater than the
			// netUseableBndwdth
			if requestRate <= netUseableBndwdth {
				continue
			} else if !isElastic {
				return 0.0, false
			}

			// admit an elastic flow if all the elastic flows can be squeezed to let it in,
			// but figure out what its squeezed value needs to be
			toMap := []int{flowID}
			for flwID := range netFlows {
				if flwID == flowID {
					continue
				}
				if np.Elastic[flwID] {
					toMap = append(toMap, flwID)
				}
			}
			loadFracVec := ActivePortal.requestedLoadFracVec(toMap)

			// elastic flowID can get its share of the freely available bandwidth
			minRate = math.Min(minRate, loadFracVec[0]*netFreeBndwdth)
		}
	}
	return minRate, true
}
// SetFlowRate sets the accept rate for major flow flowID all along its path,
// and notes the identities of major flows which need attention because this change
// may impact them or other flows they interact with.
//
// The map returned is indexed by the ID of each such flow and holds the least
// upper bound on its rate found at any congested interface or network on the route.
func (np *NetworkPortal) SetFlowRate(evtMgr *evtm.EventManager, flowID int, classID int, acceptRate float64,
	route *[]intrfcsToDev, action FlowAction) map[int]float64 {

	// this is for keeps (for now...)
	np.AcceptedRate[flowID] = acceptRate
	flowIsElastic := np.Elastic[flowID]

	// the major flows whose accepted rates may change, with their rate bounds
	changes := make(map[int]float64)

	// tighten remembers the least bandwidth upper bound seen for a flow
	tighten := func(id int, bound float64) {
		if prior, seen := changes[id]; seen {
			changes[id] = math.Min(prior, bound)
		} else {
			changes[id] = bound
		}
	}

	steps := *route
	for stepIdx, step := range steps {
		// the interface pair indexes the flow within the network
		across := intrfcIDPair{prevID: step.srcIntrfcID, nextID: step.dstIntrfcID}

		// ingress side first, then egress side; the same analysis serves both
		for _, ingressSide := range []bool{true, false} {
			// the first step has no ingress side
			if ingressSide && stepIdx == 0 {
				continue
			}

			// pick the interface and its flow-rate map for the side being analyzed
			var intrfc *intrfcStruct
			var sideRates map[int]float64
			if ingressSide {
				// route steps describe interface pairs across a network, so the
				// ingress interface is the destination interface of the previous step
				intrfc = IntrfcByID[steps[stepIdx-1].dstIntrfcID]
				sideRates = intrfc.State.ToIngress
			} else {
				intrfc = IntrfcByID[step.srcIntrfcID]
				sideRates = intrfc.State.ToEgress
			}

			// nothing to do on this side if the accept rate hasn't changed here
			if math.Abs(acceptRate-sideRates[flowID]) < 1e-3 {
				continue
			}

			// bandwidth spoken for by the reservation and the inelastic flows,
			// measured before the rate change is applied
			fixed := intrfc.State.RsrvdFrac * intrfc.State.Bndwdth
			for id, rate := range sideRates {
				if !np.Elastic[id] {
					fixed += rate
				}
			}

			// if the interface wasn't compressing elastic flows before
			// or after the change, its peers need no attention due to this interface
			wasCongested := intrfc.IsCongested(ingressSide)
			intrfc.ChgFlowRate(flowID, classID, acceptRate, ingressSide)
			if isCongested := intrfc.IsCongested(ingressSide); wasCongested || isCongested {
				// the elastic flows sharing this side, flowID (if elastic) first
				peers := []int{}
				if flowIsElastic {
					peers = []int{flowID}
				}
				for id := range sideRates {
					if id != flowID && np.Elastic[id] {
						peers = append(peers, id)
					}
				}

				var fracs []float64
				if len(peers) > 0 {
					fracs = np.requestedLoadFracVec(peers)
				}
				for pos, id := range peers {
					if id == flowID {
						continue
					}
					tighten(id, fracs[pos]*(intrfc.State.Bndwdth-fixed))
				}
			}

			// only the egress side goes on to consider the network
			if ingressSide {
				continue
			}

			net := intrfc.Faces
			dstIntrfc := IntrfcByID[step.dstIntrfcID]
			netWasCongested := net.IsCongested(intrfc, dstIntrfc)
			net.ChgFlowRate(flowID, across, acceptRate)
			netIsCongested := net.IsCongested(intrfc, dstIntrfc)
			if !(netWasCongested || netIsCongested) {
				continue
			}

			// the elastic flows crossing the network at either interface,
			// flowID (if elastic) first
			peers := []int{}
			if flowIsElastic {
				peers = []int{flowID}
			}
			for id := range intrfc.State.ThruEgress {
				if id != flowID && np.Elastic[id] {
					peers = append(peers, id)
				}
			}
			for id := range dstIntrfc.State.ToIngress {
				if !slices.Contains(peers, id) && np.Elastic[id] {
					peers = append(peers, id)
				}
			}

			var fracs []float64
			if len(peers) > 0 {
				fracs = np.requestedLoadFracVec(peers)
			}
			for pos, id := range peers {
				if id == flowID {
					continue
				}
				tighten(id, fracs[pos]*net.NetState.Bndwdth)
			}
		}
	}
	return changes
}
// SendNetMsg moves a NetworkMsg according to the latency model named in its
// connection description.
// If 'Zero' the message goes to the destination instantly, with zero network latency modeled.
// If 'Place' the message is placed at the destination after computing a delay through the network.
// If 'Simulate' the message is scheduled, 'offset' seconds from now, at the egress interface
// of the first route step and is simulated going through the network to its destination.
// Any other latency value results in nothing being sent.
func (np *NetworkPortal) SendNetMsg(evtMgr *evtm.EventManager, nm *NetworkMsg, offset float64) {
	switch model := nm.Connection.Latency; model {
	case Zero, Place:
		// both models deliver straight to the end of the route,
		// so the message's position is the last step
		nm.StepIdx = len(*nm.Route) - 1
		if model == Zero {
			np.SendImmediate(evtMgr, nm)
		} else {
			np.PlaceNetMsg(evtMgr, nm, offset)
		}
	case Simulate:
		// schedule entry to the egress interface of the first step
		firstIntrfc := IntrfcByID[(*nm.Route)[0].srcIntrfcID]
		evtMgr.Schedule(firstIntrfc, *nm, enterEgressIntrfc, vrtime.SecondsToTime(offset))
	}
}
// SendImmediate schedules the arrival of the message at the destination
// interface of the last route step, with zero latency.
func (np *NetworkPortal) SendImmediate(evtMgr *evtm.EventManager, nm *NetworkMsg) {
	steps := *nm.Route

	// the interface the message finally arrives at
	finalIntrfc := IntrfcByID[steps[len(steps)-1].dstIntrfcID]
	evtMgr.Schedule(finalIntrfc, *nm, arriveIngressIntrfc, vrtime.SecondsToTime(0.0))
}
// PlaceNetMsg schedules the receipt of the message some deterministic time in the future,
// without going through the details of the intervening network structure. The arrival is
// scheduled at the destination interface of the last route step, after the latency
// returned by ComputeFlowLatency plus the given offset.
func (np *NetworkPortal) PlaceNetMsg(evtMgr *evtm.EventManager, nm *NetworkMsg, offset float64) {
	// the ingress interface at the end of the route
	steps := *nm.Route
	destIntrfc := IntrfcByID[steps[len(steps)-1].dstIntrfcID]

	// time through the network if simulated _now_ (and with no packets ahead in queue);
	// computed before the message is marked as arrived
	transit := np.ComputeFlowLatency(nm)

	// mark the message to indicate arrival at the destination
	nm.StepIdx = len(*nm.Route) - 1

	evtMgr.Schedule(destIntrfc, *nm, arriveIngressIntrfc, vrtime.SecondsToTime(transit+offset))
}
// ComputeFlowLatency approximates the latency from source to destination if computed now,
// with the state of the network frozen and no packets queued up.
//
// A message whose latency model is Zero has latency 0.0. Otherwise the estimate starts
// from the constants remembered for the flow in np.LatencyConsts and adds, for every
// route step, the class-queue waiting time and the frame transmission time at both the
// source and destination interfaces, plus the latency reported by the network between them.
// The frame length used is the message length, capped at 1500.
func (np *NetworkPortal) ComputeFlowLatency(nm *NetworkMsg) float64 {
	if nm.Connection.Latency == Zero {
		return 0.0
	}

	// reaching here the caller wants a computed latency
	flowID := nm.FlowID
	classID := nm.ClassID
	route := nm.Route

	// never consider a frame longer than 1500
	frameLen := nm.MsgLen
	if frameLen >= 1500 {
		frameLen = 1500
	}

	// frame length in the units the interface bandwidth divides
	// (presumably megabits against Mbps — TODO confirm)
	xmitLen := float64(frameLen*8) / 1e+6

	// start with all the constants on the path
	total := np.LatencyConsts[flowID]

	for _, step := range *route {
		// waiting and transmission at the interface the step leaves by
		src := IntrfcByID[step.srcIntrfcID]
		srcQ := src.State.priQueue.getClassQueue(classID, true)
		total += (srcQ.waiting + xmitLen/src.State.Bndwdth)

		// waiting and transmission at the interface the step arrives at
		dst := IntrfcByID[step.dstIntrfcID]
		dstQ := dst.State.priQueue.getClassQueue(classID, false)
		total += (dstQ.waiting + xmitLen/dst.State.Bndwdth)

		// and the time across the network between them
		total += src.Faces.NetLatency(nm)
	}
	return total
}
// EstMM1Latency estimates the mean time in system of an M/M/1 queue whose
// arrival rate is bitRate/msgLen and whose utilization is rho.
//
// The mean time in system for an M/M/1 queue is
//
//	1/(mu - lambda)
//
// With rho = lambda/mu we have mu = lambda/rho, so
// (mu - lambda) = lambda*(1.0/rho - 1.0) and the mean time in system is
//
//	1.0/(lambda*(1/rho - 1.0))
//
// The formula is only meaningful for rho < 1: it blows up as rho approaches 1
// and turns negative beyond it. A utilization within 1e-3 of 1, or at or above 1,
// is therefore treated as 95%.
//
// NOTE(review): lambda is computed as bitRate/msgLen, which is packets per unit
// time only when msgLen is expressed in bits; if msgLen is in bytes a factor of
// 8 is missing. Confirm the units against the callers.
func EstMM1Latency(bitRate, rho float64, msgLen int) float64 {
	// saturated (or over-saturated) queue: force rho to be 95%
	if rho >= 1.0 || math.Abs(1.0-rho) < 1e-3 {
		rho = 0.95
	}

	lambda := bitRate / float64(msgLen)
	denom := lambda * (1.0/rho - 1.0)
	return 1.0 / denom
}
// EstMD1Latency estimates the delay through an M/D/1 queue
func EstMD1Latency(rho float64, msgLen int, bndwdth float64) float64 {
// mean time in waiting for service in M/D/1 queue is
// 1/mu + rho/(2*mu*(1-rho))
//