Skip to content

Commit

Permalink
Implemented channel source and sink (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkurzeja committed Dec 13, 2018
1 parent 921465e commit 77f7fde
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 0 deletions.
34 changes: 34 additions & 0 deletions channel/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package channel

import "github.com/msales/streams/v2"

// Sink represents a channel sink.
type Sink struct {
pipe streams.Pipe

ch chan *streams.Message
}

// NewSink creates a new channel Sink.
func NewSink(ch chan *streams.Message) *Sink {
return &Sink{ch: ch}
}

// WithPipe sets the pipe on the Processor.
func (s *Sink) WithPipe(pipe streams.Pipe) {
s.pipe = pipe
}

// Process processes the stream Message.
func (s *Sink) Process(msg *streams.Message) error {
s.ch <- msg

return s.pipe.Mark(msg)
}

// Close closes the processor.
func (s *Sink) Close() error {
close(s.ch)

return nil
}
45 changes: 45 additions & 0 deletions channel/sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package channel_test

import (
"testing"

"github.com/msales/streams/v2"
"github.com/msales/streams/v2/channel"
"github.com/msales/streams/v2/mocks"
"github.com/stretchr/testify/assert"
)

func TestNewSink(t *testing.T) {
sink := channel.NewSink(nil)

assert.Equal(t, &channel.Sink{}, sink)
}

func TestSink_Close(t *testing.T) {
ch := make(chan *streams.Message)
sink := channel.NewSink(ch)

err := sink.Close()
_, open := <-ch

assert.NoError(t, err)
assert.False(t, open)
}

func TestSink_Process(t *testing.T) {
ch := make(chan *streams.Message, 1)
sink := channel.NewSink(ch)

pipe := mocks.NewPipe(t)
pipe.ExpectMark(nil, "test")

sink.WithPipe(pipe)

msg := &streams.Message{Value: "test"}

err := sink.Process(msg)

assert.NoError(t, err)
assert.Equal(t, msg, <-ch)
pipe.AssertExpectations()
}
42 changes: 42 additions & 0 deletions channel/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package channel

import (
"time"

"github.com/msales/streams/v2"
)

// Compile-time interface check.
var _ streams.Source = (*Source)(nil)

// Source represents a source that consumes messages from a channel.
type Source struct {
ch chan *streams.Message
}

// NewSource creates a new channel Source.
func NewSource(ch chan *streams.Message) *Source {
return &Source{ch: ch}
}

// Consume gets the next record from the Source.
func (s *Source) Consume() (*streams.Message, error) {
select {

case msg := <-s.ch:
return msg.WithMetadata(nil, nil), nil

case <-time.After(100 * time.Millisecond):
return streams.NewMessage(nil, nil), nil
}
}

// Commit marks the consumed records as processed.
func (s *Source) Commit(interface{}) error {
return nil
}

// Close closes the Source.
func (s *Source) Close() error {
return nil
}
91 changes: 91 additions & 0 deletions channel/source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package channel_test

import (
"testing"

"github.com/msales/streams/v2"
"github.com/msales/streams/v2/channel"
"github.com/stretchr/testify/assert"
)

func TestNewSource(t *testing.T) {
src := channel.NewSource(nil)

assert.Equal(t, &channel.Source{}, src)
}

func TestSource_Consume(t *testing.T) {
msgs := make([]*streams.Message, 3)
for i := 0; i < len(msgs); i++ {
msgs[i] = streams.NewMessage(i, i).WithMetadata(mockSource{}, mockMetadata{})
}

ch := make(chan *streams.Message, len(msgs))

for _, msg := range msgs {
ch <- msg
}

src := channel.NewSource(ch)

for i := 0; i < len(msgs); i++ {
msg, err := src.Consume()
src, meta := msg.Metadata()

assert.NoError(t, err)
assert.Equal(t, msgs[i].Key, msg.Key)
assert.Equal(t, msgs[i].Value, msg.Value)
assert.Nil(t, src)
assert.Nil(t, meta)
}
}

func TestSource_Consume_WithEmptyMessage(t *testing.T) {
src := channel.NewSource(nil)

msg, err := src.Consume()
assert.NoError(t, err)
assert.True(t, msg.Empty())
}

func TestSource_Commit(t *testing.T) {
src := channel.NewSource(nil)

err := src.Commit(nil)

assert.NoError(t, err)
}

func TestSource_Close(t *testing.T) {
ch := make(chan *streams.Message)
src := channel.NewSource(ch)

err := src.Close()

assert.NoError(t, err)
assert.NotPanics(t, func() { // Assert that the ch is not closed.
close(ch)
})
}

type mockSource struct{}

func (mockSource) Consume() (*streams.Message, error) {
return nil, nil
}

func (mockSource) Commit(interface{}) error {
return nil
}

func (mockSource) Close() error {
return nil
}

type mockMetadata struct{}

func (mockMetadata) WithOrigin(streams.MetadataOrigin) {}

func (m mockMetadata) Merge(streams.Metadata, streams.MetadataStrategy) streams.Metadata {
return m
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/magiconair/properties v1.8.0
github.com/msales/pkg/v3 v3.1.0
github.com/pierrec/lz4 v0.0.0-20181005164709-635575b42742 // indirect
github.com/pkg/errors v0.8.0
Expand Down

0 comments on commit 77f7fde

Please sign in to comment.