Skip to content

Commit

Permalink
fix(flow): remove priority queue from sliding window
Browse files (browse the repository at this point in the history)
  • Loading branch information
reugn committed Jul 29, 2024
1 parent b920e3e commit 5584310
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 170 deletions.
69 changes: 0 additions & 69 deletions flow/queue.go

This file was deleted.

56 changes: 0 additions & 56 deletions flow/queue_test.go

This file was deleted.

87 changes: 50 additions & 37 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package flow

import (
"container/heap"
"sort"
"sync"
"time"

"github.com/reugn/go-streams"
)

// timedElement stores an incoming element along with its timestamp.
// Values of this type are appended to the SlidingWindow queue by receive and
// are ordered by their timestamp field in dispatchWindow.
type timedElement[T any] struct {
// element is the original item read from the input channel.
element T
// timestamp is the value SlidingWindow.timestamp returned for element: the
// timestampExtractor result when one is set, time.Now().UnixNano() otherwise.
timestamp int64
}

// SlidingWindow assigns elements to windows of fixed length configured by the window
// size parameter.
// An additional window slide parameter controls how frequently a sliding window is started.
Expand All @@ -18,7 +24,7 @@ type SlidingWindow[T any] struct {
sync.Mutex
windowSize time.Duration
slidingInterval time.Duration
queue *PriorityQueue
queue []timedElement[T]
in chan any
out chan any
done chan struct{}
Expand Down Expand Up @@ -65,7 +71,7 @@ func NewSlidingWindowWithExtractor[T any](
slidingWindow := &SlidingWindow[T]{
windowSize: windowSize,
slidingInterval: slidingInterval,
queue: &PriorityQueue{},
queue: make([]timedElement[T], 0),
in: make(chan any),
out: make(chan any),
done: make(chan struct{}),
Expand Down Expand Up @@ -108,21 +114,24 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) {
}

// timestamp extracts the timestamp from an element if the timestampExtractor is set.
// It returns system clock time otherwise.
// Otherwise, the system time is returned.
func (sw *SlidingWindow[T]) timestamp(element T) int64 {
	// Prefer the configured extractor whenever one is present.
	if extract := sw.timestampExtractor; extract != nil {
		return extract(element)
	}
	// No extractor configured: stamp the element with the current clock
	// reading, expressed in Unix nanoseconds.
	return time.Now().UnixNano()
}

// receive buffers the incoming elements by pushing them into a priority queue,
// ordered by their creation time.
// receive buffers the incoming elements by pushing them into the queue,
// wrapping the original item into a timedElement along with its timestamp.
func (sw *SlidingWindow[T]) receive() {
for element := range sw.in {
item := &Item{element, sw.timestamp(element.(T)), 0}
sw.Lock()
heap.Push(sw.queue, item)
timed := timedElement[T]{
element: element.(T),
timestamp: sw.timestamp(element.(T)),
}
sw.queue = append(sw.queue, timed)
sw.Unlock()
}
close(sw.done)
Expand Down Expand Up @@ -150,47 +159,51 @@ func (sw *SlidingWindow[T]) emit() {
}
}

// dispatchWindow creates a new window and slides the elements queue.
// It sends the slice of elements to the output channel if the window is not empty.
// dispatchWindow is responsible for sending the elements in the current
// window to the output channel and moving the window to the next position.
func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
sw.Lock()
// build a window of elements
var windowBottomIndex int
now := tick.UnixNano()
windowUpperIndex := sw.queue.Len()
slideUpperIndex := windowUpperIndex
slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds()
windowBottomTime := now - sw.windowSize.Nanoseconds()
for i, item := range *sw.queue {
if item.epoch < windowBottomTime {
windowBottomIndex = i
}
if item.epoch > slideUpperTime {
slideUpperIndex = i

// sort elements in the queue by their timestamp
sort.Slice(sw.queue, func(i, j int) bool {
return sw.queue[i].timestamp < sw.queue[j].timestamp
})

// calculate the next window start time
nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval).UnixNano()
// initialize the next window queue
var nextWindowQueue []timedElement[T]
for i, element := range sw.queue {
if element.timestamp > nextWindowStartTime {
nextWindowQueue = make([]timedElement[T], len(sw.queue)-i)
_ = copy(nextWindowQueue, sw.queue[i:])
break
}
}
windowElements := extractWindowElements[T](sw.queue.Slice(windowBottomIndex, windowUpperIndex))
if windowUpperIndex > 0 { // the queue is not empty
// slice the queue using the lower and upper bounds
sliced := sw.queue.Slice(slideUpperIndex, windowUpperIndex)
// reset the queue
sw.queue = &sliced
heap.Init(sw.queue)
}

// extract current window elements
windowElements := extractWindowElements(sw.queue, tick.UnixNano())
// move the window
sw.queue = nextWindowQueue

sw.Unlock()

// send elements if the window is not empty
// send elements downstream if the current window is not empty
if len(windowElements) > 0 {
sw.out <- windowElements
}
}

// extractWindowElements generates a window of elements from a given slice of queue items.
func extractWindowElements[T any](items []*Item) []T {
elements := make([]T, len(items))
for i, item := range items {
elements[i] = item.Msg.(T)
// extractWindowElements returns the payloads of the leading run of entries in
// timed whose timestamp is strictly less than now. Scanning stops at the first
// entry with a timestamp greater than or equal to now, so the caller is
// expected to pass a slice ordered by timestamp. The result is never nil and
// has capacity len(timed).
func extractWindowElements[T any](timed []timedElement[T], now int64) []T {
	window := make([]T, 0, len(timed))
	for i := range timed {
		if timed[i].timestamp >= now {
			// The input is ordered, so no later entry can qualify.
			break
		}
		window = append(window, timed[i].element)
	}
	return window
}
21 changes: 13 additions & 8 deletions flow/sliding_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,21 @@ func TestSlidingWindowWithExtractor(t *testing.T) {

now := time.Now()
inputValues := []element{
{"c", now.Add(29 * time.Millisecond).UnixNano()},
{"a", now.Add(2 * time.Millisecond).UnixNano()},
{"b", now.Add(17 * time.Millisecond).UnixNano()},
{"c", now.Add(29 * time.Millisecond).UnixNano()},
{"d", now.Add(35 * time.Millisecond).UnixNano()},
{"e", now.Add(77 * time.Millisecond).UnixNano()},
{"f", now.Add(93 * time.Millisecond).UnixNano()},
{"e", now.Add(77 * time.Millisecond).UnixNano()},
{"g", now.Add(120 * time.Millisecond).UnixNano()},
}
go ingestSlice(inputValues, in)
go closeDeferred(in, 250*time.Millisecond)
// send some out-of-order events
go ingestDeferred(element{"h", now.Add(5 * time.Millisecond).UnixNano()},
in, 145*time.Millisecond)
go ingestDeferred(element{"i", now.Add(3 * time.Millisecond).UnixNano()},
in, 145*time.Millisecond)

go func() {
source.
Expand All @@ -93,14 +98,14 @@ func TestSlidingWindowWithExtractor(t *testing.T) {
}
fmt.Println(outputValues)

assert.Equal(t, 6, len(outputValues)) // [[a b c d e f g] [c d e f g] [e f g] [e f g] [f g] [g]]
assert.Equal(t, 6, len(outputValues)) // [[a b c d] [c d] [e] [e f] [f g] [i h g]]

assert.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g"}, outputValues[0])
assert.Equal(t, []string{"c", "d", "e", "f", "g"}, outputValues[1])
assert.Equal(t, []string{"e", "f", "g"}, outputValues[2])
assert.Equal(t, []string{"e", "f", "g"}, outputValues[3])
assert.Equal(t, []string{"a", "b", "c", "d"}, outputValues[0])
assert.Equal(t, []string{"c", "d"}, outputValues[1])
assert.Equal(t, []string{"e"}, outputValues[2])
assert.Equal(t, []string{"e", "f"}, outputValues[3])
assert.Equal(t, []string{"f", "g"}, outputValues[4])
assert.Equal(t, []string{"g"}, outputValues[5])
assert.Equal(t, []string{"i", "h", "g"}, outputValues[5])
}

func stringValues(elements []element) []string {
Expand Down

0 comments on commit 5584310

Please sign in to comment.