Skip to content

Commit

Permalink
Merge pull request #87 from blendle/easy-errors
Browse files Browse the repository at this point in the history
Add convenience function to handle stream errors
  • Loading branch information
JeanMertz authored May 22, 2018
2 parents 338d65e + b916931 commit 79ecd70
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 26 deletions.
20 changes: 20 additions & 0 deletions stream/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package stream

// ErrorCloser interface contains a shared subset of methods between consumers
// and producers. This subset can be used to collectively listen to errors from
// any of the configured stream consumers or producers, and close them all when
// one triggers an error.
type ErrorCloser interface {
// Errors is a read-only channel on which the consumer or producer delivers
// any errors that occurred while consuming from, or producing to the stream.
Errors() <-chan error

// Close closes the consumer or producer. After calling this method, the
// consumer or producer is no longer in a usable state, and future method
// calls can result in panics.
//
// Check the specific implementations to know what happens when calling close,
// but in general any active connection to the message stream is terminated
// and the messages channel is closed.
Close() error
}
15 changes: 2 additions & 13 deletions stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package stream

// Consumer interface to be implemented by different stream clients.
type Consumer interface {
ErrorCloser

// Messages is a read-only channel on which the consumer delivers any messages
// being read from the stream.
//
// The channel returns each message as a `stream.Message` value object.
Messages() <-chan Message

// Errors is a read-only channel on which the consumer delivers any errors
// that occurred while consuming from the stream.
Errors() <-chan error

// Ack can be used to acknowledge that a message was processed and should not
// be delivered again.
Ack(Message) error
Expand All @@ -20,15 +18,6 @@ type Consumer interface {
// was _not_ processed, and should be delivered again in the future.
Nack(Message) error

// Close closes the consumer. After calling this method, the consumer is no
// longer in a usable state, and subsequent method calls can result in
// panics.
//
// Check the specific implementations to know what exactly happens when
// calling close, but in general any active connection to the message stream
// is terminated and the messages channel is closed.
Close() error

// Config returns the final configuration used by the consumer as an
// interface. To access the configuration, cast the interface to a
// `streamconfig.Consumer` struct.
Expand Down
15 changes: 2 additions & 13 deletions stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,14 @@ package stream

// Producer interface to be implemented by different stream clients.
type Producer interface {
ErrorCloser

// Messages is a write-only channel on which you can deliver any messages that
// need to be produced on the message stream.
//
// The channel accepts `stream.Message` value objects.
Messages() chan<- Message

// Errors is a read-only channel on which the producer delivers any errors
// that occurred while producing to the stream.
Errors() <-chan error

// Close closes the producer. After calling this method, the producer is no
// longer in a usable state, and subsequent method calls can result in
// panics.
//
// Check the specific implementations to know what exactly happens when
// calling close, but in general no new messages will be delivered to the
// message stream and the messages channel is closed.
Close() error

// Config returns the final configuration used by the producer as an
// interface. To access the configuration, cast the interface to a
// `streamconfig.Producer` struct.
Expand Down
18 changes: 18 additions & 0 deletions streamutil/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,28 @@ package streamutil
import (
"errors"

"github.com/blendle/go-streamprocessor/stream"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// Errors takes a list of stream consumers and/or producers, and returns a
// combined errors channel on which any errors reported by the processors are
// delivered.
func Errors(errs ...stream.ErrorCloser) <-chan error {
errChan := make(chan error)

for _, e := range errs {
go func(c <-chan error) {
for {
errChan <- (<-c)
}
}(e.Errors())
}

return errChan
}

// HandleErrors listens to the provided channel, and triggers a fatal error when
// any error is received.
func HandleErrors(ch chan error, logger func(msg string, fields ...zapcore.Field)) {
Expand Down
41 changes: 41 additions & 0 deletions streamutil/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,44 @@ func TestErrorsChan(t *testing.T) {
})
}
}

type errorCloserStub struct{ errors chan error }

func (ec *errorCloserStub) Errors() <-chan error { return ec.errors }
func (ec *errorCloserStub) Close() error { return nil }

func TestErrors(t *testing.T) {
t.Parallel()

ec1 := &errorCloserStub{errors: make(chan error)}
ec2 := &errorCloserStub{errors: make(chan error)}

ch := streamutil.Errors(ec1, ec2)

go func() { ec1.errors <- errors.New("error 1") }()

select {
case err := <-ch:
assert.Equal(t, err, errors.New("error 1"))
case <-time.After(time.Second):
t.Fatal("timeout while waiting for error to return")
}

go func() { ec1.errors <- errors.New("error 2") }()

select {
case err := <-ch:
assert.Equal(t, err, errors.New("error 2"))
case <-time.After(time.Second):
t.Fatal("timeout while waiting for error to return")
}

go func() { ec2.errors <- errors.New("error 3") }()

select {
case err := <-ch:
assert.Equal(t, err, errors.New("error 3"))
case <-time.After(time.Second):
t.Fatal("timeout while waiting for error to return")
}
}

0 comments on commit 79ecd70

Please sign in to comment.