Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(flow): remove priority queue from sliding window #137

Merged
merged 1 commit into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 0 additions & 69 deletions flow/queue.go

This file was deleted.

56 changes: 0 additions & 56 deletions flow/queue_test.go

This file was deleted.

87 changes: 50 additions & 37 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package flow

import (
"container/heap"
"sort"
"sync"
"time"

"github.com/reugn/go-streams"
)

// timedElement stores an incoming element along with its timestamp.
type timedElement[T any] struct {
element T
timestamp int64
}

// SlidingWindow assigns elements to windows of fixed length configured by the window
// size parameter.
// An additional window slide parameter controls how frequently a sliding window is started.
Expand All @@ -18,7 +24,7 @@ type SlidingWindow[T any] struct {
sync.Mutex
windowSize time.Duration
slidingInterval time.Duration
queue *PriorityQueue
queue []timedElement[T]
in chan any
out chan any
done chan struct{}
Expand Down Expand Up @@ -65,7 +71,7 @@ func NewSlidingWindowWithExtractor[T any](
slidingWindow := &SlidingWindow[T]{
windowSize: windowSize,
slidingInterval: slidingInterval,
queue: &PriorityQueue{},
queue: make([]timedElement[T], 0),
in: make(chan any),
out: make(chan any),
done: make(chan struct{}),
Expand Down Expand Up @@ -108,21 +114,24 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) {
}

// timestamp extracts the timestamp from an element if the timestampExtractor is set.
// It returns system clock time otherwise.
// Otherwise, the system time is returned.
func (sw *SlidingWindow[T]) timestamp(element T) int64 {
if sw.timestampExtractor == nil {
return time.Now().UnixNano()
}
return sw.timestampExtractor(element)
}

// receive buffers the incoming elements by pushing them into a priority queue,
// ordered by their creation time.
// receive buffers the incoming elements by pushing them into the queue,
// wrapping the original item into a timedElement along with its timestamp.
func (sw *SlidingWindow[T]) receive() {
for element := range sw.in {
item := &Item{element, sw.timestamp(element.(T)), 0}
sw.Lock()
heap.Push(sw.queue, item)
timed := timedElement[T]{
element: element.(T),
timestamp: sw.timestamp(element.(T)),
}
sw.queue = append(sw.queue, timed)
sw.Unlock()
}
close(sw.done)
Expand Down Expand Up @@ -150,47 +159,51 @@ func (sw *SlidingWindow[T]) emit() {
}
}

// dispatchWindow creates a new window and slides the elements queue.
// It sends the slice of elements to the output channel if the window is not empty.
// dispatchWindow is responsible for sending the elements in the current
// window to the output channel and moving the window to the next position.
func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
sw.Lock()
// build a window of elements
var windowBottomIndex int
now := tick.UnixNano()
windowUpperIndex := sw.queue.Len()
slideUpperIndex := windowUpperIndex
slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds()
windowBottomTime := now - sw.windowSize.Nanoseconds()
for i, item := range *sw.queue {
if item.epoch < windowBottomTime {
windowBottomIndex = i
}
if item.epoch > slideUpperTime {
slideUpperIndex = i

// sort elements in the queue by their timestamp
sort.Slice(sw.queue, func(i, j int) bool {
return sw.queue[i].timestamp < sw.queue[j].timestamp
})

// calculate the next window start time
nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval).UnixNano()
// initialize the next window queue
var nextWindowQueue []timedElement[T]
for i, element := range sw.queue {
if element.timestamp > nextWindowStartTime {
nextWindowQueue = make([]timedElement[T], len(sw.queue)-i)
_ = copy(nextWindowQueue, sw.queue[i:])
break
}
}
windowElements := extractWindowElements[T](sw.queue.Slice(windowBottomIndex, windowUpperIndex))
if windowUpperIndex > 0 { // the queue is not empty
// slice the queue using the lower and upper bounds
sliced := sw.queue.Slice(slideUpperIndex, windowUpperIndex)
// reset the queue
sw.queue = &sliced
heap.Init(sw.queue)
}

// extract current window elements
windowElements := extractWindowElements(sw.queue, tick.UnixNano())
// move the window
sw.queue = nextWindowQueue

sw.Unlock()

// send elements if the window is not empty
// send elements downstream if the current window is not empty
if len(windowElements) > 0 {
sw.out <- windowElements
}
}

// extractWindowElements generates a window of elements from a given slice of queue items.
func extractWindowElements[T any](items []*Item) []T {
elements := make([]T, len(items))
for i, item := range items {
elements[i] = item.Msg.(T)
// extractWindowElements extracts current window elements from the given slice
// of timedElement. Elements newer than now will not be included.
func extractWindowElements[T any](timed []timedElement[T], now int64) []T {
elements := make([]T, 0, len(timed))
for _, timedElement := range timed {
if timedElement.timestamp < now {
elements = append(elements, timedElement.element)
} else {
break // we can break since the input is an ordered slice
}
}
return elements
}
21 changes: 13 additions & 8 deletions flow/sliding_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,21 @@ func TestSlidingWindowWithExtractor(t *testing.T) {

now := time.Now()
inputValues := []element{
{"c", now.Add(29 * time.Millisecond).UnixNano()},
{"a", now.Add(2 * time.Millisecond).UnixNano()},
{"b", now.Add(17 * time.Millisecond).UnixNano()},
{"c", now.Add(29 * time.Millisecond).UnixNano()},
{"d", now.Add(35 * time.Millisecond).UnixNano()},
{"e", now.Add(77 * time.Millisecond).UnixNano()},
{"f", now.Add(93 * time.Millisecond).UnixNano()},
{"e", now.Add(77 * time.Millisecond).UnixNano()},
{"g", now.Add(120 * time.Millisecond).UnixNano()},
}
go ingestSlice(inputValues, in)
go closeDeferred(in, 250*time.Millisecond)
// send some out-of-order events
go ingestDeferred(element{"h", now.Add(5 * time.Millisecond).UnixNano()},
in, 145*time.Millisecond)
go ingestDeferred(element{"i", now.Add(3 * time.Millisecond).UnixNano()},
in, 145*time.Millisecond)

go func() {
source.
Expand All @@ -93,14 +98,14 @@ func TestSlidingWindowWithExtractor(t *testing.T) {
}
fmt.Println(outputValues)

assert.Equal(t, 6, len(outputValues)) // [[a b c d e f g] [c d e f g] [e f g] [e f g] [f g] [g]]
assert.Equal(t, 6, len(outputValues)) // [[a b c d] [c d] [e] [e f] [f g] [i h g]]

assert.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g"}, outputValues[0])
assert.Equal(t, []string{"c", "d", "e", "f", "g"}, outputValues[1])
assert.Equal(t, []string{"e", "f", "g"}, outputValues[2])
assert.Equal(t, []string{"e", "f", "g"}, outputValues[3])
assert.Equal(t, []string{"a", "b", "c", "d"}, outputValues[0])
assert.Equal(t, []string{"c", "d"}, outputValues[1])
assert.Equal(t, []string{"e"}, outputValues[2])
assert.Equal(t, []string{"e", "f"}, outputValues[3])
assert.Equal(t, []string{"f", "g"}, outputValues[4])
assert.Equal(t, []string{"g"}, outputValues[5])
assert.Equal(t, []string{"i", "h", "g"}, outputValues[5])
}

func stringValues(elements []element) []string {
Expand Down
Loading