Skip to content

Commit

Permalink
Fixes bug in group accumulator
Browse files Browse the repository at this point in the history
  • Loading branch information
sterlingdeng committed Nov 24, 2024
1 parent fbbe759 commit 75f2dab
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 16 deletions.
9 changes: 8 additions & 1 deletion pkg/gcc/arrival_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@ type arrivalGroup struct {
arrival time.Time
}

func newArrivalGroup(a cc.Acknowledgment) arrivalGroup {
return arrivalGroup{
packets: []cc.Acknowledgment{a},
departure: a.Departure,
arrival: a.Arrival,
}
}

func (g *arrivalGroup) add(a cc.Acknowledgment) {
g.packets = append(g.packets, a)
g.arrival = a.Arrival
g.departure = a.Departure
}

func (g arrivalGroup) String() string {
Expand Down
24 changes: 14 additions & 10 deletions pkg/gcc/arrival_group_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (a *arrivalGroupAccumulator) run(in <-chan []cc.Acknowledgment, agWriter fu
for acks := range in {
for _, next := range acks {
if !init {
group.add(next)
group = newArrivalGroup(next)
init = true
continue
}
Expand All @@ -38,36 +38,40 @@ func (a *arrivalGroupAccumulator) run(in <-chan []cc.Acknowledgment, agWriter fu
continue
}
if next.Departure.After(group.departure) {
// A sequence of packets which are sent within a burst_time interval
// constitute a group.
if interDepartureTimePkt(group, next) <= a.interDepartureThreshold {
group.add(next)
continue
}

// A Packet which has an inter-arrival time less than burst_time and
// an inter-group delay variation d(i) less than 0 is considered
// being part of the current group of packets.
if interArrivalTimePkt(group, next) <= a.interArrivalThreshold &&
interGroupDelayVariationPkt(group, next) < a.interGroupDelayVariationTreshold {
group.add(next)
continue
}

agWriter(group)
group = arrivalGroup{}
group.add(next)
group = newArrivalGroup(next)
}
}
}
}

func interArrivalTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
return b.Arrival.Sub(a.arrival)
func interArrivalTimePkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration {
return ack.Arrival.Sub(group.arrival)
}

func interDepartureTimePkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
if len(a.packets) == 0 {
func interDepartureTimePkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration {
if len(group.packets) == 0 {
return 0
}
return b.Departure.Sub(a.packets[len(a.packets)-1].Departure)
return ack.Departure.Sub(group.departure)
}

func interGroupDelayVariationPkt(a arrivalGroup, b cc.Acknowledgment) time.Duration {
return b.Arrival.Sub(a.arrival) - b.Departure.Sub(a.departure)
func interGroupDelayVariationPkt(group arrivalGroup, ack cc.Acknowledgment) time.Duration {
return ack.Arrival.Sub(group.arrival) - ack.Departure.Sub(group.departure)
}
4 changes: 2 additions & 2 deletions pkg/gcc/arrival_group_accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestArrivalGroupAccumulator(t *testing.T) {
},
},
arrival: time.Time{}.Add(20 * time.Millisecond),
departure: time.Time{}.Add(3 * time.Millisecond),
departure: time.Time{},
}},
},
{
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestArrivalGroupAccumulator(t *testing.T) {
},
},
arrival: time.Time{}.Add(20 * time.Millisecond),
departure: time.Time{}.Add(3 * time.Millisecond),
departure: time.Time{}.Add(0 * time.Millisecond),
},
{
packets: []cc.Acknowledgment{
Expand Down
49 changes: 46 additions & 3 deletions pkg/gcc/arrival_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,46 @@ func TestArrivalGroup(t *testing.T) {
Arrival: time.Time{}.Add(time.Second),
}},
arrival: time.Time{}.Add(time.Second),
departure: time.Time{}.Add(time.Second),
departure: time.Time{},
},
},
{
name: "departure time of group is the departure time of the first packet in the group",
acks: []cc.Acknowledgment{{
SequenceNumber: 0,
Size: 0,
Departure: time.Time{}.Add(27 * time.Millisecond),
Arrival: time.Time{},
}, {
SequenceNumber: 1,
Size: 1,
Departure: time.Time{}.Add(32 * time.Millisecond),
Arrival: time.Time{}.Add(37 * time.Millisecond),
}, {
SequenceNumber: 2,
Size: 2,
Departure: time.Time{}.Add(50 * time.Millisecond),
Arrival: time.Time{}.Add(56 * time.Millisecond),
}},
expected: arrivalGroup{
packets: []cc.Acknowledgment{{
SequenceNumber: 0,
Size: 0,
Departure: time.Time{}.Add(27 * time.Millisecond),
Arrival: time.Time{},
}, {
SequenceNumber: 1,
Size: 1,
Departure: time.Time{}.Add(32 * time.Millisecond),
Arrival: time.Time{}.Add(37 * time.Millisecond),
}, {
SequenceNumber: 2,
Size: 2,
Departure: time.Time{}.Add(50 * time.Millisecond),
Arrival: time.Time{}.Add(56 * time.Millisecond),
}},
arrival: time.Time{}.Add(56 * time.Millisecond),
departure: time.Time{}.Add(27 * time.Millisecond),
},
},
}
Expand All @@ -80,8 +119,12 @@ func TestArrivalGroup(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ag := arrivalGroup{}
for _, ack := range tc.acks {
ag.add(ack)
for i, ack := range tc.acks {
if i == 0 {
ag = newArrivalGroup(ack)
} else {
ag.add(ack)
}
}
assert.Equal(t, tc.expected, ag)
})
Expand Down

0 comments on commit 75f2dab

Please sign in to comment.