Skip to content

Commit

Permalink
feat: instrument service with OpenTelemetry (#47)
Browse files Browse the repository at this point in the history
- Add Config.Telemetry for configuring Newrelic and OpenTelemetry.
- Deprecate Config.Newrelic - recommended to use
  Config.Telemetry.Newrelic instead.
- Deprecate Newrelic.Appname - recommended to use
  Config.Telemetry.AppName instead.
- Instrument the following:
  - internal/server/v1beta1.APIServer: Counter for compass.asset.update
    with the following attributes:
    - compass.update_method: asset_upsert_patch/asset_upsert/
      asset_upsert_patch_without_lineage.
    - asset.type: job/table etc.
    - asset.service: caramlstore/firehose etc.
    - operation.success: true/false.
  - core/asset/service.Service: Counter for compass.asset.operation with
    the following attributes:
    - compass.asset_operation: GetAssetByID/GetAssetByVersion/
      DeleteAsset.
    - asset.identifier: UUID/URN
    - operation.success: true/false.
  - internal/store/elasticsearch.DiscoveryRepository: Histogram for
    compass.es.client.duration with the following attributes:
    - es.operation: search/index_exists/create_index/index/
      delete_by_query.
    - es.status_code: ok/mapper_parsing_exception/
      cluster_block_exception.
    - compass.discovery_operation: Search/GroupAssets/Suggest/Upsert/
      DeleteByID/DeleteByURN.
- Integrate OpenTelemetry libraries:
  - https://github.com/open-telemetry/opentelemetry-go-contrib/tree/main/instrumentation/host
  - https://github.com/open-telemetry/opentelemetry-go-contrib/tree/main/instrumentation/google.golang.org/grpc/otelgrpc
  - https://github.com/nhatthm/otelsql
  - https://github.com/open-telemetry/opentelemetry-go-contrib/tree/main/instrumentation/runtime

Co-authored-by: Haveiss <[email protected]>
  • Loading branch information
sudo-suhas and haveiss authored Aug 2, 2023
1 parent b70fd31 commit 94d5baf
Show file tree
Hide file tree
Showing 29 changed files with 855 additions and 203 deletions.
21 changes: 17 additions & 4 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
esStore "github.com/goto/compass/internal/store/elasticsearch"
"github.com/goto/compass/internal/store/postgres"
"github.com/goto/compass/internal/workermanager"
"github.com/goto/compass/pkg/metrics"
"github.com/goto/compass/pkg/statsd"
"github.com/goto/compass/pkg/telemetry"
"github.com/goto/salt/cmdx"
"github.com/goto/salt/config"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -80,11 +80,14 @@ type Config struct {
// Log
LogLevel string `yaml:"log_level" mapstructure:"log_level" default:"info"`

// OpenTelemetry and Newrelic
Telemetry telemetry.Config `mapstructure:"telemetry"`

// StatsD
StatsD statsd.Config `mapstructure:"statsd"`

// NewRelic
NewRelic metrics.NewRelicConfig `mapstructure:"newrelic"`
// Deprecated: Use Config.Telemetry instead
NewRelic telemetry.NewRelicConfig `mapstructure:"newrelic"`

// Elasticsearch
Elasticsearch esStore.Config `mapstructure:"elasticsearch"`
Expand All @@ -104,10 +107,20 @@ type Config struct {

func LoadConfig() (*Config, error) {
var cfg Config
defer func() {
if cfg.NewRelic != (telemetry.NewRelicConfig{}) && cfg.Telemetry.NewRelic == (telemetry.NewRelicConfig{}) {
cfg.Telemetry.NewRelic = cfg.NewRelic
}
}()

err := LoadFromCurrentDir(&cfg)

if errors.As(err, &config.ConfigFileNotFoundError{}) {
err := cmdx.SetConfig("compass").Load(&cfg)
err := cmdx.SetConfig("compass").
Load(&cfg, cmdx.WithLoaderOptions(
config.WithEnvKeyReplacer(".", "_"),
config.WithEnvPrefix("COMPASS"),
))
if err != nil {
if errors.As(err, &config.ConfigFileNotFoundError{}) {
return &cfg, ErrConfigNotFound
Expand Down
50 changes: 18 additions & 32 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/goto/compass/internal/store/postgres"
"github.com/goto/compass/internal/workermanager"
"github.com/goto/compass/pkg/statsd"
"github.com/goto/compass/pkg/telemetry"
"github.com/goto/salt/log"
"github.com/newrelic/go-agent/v3/newrelic"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -82,25 +82,28 @@ func serverMigrateCommand(cfg *Config) *cobra.Command {
return c
}

func runServer(ctx context.Context, config *Config) error {
logger := initLogger(config.LogLevel)
func runServer(ctx context.Context, cfg *Config) error {
logger := initLogger(cfg.LogLevel)
logger.Info("compass starting", "version", Version)

nrApp, err := initNewRelicMonitor(config, logger)
nrApp, cleanUp, err := telemetry.Init(ctx, cfg.Telemetry, logger)
if err != nil {
return err
}
statsdReporter, err := statsd.Init(logger, config.StatsD)

defer cleanUp()

statsdReporter, err := statsd.Init(logger, cfg.StatsD)
if err != nil {
return err
}

esClient, err := initElasticsearch(logger, config.Elasticsearch)
esClient, err := initElasticsearch(logger, cfg.Elasticsearch)
if err != nil {
return err
}

pgClient, err := initPostgres(logger, config)
pgClient, err := initPostgres(ctx, logger, cfg)
if err != nil {
return err
}
Expand All @@ -124,7 +127,7 @@ func runServer(ctx context.Context, config *Config) error {
}
userService := user.NewService(logger, userRepository, user.ServiceWithStatsDReporter(statsdReporter))

assetRepository, err := postgres.NewAssetRepository(pgClient, userRepository, 0, config.Service.Identity.ProviderDefaultName)
assetRepository, err := postgres.NewAssetRepository(pgClient, userRepository, 0, cfg.Service.Identity.ProviderDefaultName)
if err != nil {
return fmt.Errorf("create new asset repository: %w", err)
}
Expand All @@ -135,7 +138,7 @@ func runServer(ctx context.Context, config *Config) error {
}

wrkr, err := initAssetWorker(ctx, workermanager.Deps{
Config: config.Worker,
Config: cfg.Worker,
DiscoveryRepo: discoveryRepository,
Logger: logger,
})
Expand Down Expand Up @@ -172,7 +175,7 @@ func runServer(ctx context.Context, config *Config) error {

return compassserver.Serve(
ctx,
config.Service,
cfg.Service,
logger,
pgClient,
nrApp,
Expand Down Expand Up @@ -207,8 +210,8 @@ func initElasticsearch(logger log.Logger, config esStore.Config) (*esStore.Clien
return esClient, nil
}

func initPostgres(logger log.Logger, config *Config) (*postgres.Client, error) {
pgClient, err := postgres.NewClient(config.DB)
func initPostgres(ctx context.Context, logger log.Logger, config *Config) (*postgres.Client, error) {
pgClient, err := postgres.NewClient(ctx, config.DB)
if err != nil {
return nil, fmt.Errorf("error creating postgres client: %w", err)
}
Expand All @@ -217,23 +220,6 @@ func initPostgres(logger log.Logger, config *Config) (*postgres.Client, error) {
return pgClient, nil
}

func initNewRelicMonitor(config *Config, logger log.Logger) (*newrelic.Application, error) {
if !config.NewRelic.Enabled {
logger.Info("New Relic monitoring is disabled.")
return nil, nil
}
app, err := newrelic.NewApplication(
newrelic.ConfigAppName(config.NewRelic.AppName),
newrelic.ConfigLicense(config.NewRelic.LicenseKey),
)
if err != nil {
return nil, fmt.Errorf("unable to create New Relic Application: %w", err)
}
logger.Info("New Relic monitoring is enabled for", "config", config.NewRelic.AppName)

return app, nil
}

func initAssetWorker(ctx context.Context, deps workermanager.Deps) (asset.Worker, error) {
if !deps.Config.Enabled {
return workermanager.NewInSituWorker(deps), nil
Expand All @@ -254,18 +240,18 @@ func runMigrations(ctx context.Context, config *Config) error {
logger.Info("compass is migrating", "version", Version)

logger.Info("Migrating Postgres...")
if err := migratePostgres(logger, config); err != nil {
if err := migratePostgres(ctx, logger, config); err != nil {
return err
}
logger.Info("Migration Postgres done.")

return nil
}

func migratePostgres(logger log.Logger, config *Config) (err error) {
func migratePostgres(ctx context.Context, logger log.Logger, config *Config) (err error) {
logger.Info("Initiating Postgres client...")

pgClient, err := postgres.NewClient(config.DB)
pgClient, err := postgres.NewClient(ctx, config.DB)
if err != nil {
logger.Error("failed to prepare migration", "error", err)
return err
Expand Down
14 changes: 10 additions & 4 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"fmt"

"github.com/MakeNowJust/heredoc"
esStore "github.com/goto/compass/internal/store/elasticsearch"
"github.com/goto/compass/internal/store/elasticsearch"
"github.com/goto/compass/internal/workermanager"
"github.com/goto/compass/pkg/telemetry"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -53,16 +54,21 @@ func runWorker(ctx context.Context, cfg *Config) error {
logger := initLogger(cfg.LogLevel)
logger.Info("Compass worker starting", "version", Version)

esClient, err := initElasticsearch(logger, cfg.Elasticsearch)
_, cleanUp, err := telemetry.Init(ctx, cfg.Telemetry, logger)
if err != nil {
return err
}

discoveryRepo := esStore.NewDiscoveryRepository(esClient, logger)
defer cleanUp()

esClient, err := initElasticsearch(logger, cfg.Elasticsearch)
if err != nil {
return err
}

mgr, err := workermanager.New(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: discoveryRepo,
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger),
Logger: logger,
})
if err != nil {
Expand Down
24 changes: 23 additions & 1 deletion compass.yaml.example
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
log_level: info

telemetry:
# app_name to be used for all telemetry tags (NewRelic & OpenTelemetry).
app_name: compass-local

newrelic:
enabled: false
licensekey: ____LICENSE_STRING_OF_40_CHARACTERS_____

open_telemetry:
# Setting this to false will disable both traces and metrics.
enabled: false

# Address of the OpenTelemetry Collector gRPC receiver.
collector_addr: "localhost:4317"

# Configures the intervening time between exports for periodically
# collected metrics.
periodic_read_interval: 15s

# Probability of a trace being included in the published sample.
# 1 means always.
trace_sample_probability: 1

statsd:
enabled: false
address: 127.0.0.1:8125
prefix: compass

newrelic:
enabled: false
appname: compass
licensekey: ____LICENSE_STRING_OF_40_CHARACTERS_____

elasticsearch:
Expand Down
12 changes: 8 additions & 4 deletions core/asset/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ func (err InvalidError) Error() string {
}

type DiscoveryError struct {
Op string
ID string
Index string
Err error
Op string
ID string
Index string
ESCode string
Err error
}

func (err DiscoveryError) Error() string {
Expand All @@ -56,6 +57,9 @@ func (err DiscoveryError) Error() string {
if err.Index != "" {
s.WriteString("index '" + err.Index + "': ")
}
if err.ESCode != "" {
s.WriteString("elasticsearch code '" + err.ESCode + "': ")
}
s.WriteString(err.Err.Error())
return s.String()
}
54 changes: 46 additions & 8 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import (
"fmt"

"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

type Service struct {
assetRepository Repository
discoveryRepository DiscoveryRepository
lineageRepository LineageRepository
worker Worker

assetOpCounter metric.Int64Counter
}

//go:generate mockery --name=Worker -r --case underscore --with-expecter --structname Worker --filename worker_mock.go --output=./mocks
Expand All @@ -30,11 +35,19 @@ type ServiceDeps struct {
}

func NewService(deps ServiceDeps) *Service {
assetOpCounter, err := otel.Meter("github.com/goto/compass/core/asset").
Int64Counter("compass.asset.operation")
if err != nil {
otel.Handle(err)
}

return &Service{
assetRepository: deps.AssetRepo,
discoveryRepository: deps.DiscoveryRepo,
lineageRepository: deps.LineageRepo,
worker: deps.Worker,

assetOpCounter: assetOpCounter,
}
}

Expand Down Expand Up @@ -82,29 +95,37 @@ func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (st
return assetID, nil
}

func (s *Service) DeleteAsset(ctx context.Context, id string) error {
func (s *Service) DeleteAsset(ctx context.Context, id string) (err error) {
defer func() {
s.instrumentAssetOp(ctx, "DeleteAsset", id, err)
}()

urn := id
if isValidUUID(id) {
asset, err := s.assetRepository.GetByID(ctx, id)
if err != nil {
return err
}

return s.DeleteAsset(ctx, asset.URN)
urn = asset.URN
}

if err := s.assetRepository.DeleteByURN(ctx, id); err != nil {
if err := s.assetRepository.DeleteByURN(ctx, urn); err != nil {
return err
}

if err := s.worker.EnqueueDeleteAssetJob(ctx, id); err != nil {
if err := s.worker.EnqueueDeleteAssetJob(ctx, urn); err != nil {
return err
}

return s.lineageRepository.DeleteByURN(ctx, id)
return s.lineageRepository.DeleteByURN(ctx, urn)
}

func (s *Service) GetAssetByID(ctx context.Context, id string) (Asset, error) {
var ast Asset
func (s *Service) GetAssetByID(ctx context.Context, id string) (ast Asset, err error) {
defer func() {
s.instrumentAssetOp(ctx, "GetAssetByID", id, err)
}()

if isValidUUID(id) {
var err error
ast, err = s.assetRepository.GetByID(ctx, id)
Expand All @@ -129,7 +150,11 @@ func (s *Service) GetAssetByID(ctx context.Context, id string) (Asset, error) {
return ast, nil
}

func (s *Service) GetAssetByVersion(ctx context.Context, id, version string) (Asset, error) {
func (s *Service) GetAssetByVersion(ctx context.Context, id, version string) (ast Asset, err error) {
defer func() {
s.instrumentAssetOp(ctx, "GetAssetByVersion", id, err)
}()

if isValidUUID(id) {
return s.assetRepository.GetByVersionWithID(ctx, id, version)
}
Expand Down Expand Up @@ -191,6 +216,19 @@ func (s *Service) SuggestAssets(ctx context.Context, cfg SearchConfig) (suggesti
return s.discoveryRepository.Suggest(ctx, cfg)
}

func (s *Service) instrumentAssetOp(ctx context.Context, op, id string, err error) {
identifier := "URN"
if isValidUUID(id) {
identifier = "ID"
}

s.assetOpCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("compass.asset_operation", op),
attribute.String("asset.identifier", identifier),
attribute.Bool("operation.success", err == nil),
))
}

func isValidUUID(u string) bool {
_, err := uuid.Parse(u)
return err == nil
Expand Down
Loading

0 comments on commit 94d5baf

Please sign in to comment.