-
Notifications
You must be signed in to change notification settings - Fork 3
/
middleware.go
39 lines (31 loc) · 983 Bytes
/
middleware.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package baleen
import (
"errors"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/rotationalio/watermill-ensign/pkg/ensign"
)
// Sentinel errors for messages that a handler middleware does not accept.
var (
// ErrUnhandledType signals that a message's ensign type is not handled.
// NOTE(review): within this file it is declared but never returned; the
// only return of it in TypeFilter is commented out behind a TODO.
ErrUnhandledType = errors.New("ensign type not handled")
// ErrUnhandledMIME signals that a message's ensign mimetype is not handled.
// TypeFilter returns it when the message's MIME metadata differs from the
// mimetype the filter was configured with.
ErrUnhandledMIME = errors.New("ensign mimetype not handled")
)
// TypeFilter returns a handler middleware that only forwards a message to the
// wrapped handler when its metadata matches the configured filter.
//
// A message is checked in two steps, in this order:
//
//  1. The value stored under ensign.TypeNameKey must be one of etypes. If it
//     is not, the message is nacked and the middleware returns (nil, nil)
//     without calling the wrapped handler.
//  2. The value stored under ensign.MIMEKey must equal mime exactly. If it
//     does not, the middleware returns ErrUnhandledMIME.
//
// Messages passing both checks are handed to the wrapped handler and its
// results are returned unchanged. With no etypes supplied, no type name can
// match, so every message takes the first branch.
func TypeFilter(mime string, etypes ...string) message.HandlerMiddleware {
	// Collect the accepted type names into a set for direct membership tests.
	accepted := make(map[string]struct{}, len(etypes))
	for i := range etypes {
		accepted[etypes[i]] = struct{}{}
	}

	return func(next message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			typeName := msg.Metadata.Get(ensign.TypeNameKey)
			_, known := accepted[typeName]

			// Cases are evaluated top to bottom, so the MIME metadata is only
			// inspected once the type name has been accepted.
			switch {
			case !known:
				// TODO: once ensign supports topics, return ErrUnhandledType
				// from this branch instead:
				// return nil, ErrUnhandledType
				// HACK: a nil error is returned so that unhandled types do not
				// produce tons of error logs.
				msg.Nack()
				return nil, nil
			case msg.Metadata.Get(ensign.MIMEKey) != mime:
				return nil, ErrUnhandledMIME
			default:
				return next(msg)
			}
		}
	}
}