From c8baa4bb8ea6909126f4e47fb87756b189e53bed Mon Sep 17 00:00:00 2001 From: Hugon Sknadaj Date: Fri, 29 Jul 2022 12:17:32 +0200 Subject: [PATCH] Add FanOut stream builder and processor that branches stream into N streams. (#102) --- processor.go | 36 +++++++++++++++++++++++++++++++++++- processor_test.go | 36 +++++++++++++++++++++++++++++++++++- stream.go | 13 +++++++++++++ stream_internal_test.go | 17 +++++++++++++++++ 4 files changed, 100 insertions(+), 2 deletions(-) diff --git a/processor.go b/processor.go index 97f0bbd..090bcbd 100644 --- a/processor.go +++ b/processor.go @@ -9,7 +9,7 @@ import ( type Committer interface { Processor - //Commit commits a processors batch. + // Commit commits a processors batch. Commit(ctx context.Context) error } @@ -115,6 +115,40 @@ func (p *BranchProcessor) Close() error { return nil } +// FanOutProcessor is a processor that passes the message to multiple children. +type FanOutProcessor struct { + streams int + pipe Pipe +} + +// NewFanOutProcessor creates a new FanOutProcessor instance. +func NewFanOutProcessor(streams int) Processor { + return &FanOutProcessor{ + streams: streams, + } +} + +// WithPipe sets the pipe on the Processor. +func (p *FanOutProcessor) WithPipe(pipe Pipe) { + p.pipe = pipe +} + +// Process processes the stream nodeMessage. +func (p *FanOutProcessor) Process(msg Message) error { + for i := 0; i < p.streams; i++ { + if err := p.pipe.ForwardToChild(msg, i); err != nil { + return err + } + } + + return nil +} + +// Close closes the processor. +func (p *FanOutProcessor) Close() error { + return nil +} + // FilterProcessor is a processor that filters a stream using a predicate function. type FilterProcessor struct { pipe Pipe diff --git a/processor_test.go b/processor_test.go index 4cd3be2..dac5add 100644 --- a/processor_test.go +++ b/processor_test.go @@ -4,9 +4,10 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" + "github.com/msales/streams/v6" "github.com/msales/streams/v6/mocks" - "github.com/stretchr/testify/assert" ) func TestBranchProcessor_Process(t *testing.T) { @@ -63,6 +64,39 @@ func TestBranchProcessor_Close(t *testing.T) { assert.NoError(t, err) } +func TestFanOutProcessor_Process(t *testing.T) { + pipe := mocks.NewPipe(t) + pipe.ExpectForwardToChild("test", "test", 0) + pipe.ExpectForwardToChild("test", "test", 1) + p := streams.NewFanOutProcessor(2) + p.WithPipe(pipe) + + err := p.Process(streams.NewMessage("test", "test")) + + assert.NoError(t, err) + pipe.AssertExpectations() +} + +func TestFanOutProcessor_ProcessWithForwardError(t *testing.T) { + pipe := mocks.NewPipe(t) + pipe.ExpectForwardToChild("test", "test", 0) + pipe.ShouldError() + p := streams.NewFanOutProcessor(2) + p.WithPipe(pipe) + + err := p.Process(streams.NewMessage("test", "test")) + + assert.Error(t, err) +} + +func TestFanOutProcessor_Close(t *testing.T) { + p := streams.NewFanOutProcessor(2) + + err := p.Close() + + assert.NoError(t, err) +} + func TestFilterProcessor_Process(t *testing.T) { pred := streams.PredicateFunc(func(msg streams.Message) (bool, error) { if _, ok := msg.Key.(string); ok { diff --git a/stream.go b/stream.go index 6ff191b..6486e9b 100644 --- a/stream.go +++ b/stream.go @@ -62,6 +62,19 @@ func (s *Stream) Branch(name string, preds ...Predicate) []*Stream { return streams } +// FanOut creates multiple streams based on the number of streams necessary. +// It should be used when the same message is supposed to be processed by multiple sinks. +func (s *Stream) FanOut(name string, number int) []*Stream { + p := NewFanOutProcessor(number) + n := s.tp.AddProcessor(name, p, s.parents) + + streams := make([]*Stream, 0, number) + for i := 0; i < number; i++ { + streams = append(streams, newStream(s.tp, []Node{n})) + } + return streams +} + // BranchFunc branches a stream based on the given predicates. func (s *Stream) BranchFunc(name string, preds ...PredicateFunc) []*Stream { ps := make([]Predicate, len(preds)) diff --git a/stream_internal_test.go b/stream_internal_test.go index 0abbd8d..b1d32f1 100644 --- a/stream_internal_test.go +++ b/stream_internal_test.go @@ -211,6 +211,23 @@ func TestStream_Process(t *testing.T) { assert.Equal(t, stream.parents[0].(*ProcessorNode).processor, proc) } +func TestStream_FanOut(t *testing.T) { + source := &streamSource{} + builder := NewStreamBuilder() + + streams := builder.Source("source", source).FanOut("test", 2) + + assert.Len(t, streams, 2) + assert.Len(t, streams[0].parents, 1) + assert.IsType(t, &ProcessorNode{}, streams[0].parents[0]) + assert.Equal(t, streams[0].parents[0].(*ProcessorNode).name, "test") + assert.IsType(t, &FanOutProcessor{}, streams[0].parents[0].(*ProcessorNode).processor) + assert.Len(t, streams[1].parents, 1) + assert.IsType(t, &ProcessorNode{}, streams[1].parents[0]) + assert.Equal(t, streams[1].parents[0].(*ProcessorNode).name, "test") + assert.IsType(t, &FanOutProcessor{}, streams[1].parents[0].(*ProcessorNode).processor) +} + type streamSource struct{} func (s streamSource) Consume() (Message, error) {