Skip to content

Commit

Permalink
Convert event-handler to Slog (#47932)
Browse files Browse the repository at this point in the history
* Start converting event handler to slog

* Finish removing logrus calls from event handler

* Initialize loggerts

* Write slog.Logger thru

* Pass logger to a.config.Dump

* Fix missing logger in test
  • Loading branch information
strideynet authored Oct 25, 2024
1 parent 91bb17a commit 6c6ddb6
Show file tree
Hide file tree
Showing 14 changed files with 210 additions and 133 deletions.
55 changes: 30 additions & 25 deletions integrations/event-handler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ package main

import (
"context"
"log/slog"
"path/filepath"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"

"github.com/gravitational/teleport/integrations/lib"
"github.com/gravitational/teleport/integrations/lib/backoff"
"github.com/gravitational/teleport/integrations/lib/logger"
"github.com/gravitational/teleport/lib/integrations/diagnostics"
)

Expand All @@ -45,6 +44,8 @@ type App struct {
eventsJob *EventsJob
// sessionEventsJob represents session events consumer job
sessionEventsJob *SessionEventsJob
// log is the logger to use.
log *slog.Logger
// Process
*lib.Process
}
Expand All @@ -59,8 +60,8 @@ const (
)

// NewApp creates new app instance
func NewApp(c *StartCmdConfig) (*App, error) {
app := &App{Config: c}
func NewApp(c *StartCmdConfig, log *slog.Logger) (*App, error) {
app := &App{Config: c, log: log}

app.eventsJob = NewEventsJob(app)
app.sessionEventsJob = NewSessionEventsJob(app)
Expand Down Expand Up @@ -107,8 +108,6 @@ func (a *App) WaitReady(ctx context.Context) (bool, error) {

// SendEvent sends an event to fluentd. Shared method used by jobs.
func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error {
log := logger.Get(ctx)

if !a.Config.DryRun {
backoff := backoff.NewDecorr(sendBackoffBase, sendBackoffMax, clockwork.NewRealClock())
backoffCount := sendBackoffNumTries
Expand All @@ -119,7 +118,7 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error
break
}

log.Debug("Error sending event to fluentd: ", err)
a.log.DebugContext(ctx, "Error sending event to fluentd", "error", err)

bErr := backoff.Do(ctx)
if bErr != nil {
Expand All @@ -131,36 +130,45 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error
if lib.IsCanceled(err) {
return nil
}
log.WithFields(logrus.Fields{
"error": err.Error(), // omitting the stack trace (too verbose)
"attempts": sendBackoffNumTries,
}).Error("failed to send event to fluentd")
a.log.ErrorContext(
ctx,
"Failed to send event to fluentd",
"error", err,
"attempts", sendBackoffNumTries,
)
return trace.Wrap(err)
}
}
}

fields := logrus.Fields{"id": e.ID, "type": e.Type, "ts": e.Time, "index": e.Index}
fields := []slog.Attr{
slog.String("id", e.ID),
slog.String("type", e.Type),
slog.Time("ts", e.Time),
slog.Int64("index", e.Index),
}
if e.SessionID != "" {
fields["sid"] = e.SessionID
fields = append(fields, slog.String("sid", e.SessionID))
}

log.WithFields(fields).Debug("Event sent")
a.log.LogAttrs(
ctx, slog.LevelDebug, "Event sent",
fields...,
)

return nil
}

// init initializes application state
func (a *App) init(ctx context.Context) error {
a.Config.Dump(ctx)
a.Config.Dump(ctx, a.log)

var err error
a.client, err = newClient(ctx, a.Config)
a.client, err = newClient(ctx, a.log, a.Config)
if err != nil {
return trace.Wrap(err)
}

a.State, err = NewState(a.Config)
a.State, err = NewState(a.Config, a.log)
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -170,7 +178,7 @@ func (a *App) init(ctx context.Context) error {
return trace.Wrap(err)
}

a.Fluentd, err = NewFluentdClient(&a.Config.FluentdConfig)
a.Fluentd, err = NewFluentdClient(&a.Config.FluentdConfig, a.log)
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -180,15 +188,13 @@ func (a *App) init(ctx context.Context) error {

// setStartTime sets start time or fails if start time has changed from the last run
func (a *App) setStartTime(ctx context.Context, s *State) error {
log := logger.Get(ctx)

prevStartTime, err := s.GetStartTime()
if err != nil {
return trace.Wrap(err)
}

if prevStartTime == nil {
log.WithField("value", a.Config.StartTime).Debug("Setting start time")
a.log.DebugContext(ctx, "Setting start time", "value", a.Config.StartTime)

t := a.Config.StartTime
if t == nil {
Expand All @@ -210,14 +216,13 @@ func (a *App) setStartTime(ctx context.Context, s *State) error {

// RegisterSession registers new session
func (a *App) RegisterSession(ctx context.Context, e *TeleportEvent) {
log := logger.Get(ctx)
if err := a.sessionEventsJob.RegisterSession(ctx, e); err != nil {
log.Error("Registering session: ", err)
a.log.ErrorContext(ctx, "Registering session", "error", err)
}
}

func (a *App) Profile() {
if err := diagnostics.Profile(filepath.Join(a.Config.StorageDir, "profiles")); err != nil {
logrus.WithError(err).Warn("failed to capture profiles")
a.log.WarnContext(context.TODO(), "Failed to capture profiles", "error", err)
}
}
46 changes: 22 additions & 24 deletions integrations/event-handler/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package main

import (
"context"
"log/slog"
"strings"
"time"

"github.com/alecthomas/kong"
"github.com/gravitational/trace"

"github.com/gravitational/teleport/integrations/lib/logger"
"github.com/gravitational/teleport/integrations/lib/stringset"

"github.com/gravitational/teleport/integrations/event-handler/lib"
Expand Down Expand Up @@ -242,42 +242,40 @@ func (c *StartCmdConfig) Validate() error {
}

// Dump dumps configuration values to the log
func (c *StartCmdConfig) Dump(ctx context.Context) {
log := logger.Get(ctx)

func (c *StartCmdConfig) Dump(ctx context.Context, log *slog.Logger) {
// Log configuration variables
log.WithField("batch", c.BatchSize).Info("Using batch size")
log.WithField("types", c.Types).Info("Using type filter")
log.WithField("skip-event-types", c.SkipEventTypes).Info("Using type exclude filter")
log.WithField("types", c.SkipSessionTypes).Info("Skipping session events of type")
log.WithField("value", c.StartTime).Info("Using start time")
log.WithField("timeout", c.Timeout).Info("Using timeout")
log.WithField("url", c.FluentdURL).Info("Using Fluentd url")
log.WithField("url", c.FluentdSessionURL).Info("Using Fluentd session url")
log.WithField("ca", c.FluentdCA).Info("Using Fluentd ca")
log.WithField("cert", c.FluentdCert).Info("Using Fluentd cert")
log.WithField("key", c.FluentdKey).Info("Using Fluentd key")
log.WithField("window-size", c.WindowSize).Info("Using window size")
log.InfoContext(ctx, "Using batch size", "batch", c.BatchSize)
log.InfoContext(ctx, "Using type filter", "types", c.Types)
log.InfoContext(ctx, "Using type exclude filter", "skip_event_types", c.SkipEventTypes)
log.InfoContext(ctx, "Skipping session events of type", "types", c.SkipSessionTypes)
log.InfoContext(ctx, "Using start time", "value", c.StartTime)
log.InfoContext(ctx, "Using timeout", "timeout", c.Timeout)
log.InfoContext(ctx, "Using Fluentd url", "url", c.FluentdURL)
log.InfoContext(ctx, "Using Fluentd session url", "url", c.FluentdSessionURL)
log.InfoContext(ctx, "Using Fluentd ca", "ca", c.FluentdCA)
log.InfoContext(ctx, "Using Fluentd cert", "cert", c.FluentdCert)
log.InfoContext(ctx, "Using Fluentd key", "key", c.FluentdKey)
log.InfoContext(ctx, "Using window size", "window_size", c.WindowSize)

if c.TeleportIdentityFile != "" {
log.WithField("file", c.TeleportIdentityFile).Info("Using Teleport identity file")
log.InfoContext(ctx, "Using Teleport identity file", "file", c.TeleportIdentityFile)
}
if c.TeleportRefreshEnabled {
log.WithField("interval", c.TeleportRefreshInterval).Info("Using Teleport identity file refresh")
log.InfoContext(ctx, "Using Teleport identity file refresh", "interval", c.TeleportRefreshInterval)
}

if c.TeleportKey != "" {
log.WithField("addr", c.TeleportAddr).Info("Using Teleport addr")
log.WithField("ca", c.TeleportCA).Info("Using Teleport CA")
log.WithField("cert", c.TeleportCert).Info("Using Teleport cert")
log.WithField("key", c.TeleportKey).Info("Using Teleport key")
log.InfoContext(ctx, "Using Teleport addr", "addr", c.TeleportAddr)
log.InfoContext(ctx, "Using Teleport CA", "ca", c.TeleportCA)
log.InfoContext(ctx, "Using Teleport cert", "cert", c.TeleportCert)
log.InfoContext(ctx, "Using Teleport key", "key", c.TeleportKey)
}

if c.LockEnabled {
log.WithField("count", c.LockFailedAttemptsCount).WithField("period", c.LockPeriod).Info("Auto-locking enabled")
log.InfoContext(ctx, "Auto-locking enabled", "count", c.LockFailedAttemptsCount, "period", c.LockPeriod)
}

if c.DryRun {
log.Warn("Dry run! Events are not sent to Fluentd. Separate storage is used.")
log.WarnContext(ctx, "Dry run! Events are not sent to Fluentd. Separate storage is used.")
}
}
3 changes: 2 additions & 1 deletion integrations/event-handler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"log/slog"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -131,7 +132,7 @@ func (s *EventHandlerSuite) startApp() {
t := s.T()
t.Helper()

app, err := NewApp(&s.appConfig)
app, err := NewApp(&s.appConfig, slog.Default())
require.NoError(t, err)

integration.RunAndWaitReady(s.T(), app)
Expand Down
31 changes: 14 additions & 17 deletions integrations/event-handler/events_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import (
"github.com/gravitational/trace"
limiter "github.com/sethvargo/go-limiter"
"github.com/sethvargo/go-limiter/memorystore"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"

auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
"github.com/gravitational/teleport/integrations/lib"
"github.com/gravitational/teleport/integrations/lib/logger"
"github.com/gravitational/teleport/lib/events/export"
)

Expand All @@ -50,8 +48,6 @@ func NewEventsJob(app *App) *EventsJob {

// run runs the event consumption logic
func (j *EventsJob) run(ctx context.Context) error {
log := logger.Get(ctx)

// Create cancellable context which handles app termination
ctx, cancel := context.WithCancel(ctx)
j.app.Process.OnTerminate(func(_ context.Context) error {
Expand All @@ -67,11 +63,11 @@ func (j *EventsJob) run(ctx context.Context) error {
for {
select {
case <-logTicker.C:
ll := log.WithField("events_per_minute", j.eventsProcessed.Swap(0))
ll := j.app.log.With("events_per_minute", j.eventsProcessed.Swap(0))
if td := j.targetDate.Load(); td != nil {
ll = ll.WithField("date", td.Format(time.DateOnly))
ll = ll.With("date", td.Format(time.DateOnly))
}
ll.Info("event processing")
ll.InfoContext(ctx, "Event processing")
case <-ctx.Done():
return
}
Expand All @@ -94,11 +90,14 @@ func (j *EventsJob) run(ctx context.Context) error {
for {
err := j.runPolling(ctx)
if err == nil || ctx.Err() != nil {
log.Debug("watch loop exiting")
j.app.log.DebugContext(ctx, "Watch loop exiting")
return trace.Wrap(err)
}

log.WithError(err).Error("unexpected error in watch loop. reconnecting in 5s...")
j.app.log.ErrorContext(
ctx, "Unexpected error in watch loop. Reconnecting in 5s...",
"error", err,
)

select {
case <-time.After(time.Second * 5):
Expand Down Expand Up @@ -187,7 +186,7 @@ func (j *EventsJob) runPolling(ctx context.Context) error {
date := exporter.GetCurrentDate()
j.targetDate.Store(&date)
if err := j.app.State.SetCursorV2State(exporter.GetState()); err != nil {
log.WithError(err).Error("Failed to save cursor_v2 values, will retry")
j.app.log.ErrorContext(ctx, "Failed to save cursor_v2 values, will retry", "error", err)
}
case <-ctx.Done():
exporter.Close()
Expand All @@ -213,8 +212,6 @@ func (j *EventsJob) runLegacyPolling(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

log := logger.Get(ctx)

lc, err := j.app.State.GetLegacyCursorValues()
if err != nil {
return trace.Wrap(err)
Expand All @@ -228,7 +225,9 @@ func (j *EventsJob) runLegacyPolling(ctx context.Context) error {
lc.WindowStartTime = *st
}

eventWatcher := NewLegacyEventsWatcher(j.app.Config, j.app.client, *lc, j.handleEvent)
eventWatcher := NewLegacyEventsWatcher(
j.app.Config, j.app.client, *lc, j.handleEvent, j.app.log,
)

// periodically sync cursor values to disk
go func() {
Expand All @@ -246,7 +245,7 @@ func (j *EventsJob) runLegacyPolling(ctx context.Context) error {
continue
}
if err := j.app.State.SetLegacyCursorValues(currentCursorValues); err != nil {
log.WithError(err).Error("Failed to save cursor values, will retry")
j.app.log.ErrorContext(ctx, "Failed to save cursor values, will retry", "error", err)
continue
}
lastCursorValues = currentCursorValues
Expand Down Expand Up @@ -320,8 +319,6 @@ func (j *EventsJob) TryLockUser(ctx context.Context, evt *TeleportEvent) error {
return nil
}

log := logger.Get(ctx)

_, _, _, ok, err := j.rl.Take(ctx, evt.FailedLoginData.Login)
if err != nil {
return trace.Wrap(err)
Expand All @@ -335,7 +332,7 @@ func (j *EventsJob) TryLockUser(ctx context.Context, evt *TeleportEvent) error {
return trace.Wrap(err)
}

log.WithField("data", evt.FailedLoginData).Info("User login is locked")
j.app.log.InfoContext(ctx, "User login is locked", "data", evt.FailedLoginData)

return nil
}
Loading

0 comments on commit 6c6ddb6

Please sign in to comment.