Skip to content

Commit

Permalink
fix: session window inactivity timer reset mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Jan 11, 2024
1 parent b09bd83 commit 7cbfb5d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 6 deletions.
29 changes: 23 additions & 6 deletions flow/session_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
type SessionWindow struct {
sync.Mutex
inactivityGap time.Duration
timer *time.Timer
in chan interface{}
out chan interface{}
reset chan struct{}
done chan struct{}
buffer []interface{}
}
Expand All @@ -28,9 +28,9 @@ var _ streams.Flow = (*SessionWindow)(nil)
func NewSessionWindow(inactivityGap time.Duration) *SessionWindow {
sessionWindow := &SessionWindow{
inactivityGap: inactivityGap,
timer: time.NewTimer(inactivityGap),
in: make(chan interface{}),
out: make(chan interface{}),
reset: make(chan struct{}),
done: make(chan struct{}),
}
go sessionWindow.emit()
Expand Down Expand Up @@ -74,24 +74,41 @@ func (sw *SessionWindow) receive() {
for element := range sw.in {
sw.Lock()
sw.buffer = append(sw.buffer, element)
sw.timer.Reset(sw.inactivityGap) // reset the inactivity timer
sw.Unlock()
sw.notifyTimerReset() // signal to reset the inactivity timer
}
close(sw.done)
}

// notifyTimerReset sends a notification to reset the inactivity timer.
func (sw *SessionWindow) notifyTimerReset() {
select {
case sw.reset <- struct{}{}:
default:
}
}

// emit captures and emits a session window based on the gap of inactivity.
// When this period expires, the current session closes and subsequent elements
// are assigned to a new session window.
func (sw *SessionWindow) emit() {
defer sw.timer.Stop()

timer := time.NewTimer(sw.inactivityGap)
for {
select {
case <-sw.timer.C:
case <-timer.C:
sw.dispatchWindow()

case <-sw.reset:
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(sw.inactivityGap)

case <-sw.done:
timer.Stop()
sw.dispatchWindow()
close(sw.out)
return
Expand Down
35 changes: 35 additions & 0 deletions flow/session_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,38 @@ func TestSessionWindow(t *testing.T) {
assertEquals(t, []interface{}{"d"}, outputValues[1])
assertEquals(t, []interface{}{"e"}, outputValues[2])
}

func TestLongSessionWindow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})

source := ext.NewChanSource(in)
sessionWindow := flow.NewSessionWindow(20 * time.Millisecond)
sink := ext.NewChanSink(out)

inputValues := []string{"a", "b", "c", "d", "e", "f", "g"}
go func() {
for _, e := range inputValues {
ingestDeferred(e, in, 10*time.Millisecond)
}
}()
go ingestDeferred("h", in, 140*time.Millisecond)
go closeDeferred(in, 150*time.Millisecond)

go func() {
source.
Via(sessionWindow).
To(sink)
}()

var outputValues [][]interface{}
for e := range sink.Out {
outputValues = append(outputValues, e.([]interface{}))
}
fmt.Println(outputValues)

assertEquals(t, 2, len(outputValues)) // [[a b c d e f g] [h]]

assertEquals(t, []interface{}{"a", "b", "c", "d", "e", "f", "g"}, outputValues[0])
assertEquals(t, []interface{}{"h"}, outputValues[1])
}

0 comments on commit 7cbfb5d

Please sign in to comment.