Skip to content

Commit

Permalink
feat: improve telemetry (#153)
Browse files Browse the repository at this point in the history
* feat(telemetry): use opencensus

* feat(telemetry): add opentelemetry

* feat(postgres): add wrapper functions

* feat(postgres): extract postgres client to pgc package

* feat(telemetry): add roundtripper for http client

* chore: remove unnecessary code

* feat(telemetry): add app metrics

* chore: bump up go version to 1.18

* chore: bump up golangci-lint to 1.50.1

* feat(otel): add otel collector in docker

* fix(otel): simplify otel collector config

* Update docs/docs/reference/server_configuration.md

Co-authored-by: Shivaprasad Bhat <[email protected]>

Co-authored-by: Shivaprasad Bhat <[email protected]>
  • Loading branch information
mabdh and spy16 authored Nov 25, 2022
1 parent 193787a commit 3b5647b
Show file tree
Hide file tree
Showing 77 changed files with 1,890 additions and 570 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18
- name: Install dependencies
run: go mod tidy
- name: Test end-to-end
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
skip-go-installation: true
version: v1.41.1
version: v1.50.1
args: --timeout=10m
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ jobs:
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Set up Go 1.16
- name: Set up Go 1.18
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18
id: go
- name: Install dependencies
run: sudo apt-get install build-essential
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Explore the following resources to get started with Siren:
Siren requires the following dependencies:

- Docker
- Golang (version 1.16 or above)
- Golang (version 1.18 or above)
- Git

Run the application dependencies using Docker:
Expand Down
3 changes: 2 additions & 1 deletion cli/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/odpf/siren/core/template"
"github.com/odpf/siren/internal/api"
"github.com/odpf/siren/internal/store/postgres"
"github.com/odpf/siren/pkg/pgc"
"github.com/odpf/siren/pkg/secret"
"github.com/odpf/siren/plugins/providers/cortex"
"github.com/odpf/siren/plugins/receivers/file"
Expand All @@ -24,7 +25,7 @@ import (
func InitAPIDeps(
logger log.Logger,
cfg config.Config,
pgClient *postgres.Client,
pgClient *pgc.Client,
encryptor *secret.Crypto,
queue notification.Queuer,
) (*api.Deps, map[string]notification.Notifier, error) {
Expand Down
23 changes: 16 additions & 7 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"time"

"github.com/MakeNowJust/heredoc"
"github.com/newrelic/go-agent/v3/newrelic"
"github.com/odpf/salt/db"
"github.com/odpf/salt/log"
"github.com/odpf/salt/printer"
"github.com/odpf/siren/config"
"github.com/odpf/siren/core/notification"
"github.com/odpf/siren/internal/server"
"github.com/odpf/siren/internal/store/postgres"
"github.com/odpf/siren/pkg/pgc"
"github.com/odpf/siren/pkg/secret"
"github.com/odpf/siren/pkg/telemetry"
"github.com/odpf/siren/pkg/worker"
Expand Down Expand Up @@ -112,7 +113,7 @@ func serverMigrateCommand() *cobra.Command {
return err
}

if err := postgres.Migrate(cfg.DB); err != nil {
if err := pgc.Migrate(cfg.DB); err != nil {
return err
}
printer.Success("Migration done")
Expand All @@ -127,19 +128,23 @@ func serverMigrateCommand() *cobra.Command {
}

func StartServer(ctx context.Context, cfg config.Config) error {
nr, err := telemetry.New(cfg.NewRelic)
logger := initLogger(cfg.Log)

telemetry.Init(ctx, cfg.Telemetry, logger)
nrApp, err := newrelic.NewApplication(
newrelic.ConfigAppName(cfg.Telemetry.ServiceName),
newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey),
)
if err != nil {
return err
}

logger := initLogger(cfg.Log)

dbClient, err := db.New(cfg.DB)
if err != nil {
return err
}

pgClient, err := postgres.NewClient(logger, dbClient)
pgClient, err := pgc.NewClient(logger, dbClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -204,7 +209,7 @@ func StartServer(ctx context.Context, cfg config.Config) error {
ctx,
cfg.Service,
logger,
nr,
nrApp,
apiDeps,
)

Expand All @@ -226,6 +231,10 @@ func StartServer(ctx context.Context, cfg config.Config) error {
logger.Error("error stopping dlq", "error", err)
}

if err := pgClient.Close(); err != nil {
return err
}

return err
}

Expand Down
6 changes: 3 additions & 3 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/odpf/salt/db"
"github.com/odpf/siren/config"
"github.com/odpf/siren/core/notification"
"github.com/odpf/siren/internal/store/postgres"
"github.com/odpf/siren/pkg/pgc"
"github.com/odpf/siren/pkg/secret"
"github.com/odpf/siren/pkg/worker"
"github.com/odpf/siren/plugins/queues"
Expand Down Expand Up @@ -85,7 +85,7 @@ func workerStartNotificationHandlerCommand() *cobra.Command {
return err
}

pgClient, err := postgres.NewClient(logger, dbClient)
pgClient, err := pgc.NewClient(logger, dbClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func workerStartNotificationDLQHandlerCommand() *cobra.Command {
return err
}

pgClient, err := postgres.NewClient(logger, dbClient)
pgClient, err := pgc.NewClient(logger, dbClient)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ type Log struct {

// Config contains the application configuration
type Config struct {
DB db.Config `mapstructure:"db"`
NewRelic telemetry.NewRelicConfig `mapstructure:"newrelic" yaml:"newrelic"`
Service server.Config `mapstructure:"service" yaml:"service"`
Log Log `mapstructure:"log" yaml:"log"`
Providers providers.Config `mapstructure:"providers" yaml:"providers"`
Receivers receivers.Config `mapstructure:"receivers" yaml:"receivers"`
Notification notification.Config `mapstructure:"notification" yaml:"notification"`
DB db.Config `mapstructure:"db"`
Telemetry telemetry.Config `mapstructure:"telemetry" yaml:"telemetry"`
Service server.Config `mapstructure:"service" yaml:"service"`
Log Log `mapstructure:"log" yaml:"log"`
Providers providers.Config `mapstructure:"providers" yaml:"providers"`
Receivers receivers.Config `mapstructure:"receivers" yaml:"receivers"`
Notification notification.Config `mapstructure:"notification" yaml:"notification"`
}
29 changes: 28 additions & 1 deletion core/notification/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/odpf/salt/log"
"github.com/odpf/siren/pkg/errors"
"github.com/odpf/siren/pkg/telemetry"
"go.opencensus.io/tag"
)

const (
Expand All @@ -20,6 +22,7 @@ type Handler struct {
identifier string
notifierRegistry map[string]Notifier
supportedReceiverTypes []string
messagingTracer *telemetry.MessagingTracer

batchSize int
}
Expand Down Expand Up @@ -64,6 +67,8 @@ func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[str
opt(h)
}

h.messagingTracer = telemetry.NewMessagingTracer(q.Type())

return h
}

Expand All @@ -80,6 +85,9 @@ func (h *Handler) Process(ctx context.Context, runAt time.Time) error {
if len(receiverTypes) == 0 {
return errors.New("no receiver type plugin registered, skipping dequeue")
} else {
ctx, span := h.messagingTracer.StartSpan(ctx, "batch_dequeue", nil)
defer span.End()

h.logger.Debug("dequeueing and publishing messages", "scope", "notification.handler", "receivers", receiverTypes, "batch size", h.batchSize, "running_at", runAt, "id", h.identifier)
if err := h.q.Dequeue(ctx, receiverTypes, h.batchSize, h.MessageHandler); err != nil {
if errors.Is(err, ErrNoMessage) {
Expand All @@ -95,17 +103,29 @@ func (h *Handler) Process(ctx context.Context, runAt time.Time) error {
// MessageHandler is a function to handle dequeued messages
func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error {
for _, message := range messages {

notifier, err := h.getNotifierPlugin(message.ReceiverType)
if err != nil {
return err
}

message.MarkPending(time.Now())

telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessagePending,
tag.Upsert(telemetry.TagMessageStatus, message.Status.String()),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

newConfig, err := notifier.PostHookQueueTransformConfigs(ctx, message.Configs)
if err != nil {
message.MarkFailed(time.Now(), false, err)

telemetry.IncrementInt64Counter(ctx, telemetry.MetricReceiverPostHookQueueFailed,
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessageFailed,
tag.Upsert(telemetry.TagMessageStatus, message.Status.String()),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

if err := h.q.ErrorCallback(ctx, message); err != nil {
return err
}
Expand All @@ -114,9 +134,12 @@ func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error
message.Configs = newConfig

if retryable, err := notifier.Send(ctx, message); err != nil {

message.MarkFailed(time.Now(), retryable, err)

telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessageFailed,
tag.Upsert(telemetry.TagMessageStatus, message.Status.String()),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

if err := h.q.ErrorCallback(ctx, message); err != nil {
return err
}
Expand All @@ -125,6 +148,10 @@ func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error

message.MarkPublished(time.Now())

telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessagePublished,
tag.Upsert(telemetry.TagMessageStatus, message.Status.String()),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

if err := h.q.SuccessCallback(ctx, message); err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions core/notification/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func TestHandler_MessageHandler(t *testing.T) {
ReceiverType: "random",
},
},
setup: func(q *mocks.Queuer, n *mocks.Notifier) {
q.EXPECT().Type().Return("postgresql")
},
wantErr: true,
},
{
Expand All @@ -37,6 +40,7 @@ func TestHandler_MessageHandler(t *testing.T) {
},
},
setup: func(q *mocks.Queuer, n *mocks.Notifier) {
q.EXPECT().Type().Return("postgresql")
n.EXPECT().PostHookQueueTransformConfigs(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("map[string]interface {}")).Return(nil, errors.New("some error"))
q.EXPECT().ErrorCallback(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(nil)
},
Expand All @@ -50,6 +54,7 @@ func TestHandler_MessageHandler(t *testing.T) {
},
},
setup: func(q *mocks.Queuer, n *mocks.Notifier) {
q.EXPECT().Type().Return("postgresql")
n.EXPECT().PostHookQueueTransformConfigs(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("map[string]interface {}")).Return(nil, errors.New("some error"))
q.EXPECT().ErrorCallback(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(errors.New("some error"))
},
Expand All @@ -63,6 +68,7 @@ func TestHandler_MessageHandler(t *testing.T) {
},
},
setup: func(q *mocks.Queuer, n *mocks.Notifier) {
q.EXPECT().Type().Return("postgresql")
n.EXPECT().PostHookQueueTransformConfigs(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("map[string]interface {}")).Return(map[string]interface{}{}, nil)
n.EXPECT().Send(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(false, errors.New("some error"))
q.EXPECT().ErrorCallback(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(errors.New("some error"))
Expand All @@ -77,6 +83,7 @@ func TestHandler_MessageHandler(t *testing.T) {
},
},
setup: func(q *mocks.Queuer, n *mocks.Notifier) {
q.EXPECT().Type().Return("postgresql")
n.EXPECT().PostHookQueueTransformConfigs(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("map[string]interface {}")).Return(map[string]interface{}{}, nil)
n.EXPECT().Send(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(false, errors.New("some error"))
q.EXPECT().ErrorCallback(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(nil)
Expand All @@ -91,6 +98,7 @@ func TestHandler_MessageHandler(t *testing.T) {
},
},
setup: func(q *mocks.Queuer, n *mocks.Notifier) {
q.EXPECT().Type().Return("postgresql")
n.EXPECT().PostHookQueueTransformConfigs(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("map[string]interface {}")).Return(map[string]interface{}{}, nil)
n.EXPECT().Send(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(false, nil)
q.EXPECT().SuccessCallback(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(errors.New("some error"))
Expand All @@ -105,6 +113,7 @@ func TestHandler_MessageHandler(t *testing.T) {
},
},
setup: func(q *mocks.Queuer, n *mocks.Notifier) {
q.EXPECT().Type().Return("postgresql")
n.EXPECT().PostHookQueueTransformConfigs(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("map[string]interface {}")).Return(map[string]interface{}{}, nil)
n.EXPECT().Send(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(false, nil)
q.EXPECT().SuccessCallback(mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("notification.Message")).Return(nil)
Expand Down
5 changes: 5 additions & 0 deletions core/notification/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
MessageStatusPublished MessageStatus = "published"
)

// String implements fmt.Stringer by converting the status to its
// underlying string value.
func (s MessageStatus) String() string {
	text := string(s)
	return text
}

// MessageOption provides ability to configure the message initialization
type MessageOption func(*Message)

Expand Down Expand Up @@ -96,6 +100,7 @@ func (m *Message) Initialize(
for k, v := range n.Data {
details[k] = v
}

m.Details = details

m.MaxTries = DefaultMaxTries
Expand Down
3 changes: 2 additions & 1 deletion core/notification/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func TestMessage_Initialize(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
m := &notification.Message{}
m.Initialize(tc.n,
m.Initialize(
tc.n,
tc.receiverType,
tc.notificationConfigs,
notification.InitWithID(testID),
Expand Down
Loading

0 comments on commit 3b5647b

Please sign in to comment.