Skip to content

Commit

Permalink
chore: window function to be a proper window function, stateIter to r…
Browse files Browse the repository at this point in the history
…eturn always the current state as the first element
  • Loading branch information
jvmakine committed Jan 10, 2025
1 parent bbf70b9 commit e8b4edc
Show file tree
Hide file tree
Showing 16 changed files with 233 additions and 48 deletions.
2 changes: 1 addition & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
}
logger.Tracef("Seeded %d deployments", initialCount)

for notification := range iterops.Changes(stateIter, view, state.EventExtractor) {
for notification := range iterops.Changes(stateIter, state.EventExtractor) {
switch event := notification.(type) {
case *state.DeploymentCreatedEvent:
err := sendChange(&ftlv1.PullSchemaResponse{ //nolint:forcetypeassert
Expand Down
11 changes: 8 additions & 3 deletions backend/controller/state/eventextractor.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package state

import "github.com/alecthomas/types/tuple"
import (
"iter"

"github.com/alecthomas/types/tuple"
"github.com/block/ftl/internal/iterops"
)

// EventExtractor calculates controller events from changes to the state.
func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) []SchemaEvent {
func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) iter.Seq[SchemaEvent] {
var events []SchemaEvent

previous := diff.A
Expand Down Expand Up @@ -43,5 +48,5 @@ func EventExtractor(diff tuple.Pair[SchemaState, SchemaState]) []SchemaEvent {
})
}
}
return events
return iterops.Const(events...)
}
3 changes: 2 additions & 1 deletion backend/controller/state/eventextractor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package state

import (
"slices"
"testing"
"time"

Expand Down Expand Up @@ -127,7 +128,7 @@ func TestEventExtractor(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := EventExtractor(tuple.PairOf(tt.previous, tt.current))
got := slices.Collect(EventExtractor(tuple.PairOf(tt.previous, tt.current)))
assert.Equal(t, tt.want, got)
})
}
Expand Down
45 changes: 45 additions & 0 deletions internal/channels/itercontext_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package channels

import (
"context"
"slices"
"testing"
"time"

"github.com/alecthomas/assert/v2"
)

func TestIterContext(t *testing.T) {
t.Run("iterates until channel closed", func(t *testing.T) {
ch := make(chan int)
ctx := context.Background()

// Start goroutine to send values
go func() {
ch <- 1
ch <- 2
ch <- 3
close(ch)
}()

assert.Equal(t, []int{1, 2, 3}, slices.Collect(IterContext(ctx, ch)))
})

t.Run("stops when context cancelled", func(t *testing.T) {
ch := make(chan int)
ctx, cancel := context.WithCancel(context.Background())

// Start goroutine to send values
go func() {
ch <- 1
ch <- 2
time.Sleep(10 * time.Millisecond) // Small delay to ensure cancel happens
cancel() // Cancel context before sending 3
ch <- 3 // This should not be received
close(ch)
}()

assert.Equal(t, []int{1, 2}, slices.Collect(IterContext(ctx, ch)))
assert.Error(t, ctx.Err())
})
}
6 changes: 3 additions & 3 deletions internal/iterops/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

// ChangeExtractor extracts changes from an old and new state.
type ChangeExtractor[S, C any] func(tuple.Pair[S, S]) []C
type ChangeExtractor[S, C any] func(tuple.Pair[S, S]) iter.Seq[C]

