-
Notifications
You must be signed in to change notification settings - Fork 3
/
baleen.go
139 lines (116 loc) · 4 KB
/
baleen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
Package baleen is the top-level library of the baleen language ingestion service. This
library provides the primary components for running the service as a long-running
background daemon, including the main service itself, configuration, and other utilities.
*/
package baleen
import (
"context"
"os"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/rotationalio/baleen/config"
"github.com/rotationalio/baleen/logger"
"github.com/rotationalio/baleen/metrics"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// init applies the package-wide zerolog defaults and replaces the global
// zerolog logger with one that writes to stdout, attaches the logger package's
// severity hook, and stamps every event with a timestamp.
func init() {
	// Field names come from the GCP key constants declared in the logger package.
	zerolog.TimestampFieldName = logger.GCPFieldKeyTime
	zerolog.MessageFieldName = logger.GCPFieldKeyMsg

	// Timestamps are rendered as RFC3339; durations use milliseconds as their
	// unit and are not forced to integer values.
	zerolog.TimeFieldFormat = time.RFC3339
	zerolog.DurationFieldUnit = time.Millisecond
	zerolog.DurationFieldInteger = false

	// The zero value of the severity hook is what gets registered on the logger.
	var severity logger.SeverityHook
	hooked := zerolog.New(os.Stdout).Hook(severity)
	log.Logger = hooked.With().Timestamp().Logger()
}
// Baleen is essentially a wrapper for a watermill router that configures different
// event handlers depending on the context of the process. Calling Run() will start
// the Baleen service, which will handle incoming events and dispatch new events.
// Use New to construct a Baleen; all of the fields below are populated there.
type Baleen struct {
// router is the watermill message router built in New; Run and Close delegate to it.
router *message.Router
// conf is the configuration the service was created with (New loads one via
// config.New when it is handed a zero-valued config).
conf config.Config
// publisher is created by CreatePublisher in New and is handed to AddFeedSync
// when the feed sync handler is enabled.
publisher message.Publisher
// subscriber is created by CreateSubscriber in New.
subscriber message.Subscriber
}
// New creates a Baleen service from the specified configuration. If conf is
// the zero value (as reported by conf.IsZero) the configuration is loaded with
// config.New instead. New has global side effects: it sets the zerolog global
// level from the configuration and, when conf.ConsoleLog is set, replaces the
// global zerolog logger with a console writer on stderr.
//
// The returned service has a watermill router configured with the signals
// handler plugin and the correlation ID, retry, and recoverer middleware, a
// publisher and a subscriber built from the configuration, and the feed sync
// and post fetch handlers registered when they are enabled.
//
// On failure New returns a nil service and the error. Any publisher or
// subscriber that was successfully created before the failure is closed so
// that it is not leaked.
func New(conf config.Config) (svc *Baleen, err error) {
	if conf.IsZero() {
		if conf, err = config.New(); err != nil {
			return nil, err
		}
	}

	// Configure logging (will modify logging globally for all packages!)
	zerolog.SetGlobalLevel(conf.GetLogLevel())
	if conf.ConsoleLog {
		log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
	}

	// The service is assembled on a local pointer because the error returns
	// below set the named result svc to nil before deferred functions run, so
	// the cleanup cannot reach the partially built service through svc.
	s := &Baleen{
		conf: conf,
	}

	// Release the pub/sub resources created so far if construction fails.
	defer func() {
		if err == nil {
			return
		}
		if s.subscriber != nil {
			if cerr := s.subscriber.Close(); cerr != nil {
				log.Error().Err(cerr).Msg("could not close subscriber after failed service setup")
			}
		}
		if s.publisher != nil {
			if cerr := s.publisher.Close(); cerr != nil {
				log.Error().Err(cerr).Msg("could not close publisher after failed service setup")
			}
		}
	}()

	// Named wlog so that the local does not shadow the imported logger package.
	var wlog watermill.LoggerAdapter = logger.New()
	if s.router, err = message.NewRouter(conf.RouterConfig(), wlog); err != nil {
		return nil, err
	}

	// SignalsHandler will gracefully shutdown Router when SIGTERM is received.
	// You can also close the router by just calling `r.Close()`.
	s.router.AddPlugin(plugin.SignalsHandler)

	// Router level middleware are executed for every message sent to the router
	s.router.AddMiddleware(
		// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
		middleware.CorrelationID,

		// The handler function is retried if it returns an error.
		// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.
		// TODO: configure max retries from environment
		middleware.Retry{
			MaxRetries:      0,
			InitialInterval: time.Millisecond * 100,
			Logger:          wlog,
		}.Middleware,

		// Recoverer handles panics from handlers.
		// In this case, it passes them as errors to the Retry middleware.
		middleware.Recoverer,
	)

	// The publisher and subscriber are only stored on the service once they
	// have been created successfully, so the deferred cleanup never calls
	// Close on a value that was returned alongside an error.
	var pub message.Publisher
	if pub, err = CreatePublisher(conf.Publisher, wlog); err != nil {
		return nil, err
	}
	s.publisher = pub

	var sub message.Subscriber
	if sub, err = CreateSubscriber(conf.Subscriber, wlog); err != nil {
		return nil, err
	}
	s.subscriber = sub

	// Add Handlers
	if conf.FeedSync.Enabled {
		if err = s.AddFeedSync(conf.FeedSync, s.publisher); err != nil {
			return nil, err
		}
	}

	if conf.PostFetch.Enabled {
		if err = s.AddPostFetch(conf.PostFetch); err != nil {
			return nil, err
		}
	}

	return s, nil
}
// Run starts the Baleen service. When monitoring is enabled in the
// configuration the metrics server is started first via metrics.Serve, and an
// error from it is returned without running the router. Otherwise the result
// of running the router with ctx is returned.
func (s *Baleen) Run(ctx context.Context) error {
	// The metrics server (for Prometheus) is optional and only served when enabled.
	if monitoring := s.conf.Monitoring; monitoring.Enabled {
		if serveErr := metrics.Serve(monitoring); serveErr != nil {
			return serveErr
		}
	}

	return s.router.Run(ctx)
}
// Close stops the Baleen service. When monitoring is enabled the metrics
// server is given up to 15 seconds to shut down; a failure to do so is logged
// but does not prevent the router from being closed. The error returned is
// the one reported by closing the router.
func (s *Baleen) Close() error {
	// Nothing besides the router needs to be stopped when monitoring is disabled.
	if !s.conf.Monitoring.Enabled {
		return s.router.Close()
	}

	// Bound the metrics shutdown; the timeout context is released when Close returns.
	deadline, release := context.WithTimeout(context.Background(), 15*time.Second)
	defer release()

	if shutdownErr := metrics.Shutdown(deadline); shutdownErr != nil {
		log.Error().Err(shutdownErr).Msg("could not gracefully shutdown metrics server")
	}

	return s.router.Close()
}