Skip to content

Commit

Permalink
Merge pull request #10 from RuiFG/master
Browse files Browse the repository at this point in the history
feat: window operator support custom state descriptor
  • Loading branch information
RuiFG authored Jan 9, 2023
2 parents 5c706c6 + de687c6 commit 21e26f0
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 63 deletions.
8 changes: 2 additions & 6 deletions streaming-connector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ module github.com/RuiFG/streaming/streaming-connector
go 1.19

require (
github.com/RuiFG/streaming/streaming-core v0.1.2
github.com/Shopify/sarama v1.35.0
github.com/RuiFG/streaming/streaming-core v0.2.1
)

require (
Expand All @@ -27,14 +26,11 @@ require (
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/twmb/murmur3 v1.1.5 // indirect
github.com/uber-go/tally/v4 v4.1.4 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/xujiajun/nutsdb v0.11.1 // indirect
github.com/xujiajun/nutsdb v0.11.0 // indirect
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220708220712-1185a9018129 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
Expand Down
10 changes: 8 additions & 2 deletions streaming-core/store/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func RegisterOrGet[T any](controller Controller, descriptor StateDescriptor[T])
func GobRegisterOrGet[T any](controller Controller, key string, initializer StateInitializer[T],
serializePostProcessor StateSerializePostProcessor[T],
deserializePostProcessor StateDeserializePostProcessor[T]) (StateController[T], error) {
return RegisterOrGet[T](controller, GenerateGobStateDescriptor[T](key, initializer, serializePostProcessor, deserializePostProcessor))
}

func GenerateGobStateDescriptor[T any](key string, initializer StateInitializer[T],
serializePostProcessor StateSerializePostProcessor[T],
deserializePostProcessor StateDeserializePostProcessor[T]) StateDescriptor[T] {
if serializePostProcessor == nil {
serializePostProcessor = func(i []byte, err error) ([]byte, error) {
return i, err
Expand All @@ -61,7 +67,7 @@ func GobRegisterOrGet[T any](controller Controller, key string, initializer Stat
if deserializePostProcessor == nil {
deserializePostProcessor = func(v T, err error) (T, error) { return v, err }
}
return RegisterOrGet[T](controller, StateDescriptor[T]{
return StateDescriptor[T]{
Key: key,
Initializer: initializer,
Serializer: func(v T) ([]byte, error) {
Expand All @@ -80,5 +86,5 @@ func GobRegisterOrGet[T any](controller Controller, key string, initializer Stat
return deserializePostProcessor(*vPointer, nil)
}
},
})
}
}
17 changes: 16 additions & 1 deletion streaming-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,20 @@ module github.com/RuiFG/streaming/streaming-operator

go 1.19

require github.com/RuiFG/streaming/streaming-core v0.1.2
require github.com/RuiFG/streaming/streaming-core v0.2.0

require (
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/twmb/murmur3 v1.1.5 // indirect
github.com/uber-go/tally/v4 v4.1.4 // indirect
github.com/xujiajun/mmap-go v1.0.1 // indirect
github.com/xujiajun/nutsdb v0.11.1 // indirect
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/sys v0.0.0-20220405210540-1e041c57c461 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
8 changes: 4 additions & 4 deletions streaming-operator/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/RuiFG/streaming/streaming-core v0.1.0 h1:7va3w2p88QukDlqD8DCNDfUJC8iWxSAFDBWU8d5BXJo=
github.com/RuiFG/streaming/streaming-core v0.1.0/go.mod h1:cZ9UAxZbTlD2ZiVNW0UPh8V7styB24hNaAYasO7F5Nc=
github.com/RuiFG/streaming/streaming-core v0.2.0 h1:1wjFYbAqhSZKtV8/MdrclZzUt0Z9pwKbi6gZ9GQhYD4=
github.com/RuiFG/streaming/streaming-core v0.2.0/go.mod h1:QpIhBUEBuCybjTfTKyAdEbl7a5aZRA2Mm422ztFcuS8=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -102,8 +102,8 @@ github.com/uber-go/tally/v4 v4.1.4/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/
github.com/xujiajun/gorouter v1.2.0/go.mod h1:yJrIta+bTNpBM/2UT8hLOaEAFckO+m/qmR3luMIQygM=
github.com/xujiajun/mmap-go v1.0.1 h1:7Se7ss1fLPPRW+ePgqGpCkfGIZzJV6JPq9Wq9iv/WHc=
github.com/xujiajun/mmap-go v1.0.1/go.mod h1:CNN6Sw4SL69Sui00p0zEzcZKbt+5HtEnYUsc6BKKRMg=
github.com/xujiajun/nutsdb v0.11.0 h1:uzQb3Tvib0R/5kwdt6AcEJJpHtmcunGGMsq63dhnyyM=
github.com/xujiajun/nutsdb v0.11.0/go.mod h1:sAT5Kr8+53X2r1eFMHw2VSPLSAo/PiJCZPK5QtMsw7g=
github.com/xujiajun/nutsdb v0.11.1 h1:zLyIvp3ABHMohtcqi0sbt7gGOFWfse+ZbLv2GVb6ZYw=
github.com/xujiajun/nutsdb v0.11.1/go.mod h1:sAT5Kr8+53X2r1eFMHw2VSPLSAo/PiJCZPK5QtMsw7g=
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235 h1:w0si+uee0iAaCJO9q86T6yrhdadgcsoNuh47LrUykzg=
github.com/xujiajun/utils v0.0.0-20220904132955-5f7c5b914235/go.mod h1:MR4+0R6A9NS5IABnIM3384FfOq8QFVnm7WDrBOhIaMU=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
Expand Down
14 changes: 14 additions & 0 deletions streaming-operator/window/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package window

import (
"errors"
"github.com/RuiFG/streaming/streaming-core/store"
"time"
)

Expand All @@ -11,6 +12,7 @@ type options[KEY comparable, IN, ACC, WIN, OUT any] struct {
assignerFn AssignerFn[KEY, IN]
processWindowFn ProcessWindowFn[KEY, WIN, OUT]
aggregatorFn AggregatorFn[IN, ACC, WIN]
stateDescriptor store.StateDescriptor[map[Window]map[KEY]ACC]
allowedLateness int64
}

Expand Down Expand Up @@ -75,6 +77,7 @@ func WithAllowedLateness[KEY comparable, IN, ACC, WIN, OUT any](allowedLateness
if allowedLateness < 0 {
return errors.New("allowedLateness can't less than 0")
}
opts.allowedLateness = allowedLateness
return nil
}
}
Expand Down Expand Up @@ -116,3 +119,14 @@ func WithPassThroughProcess[KEY comparable, IN, ACC, OUT any]() WithOptions[KEY,
}

}

func WithStateDescriptor[KEY comparable, IN, ACC, WIN, OUT any](stateDescriptor store.StateDescriptor[map[Window]map[KEY]ACC]) WithOptions[KEY, IN, ACC, OUT, OUT] {
return func(opts *options[KEY, IN, ACC, OUT, OUT]) error {
if stateDescriptor.Key == "" || stateDescriptor.Serializer == nil ||
stateDescriptor.Deserializer == nil || stateDescriptor.Initializer == nil {
return errors.New("stateDescriptor value can't be nil")
}
opts.stateDescriptor = stateDescriptor
return nil
}
}
23 changes: 22 additions & 1 deletion streaming-operator/window/window.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package window

import (
"errors"
"fmt"
"github.com/RuiFG/streaming/streaming-core/element"
"math"
Expand Down Expand Up @@ -195,12 +196,32 @@ func (a *operator[KEY, IN, ACC, WIN, OUT]) cleanupTime(window Window) int64 {
}

func Apply[KEY comparable, IN, ACC, WIN, OUT any](upstream stream.Stream[IN], name string, withOptionsFns ...WithOptions[KEY, IN, ACC, WIN, OUT]) (stream.Stream[OUT], error) {
o := &options[KEY, IN, ACC, WIN, OUT]{}
o := &options[KEY, IN, ACC, WIN, OUT]{
stateDescriptor: store.GenerateGobStateDescriptor[map[Window]map[KEY]ACC]("gob-window-aggregate-Key-state",
func() map[Window]map[KEY]ACC {
return map[Window]map[KEY]ACC{}
}, nil, nil),
}
for _, withOptionsFn := range withOptionsFns {
if err := withOptionsFn(o); err != nil {
return nil, fmt.Errorf("%s illegal parameter: %w", name, err)
}
}
if o.selectorFn == nil {
return nil, errors.New("selectorFn can't be nil")
}
if o.triggerFn == nil {
return nil, errors.New("triggerFn can't be nil")
}
if o.assignerFn == nil {
return nil, errors.New("assignerFn can't be nil")
}
if o.aggregatorFn == nil {
return nil, errors.New("aggregatorFn can't be nil")
}
if o.processWindowFn == nil {
return nil, errors.New("processWindowFn can't be nil")
}
return stream.ApplyOneInput[IN, OUT](upstream, stream.OperatorStreamOptions{
Name: name,
Operator: OneInputOperatorToOperator[IN, OUT](&operator[KEY, IN, ACC, WIN, OUT]{
Expand Down
49 changes: 0 additions & 49 deletions streaming-operator/window/window_test.go

This file was deleted.

0 comments on commit 21e26f0

Please sign in to comment.