Decoder injection integration (#101)
drakos74 authored and Sotirios Mantziaris committed Nov 11, 2019
1 parent ef20a4b commit cbfecb5
Showing 6 changed files with 433 additions and 68 deletions.
90 changes: 53 additions & 37 deletions async/kafka/kafka.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"os"
"reflect"
"runtime"
"strconv"

"github.com/Shopify/sarama"
@@ -13,7 +15,7 @@ import (
"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/trace"
"github.com/google/uuid"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
)

@@ -44,14 +46,17 @@ type message struct {
dec encoding.DecodeRawFunc
}

// Context returns the context encapsulated in the message
func (m *message) Context() context.Context {
return m.ctx
}

// Decode applies the decoding logic to transform the message bytes into a business entity
func (m *message) Decode(v interface{}) error {
return m.dec(m.msg.Value, v)
}

// Ack acknowledges that the message has been processed
func (m *message) Ack() error {
if m.sess != nil {
m.sess.MarkMessage(m.msg, "")
@@ -60,6 +65,7 @@ func (m *message) Ack() error {
return nil
}

// Nack signals an error condition or inconsistency back to the producing side
func (m *message) Nack() error {
trace.SpanError(m.span)
return nil
@@ -68,15 +74,14 @@ func (m *message) Nack() error {
// Factory definition of a consumer factory.
type Factory struct {
name string
ct string
topic string
group string
brokers []string
oo []OptionFunc
}

// New constructor.
func New(name, ct, topic, group string, brokers []string, oo ...OptionFunc) (*Factory, error) {
func New(name, topic, group string, brokers []string, oo ...OptionFunc) (*Factory, error) {

if name == "" {
return nil, errors.New("name is required")
@@ -90,7 +95,7 @@ func New(name, ct, topic, group string, brokers []string, oo ...OptionFunc) (*Fa
return nil, errors.New("topic is required")
}

return &Factory{name: name, ct: ct, topic: topic, group: group, brokers: brokers, oo: oo}, nil
return &Factory{name: name, topic: topic, group: group, brokers: brokers, oo: oo}, nil
}
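
A minimal usage sketch of the updated constructor, which no longer takes a content-type argument; decoding is now either injected or derived from message headers. The kafka.Decoder option and json.DecodeRaw used below are assumptions for illustration: the Decoder OptionFunc presumably belongs to this change's option.go (not shown on this page), and json.DecodeRaw is assumed to satisfy encoding.DecodeRawFunc.

package main

import (
	"log"

	"github.com/beatlabs/patron/async/kafka"
	"github.com/beatlabs/patron/encoding/json"
)

func main() {
	// Inject a decoder explicitly instead of relying on the content-type header
	// of each message. kafka.Decoder and json.DecodeRaw are assumed names.
	f, err := kafka.New(
		"orders-consumer",
		"orders",
		"orders-group",
		[]string{"localhost:9092"},
		kafka.Decoder(json.DecodeRaw),
	)
	if err != nil {
		log.Fatalf("failed to create consumer factory: %v", err)
	}

	c, err := f.Create()
	if err != nil {
		log.Fatalf("failed to create consumer: %v", err)
	}
	defer c.Close()
}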

// Create a new consumer.
@@ -107,11 +112,10 @@ func (f *Factory) Create() (async.Consumer, error) {
config.Version = sarama.V0_11_0_0

c := &consumer{
brokers: f.brokers,
topic: f.topic,
cfg: config,
contentType: f.ct,
buffer: 1000,
brokers: f.brokers,
topic: f.topic,
cfg: config,
buffer: 1000,
}

if f.group != "" {
@@ -123,24 +127,25 @@ func (f *Factory) Create() (async.Consumer, error) {
for _, o := range f.oo {
err = o(c)
if err != nil {
return nil, err
return nil, fmt.Errorf("Could not apply OptionFunc '%v' to consumer : %v", runtime.FuncForPC(reflect.ValueOf(o).Pointer()).Name(), err)
}
}

return c, nil
}

// consumer members can be injected or overwritten through OptionFunc arguments
type consumer struct {
brokers []string
topic string
group string
buffer int
traceTag opentracing.Tag
cfg *sarama.Config
contentType string
cnl context.CancelFunc
cg sarama.ConsumerGroup
ms sarama.Consumer
brokers []string
topic string
group string
buffer int
traceTag opentracing.Tag
cfg *sarama.Config
cnl context.CancelFunc
cg sarama.ConsumerGroup
ms sarama.Consumer
dec encoding.DecodeRawFunc
}
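
The new dec field is meant to be populated through one of the OptionFunc arguments applied in Create above. The actual option presumably lives in option.go, one of the six changed files not shown on this page; a minimal sketch of what such an option could look like, consistent with how options are applied to *consumer:

// Decoder is a sketch of an OptionFunc that injects a custom decoder into the
// consumer, bypassing content-type detection from the message headers.
// The real option introduced by this change is assumed to live in option.go (not shown here).
func Decoder(dec encoding.DecodeRawFunc) OptionFunc {
	return func(c *consumer) error {
		if dec == nil {
			return errors.New("decoder is nil")
		}
		c.dec = dec
		return nil
	}
}

With a decoder injected this way, determineDecoder below short-circuits and never consults the content-type header.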

// Consume starts consuming messages from a Kafka topic.
@@ -207,7 +212,7 @@ func consume(ctx context.Context, c *consumer) (<-chan async.Message, <-chan err
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get partitions")
}
// When kafka cluster is not fully initialized, we may get 0 partions.
// When kafka cluster is not fully initialized, we may get 0 partitions.
if len(pcs) == 0 {
return nil, nil, errors.New("got 0 partitions")
}
@@ -253,26 +258,15 @@ func claimMessage(ctx context.Context, c *consumer, msg *sarama.ConsumerMessage)
trace.KafkaConsumerComponent,
mapHeader(msg.Headers),
)
var ct string
var err error
if c.contentType != "" {
ct = c.contentType
} else {
ct, err = determineContentType(msg.Headers)
if err != nil {
trace.SpanError(sp)
return nil, errors.Wrap(err, "failed to determine content type")
}
}

dec, err := async.DetermineDecoder(ct)
chCtx = log.WithContext(chCtx, log.Sub(map[string]interface{}{"messageID": uuid.New().String()}))

dec, err := determineDecoder(c, msg, sp)

if err != nil {
trace.SpanError(sp)
return nil, errors.Wrapf(err, "failed to determine decoder for %s", ct)
return nil, fmt.Errorf("Could not determine decoder %v", err)
}

chCtx = log.WithContext(chCtx, log.Sub(map[string]interface{}{"messageID": uuid.New().String()}))

return &message{
ctx: chCtx,
dec: dec,
@@ -281,6 +275,28 @@ func claimMessage(ctx context.Context, c *consumer, msg *sarama.ConsumerMessage)
}, nil
}

func determineDecoder(c *consumer, msg *sarama.ConsumerMessage, sp opentracing.Span) (encoding.DecodeRawFunc, error) {

if c.dec != nil {
return c.dec, nil
}

ct, err := determineContentType(msg.Headers)
if err != nil {
trace.SpanError(sp)
return nil, fmt.Errorf("failed to determine content type from message headers %v : %v", msg.Headers, err)
}

dec, err := async.DetermineDecoder(ct)

if err != nil {
trace.SpanError(sp)
return nil, fmt.Errorf("failed to determine decoder from message content type %v %v", ct, err)
}

return dec, nil
}

// Close handles closing consumer.
func (c *consumer) Close() error {
if c.cnl != nil {