Skip to content

Commit

Permalink
Add FanOut stream builder and processor that branches stream into N s…
Browse files Browse the repository at this point in the history
…treams. (#102)
  • Loading branch information
Skandalik authored Jul 29, 2022
1 parent b907dcc commit c8baa4b
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 2 deletions.
36 changes: 35 additions & 1 deletion processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type Committer interface {
Processor

//Commit commits a processors batch.
// Commit commits a processors batch.
Commit(ctx context.Context) error
}

Expand Down Expand Up @@ -115,6 +115,40 @@ func (p *BranchProcessor) Close() error {
return nil
}

// FanOutProcessor is a processor that passes the message to multiple children.
type FanOutProcessor struct {
streams int
pipe Pipe
}

// NewFanOutProcessor creates a new FanOutProcessor instance.
func NewFanOutProcessor(streams int) Processor {
return &FanOutProcessor{
streams: streams,
}
}

// WithPipe sets the pipe on the Processor.
func (p *FanOutProcessor) WithPipe(pipe Pipe) {
p.pipe = pipe
}

// Process processes the stream nodeMessage.
func (p *FanOutProcessor) Process(msg Message) error {
for i := 0; i < p.streams; i++ {
if err := p.pipe.ForwardToChild(msg, i); err != nil {
return err
}
}

return nil
}

// Close closes the processor.
func (p *FanOutProcessor) Close() error {
return nil
}

// FilterProcessor is a processor that filters a stream using a predicate function.
type FilterProcessor struct {
pipe Pipe
Expand Down
36 changes: 35 additions & 1 deletion processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/assert"

"github.com/msales/streams/v6"
"github.com/msales/streams/v6/mocks"
"github.com/stretchr/testify/assert"
)

func TestBranchProcessor_Process(t *testing.T) {
Expand Down Expand Up @@ -63,6 +64,39 @@ func TestBranchProcessor_Close(t *testing.T) {
assert.NoError(t, err)
}

func TestFanOutProcessor_Process(t *testing.T) {
pipe := mocks.NewPipe(t)
pipe.ExpectForwardToChild("test", "test", 0)
pipe.ExpectForwardToChild("test", "test", 1)
p := streams.NewFanOutProcessor(2)
p.WithPipe(pipe)

err := p.Process(streams.NewMessage("test", "test"))

assert.NoError(t, err)
pipe.AssertExpectations()
}

func TestFanOutProcessor_ProcessWithForwardError(t *testing.T) {
pipe := mocks.NewPipe(t)
pipe.ExpectForwardToChild("test", "test", 0)
pipe.ShouldError()
p := streams.NewFanOutProcessor(2)
p.WithPipe(pipe)

err := p.Process(streams.NewMessage("test", "test"))

assert.Error(t, err)
}

func TestFanOutProcessor_Close(t *testing.T) {
p := streams.NewFanOutProcessor(2)

err := p.Close()

assert.NoError(t, err)
}

func TestFilterProcessor_Process(t *testing.T) {
pred := streams.PredicateFunc(func(msg streams.Message) (bool, error) {
if _, ok := msg.Key.(string); ok {
Expand Down
13 changes: 13 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ func (s *Stream) Branch(name string, preds ...Predicate) []*Stream {
return streams
}

// FanOut creates multiple streams based on the number of streams necessary.
// It should be used when the same message is supposed to be processed by multiple sinks.
func (s *Stream) FanOut(name string, number int) []*Stream {
p := NewFanOutProcessor(number)
n := s.tp.AddProcessor(name, p, s.parents)

streams := make([]*Stream, 0, number)
for i := 0; i < number; i++ {
streams = append(streams, newStream(s.tp, []Node{n}))
}
return streams
}

// BranchFunc branches a stream based on the given predicates.
func (s *Stream) BranchFunc(name string, preds ...PredicateFunc) []*Stream {
ps := make([]Predicate, len(preds))
Expand Down
17 changes: 17 additions & 0 deletions stream_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,23 @@ func TestStream_Process(t *testing.T) {
assert.Equal(t, stream.parents[0].(*ProcessorNode).processor, proc)
}

func TestStream_FanOut(t *testing.T) {
source := &streamSource{}
builder := NewStreamBuilder()

streams := builder.Source("source", source).FanOut("test", 2)

assert.Len(t, streams, 2)
assert.Len(t, streams[0].parents, 1)
assert.IsType(t, &ProcessorNode{}, streams[0].parents[0])
assert.Equal(t, streams[0].parents[0].(*ProcessorNode).name, "test")
assert.IsType(t, &FanOutProcessor{}, streams[0].parents[0].(*ProcessorNode).processor)
assert.Len(t, streams[1].parents, 1)
assert.IsType(t, &ProcessorNode{}, streams[1].parents[0])
assert.Equal(t, streams[1].parents[0].(*ProcessorNode).name, "test")
assert.IsType(t, &FanOutProcessor{}, streams[1].parents[0].(*ProcessorNode).processor)
}

type streamSource struct{}

func (s streamSource) Consume() (Message, error) {
Expand Down

0 comments on commit c8baa4b

Please sign in to comment.