// Changes returns a stream of change events from a stream of evolving state.
func Changes[S, C any](in iter.Seq[S], start S, extractor ChangeExtractor[S, C]) iter.Seq[C] {
return FlatMap(WindowPair(in, start), extractor)
func Changes[S, C any](in iter.Seq[S], extractor ChangeExtractor[S, C]) iter.Seq[C] {
return FlatMap(WindowPair(in), extractor)
}
15 changes: 15 additions & 0 deletions internal/iterops/concat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package iterops

import "iter"

func Concat[T any](in ...iter.Seq[T]) iter.Seq[T] {
return func(yield func(T) bool) {
for _, n := range in {
for m := range n {
if !yield(m) {
return
}
}
}
}
}
13 changes: 13 additions & 0 deletions internal/iterops/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package iterops

import "iter"

func Const[T any](in ...T) iter.Seq[T] {
return func(yield func(T) bool) {
for _, n := range in {
if !yield(n) {
return
}
}
}
}
20 changes: 20 additions & 0 deletions internal/iterops/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package iterops

import (
"iter"
"reflect"
)

// Dedup returns an iterator that yields values from the input iterator, removing consecutive duplicates.
func Dedup[T any](seq iter.Seq[T]) iter.Seq[T] {
return func(yield func(T) bool) {
var last T
seq(func(v T) bool {
if reflect.DeepEqual(v, last) {
return true
}
last = v
return yield(v)
})
}
}
8 changes: 8 additions & 0 deletions internal/iterops/empty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package iterops

import "iter"

// Empty returns an empty iterator.
func Empty[T any]() iter.Seq[T] {
return func(yield func(T) bool) {}
}
52 changes: 49 additions & 3 deletions internal/iterops/interops_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package iterops_test

import (
"iter"
"slices"
"testing"

Expand All @@ -11,9 +12,8 @@ import (

func TestWindowPair(t *testing.T) {
input := slices.Values([]int{1, 2, 3, 4})
result := slices.Collect(iterops.WindowPair(input, 0))
result := slices.Collect(iterops.WindowPair(input))
assert.Equal(t, result, []tuple.Pair[int, int]{
tuple.PairOf(0, 1),
tuple.PairOf(1, 2),
tuple.PairOf(2, 3),
tuple.PairOf(3, 4),
Expand All @@ -28,6 +28,52 @@ func TestMap(t *testing.T) {

func TestFlatMap(t *testing.T) {
input := slices.Values([]int{1, 2, 3, 4})
result := slices.Collect(iterops.FlatMap(input, func(v int) []int { return []int{v, v * 2} }))
result := slices.Collect(iterops.FlatMap(input, func(v int) iter.Seq[int] { return iterops.Const(v, v*2) }))
assert.Equal(t, result, []int{1, 2, 2, 4, 3, 6, 4, 8})
}

func TestConcat(t *testing.T) {
input := slices.Values([]int{1, 2, 3, 4})
result := slices.Collect(iterops.Concat(input, input))
assert.Equal(t, result, []int{1, 2, 3, 4, 1, 2, 3, 4})

result = slices.Collect(iterops.Concat(
iterops.Const(1),
iterops.Const(2),
iterops.Const(3),
iterops.Const(4),
))
assert.Equal(t, result, []int{1, 2, 3, 4})
}

func TestConst(t *testing.T) {
input := 1
result := slices.Collect(iterops.Const(input))
assert.Equal(t, result, []int{1})
}

func TestEmpty(t *testing.T) {
result := slices.Collect(iterops.Empty[int]())
assert.Equal(t, result, nil)

assert.Equal(t, slices.Collect(iterops.Concat(
iterops.Empty[int](),
iterops.Empty[int](),
)), nil)

assert.Equal(t, slices.Collect(iterops.Concat(
iterops.Const(1),
iterops.Empty[int](),
)), []int{1})

assert.Equal(t, slices.Collect(iterops.Concat(
iterops.Empty[int](),
iterops.Const(1),
)), []int{1})
}

func TestDedup(t *testing.T) {
input := slices.Values([]int{1, 2, 2, 3, 3, 4, 1})
result := slices.Collect(iterops.Dedup(input))
assert.Equal(t, result, []int{1, 2, 3, 4, 1})
}
4 changes: 2 additions & 2 deletions internal/iterops/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ func Map[T any, U any](in iter.Seq[T], fn func(T) U) iter.Seq[U] {
}
}

func FlatMap[T any, U any](in iter.Seq[T], fn func(T) []U) iter.Seq[U] {
func FlatMap[T any, U any](in iter.Seq[T], fn func(T) iter.Seq[U]) iter.Seq[U] {
return func(yield func(U) bool) {
for n := range in {
for _, u := range fn(n) {
for u := range fn(n) {
if !yield(u) {
return
}
Expand Down
15 changes: 9 additions & 6 deletions internal/iterops/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ package iterops
import (
"iter"

"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/tuple"
)

// WindowPair returns a window of size 2 of the input iterator.
func WindowPair[T any](in iter.Seq[T], start T) iter.Seq[tuple.Pair[T, T]] {
func WindowPair[T any](in iter.Seq[T]) iter.Seq[tuple.Pair[T, T]] {
return func(yield func(tuple.Pair[T, T]) bool) {
previous := start
previous := optional.None[T]()
for n := range in {
result := tuple.PairOf(previous, n)
previous = n
if !yield(result) {
return
if val, ok := previous.Get(); ok {
result := tuple.PairOf(val, n)
if !yield(result) {
return
}
}
previous = optional.Some(n)
}
}
}
13 changes: 11 additions & 2 deletions internal/raft/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
raftpbconnect "github.com/block/ftl/backend/protos/xyz/block/ftl/raft/v1/raftpbconnect"
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/iterops"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/retry"
"github.com/block/ftl/internal/rpc"
Expand Down Expand Up @@ -208,9 +209,16 @@ func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq
panic("cluster not started")
}

result := make(chan R)
result := make(chan R, 64)
logger := log.FromContext(ctx).Scope("raft")

previous, err := s.Query(ctx, query)
if err != nil {
return nil, err
}

result <- previous

// get the last known index as the starting point
last, err := s.getLastIndex()
if err != nil {
Expand Down Expand Up @@ -253,7 +261,8 @@ func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq
}
}()

return channels.IterContext(ctx, result), nil
// dedup, as we might get false positives due to index changes caused by membership changes
return iterops.Dedup(channels.IterContext(ctx, result)), nil
}

func (s *ShardHandle[Q, R, E]) getLastIndex() (uint64, error) {
Expand Down
9 changes: 5 additions & 4 deletions internal/raft/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestLeavingCluster(t *testing.T) {
assertShardValue(ctx, t, 2, shards[1:]...)
}

func TestChanges(t *testing.T) {
func TestStateIter(t *testing.T) {
ctx := testContext(t)

_, shards := startClusters(ctx, t, 2, func(b *raft.Builder) sm.Handle[int64, int64, IntEvent] {
Expand All @@ -156,9 +156,10 @@ func TestChanges(t *testing.T) {
assert.NoError(t, shards[1].Publish(ctx, IntEvent(1)))

next, _ := iter.Pull(changes)
_, _ = next()
v, _ := next()
assert.Equal(t, v, 2)
v1, _ := next()
v2, _ := next()
// we might get the value in the first call sometimes if both events are processed in the same tick
assert.True(t, v1 == 2 || v2 == 2)
}

func testBuilder(t *testing.T, addresses []*net.TCPAddr, id uint64, address string, controlBind *url.URL) *raft.Builder {
Expand Down
Loading

0 comments on commit e8b4edc

Please sign in to comment.