From 6c6ddb67463524cca8b399b672bb9a59d1c826ae Mon Sep 17 00:00:00 2001 From: Noah Stride Date: Fri, 25 Oct 2024 16:18:05 +0100 Subject: [PATCH] Convert `event-handler` to Slog (#47932) * Start converting event handler to slog * Finish removing logrus calls from event handler * Initialize loggerts * Write slog.Logger thru * Pass logger to a.config.Dump * Fix missing logger in test --- integrations/event-handler/app.go | 55 +++++++------- integrations/event-handler/cli.go | 46 ++++++------ .../event-handler/event_handler_test.go | 3 +- integrations/event-handler/events_job.go | 31 ++++---- integrations/event-handler/fluentd_client.go | 9 +-- integrations/event-handler/go.mod | 2 +- integrations/event-handler/helpers.go | 8 +-- .../event-handler/legacy_events_watcher.go | 55 +++++++++----- .../legacy_events_watcher_test.go | 3 +- integrations/event-handler/main.go | 25 +++++-- .../event-handler/session_events_job.go | 71 +++++++++++++------ .../event-handler/session_events_job_test.go | 2 + integrations/event-handler/state.go | 26 ++++--- integrations/event-handler/state_test.go | 7 +- 14 files changed, 210 insertions(+), 133 deletions(-) diff --git a/integrations/event-handler/app.go b/integrations/event-handler/app.go index 08fa5aabb881f..6064fc5d645c5 100644 --- a/integrations/event-handler/app.go +++ b/integrations/event-handler/app.go @@ -18,16 +18,15 @@ package main import ( "context" + "log/slog" "path/filepath" "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport/integrations/lib" "github.com/gravitational/teleport/integrations/lib/backoff" - "github.com/gravitational/teleport/integrations/lib/logger" "github.com/gravitational/teleport/lib/integrations/diagnostics" ) @@ -45,6 +44,8 @@ type App struct { eventsJob *EventsJob // sessionEventsJob represents session events consumer job sessionEventsJob *SessionEventsJob + // log is the logger to use. + log *slog.Logger // Process *lib.Process } @@ -59,8 +60,8 @@ const ( ) // NewApp creates new app instance -func NewApp(c *StartCmdConfig) (*App, error) { - app := &App{Config: c} +func NewApp(c *StartCmdConfig, log *slog.Logger) (*App, error) { + app := &App{Config: c, log: log} app.eventsJob = NewEventsJob(app) app.sessionEventsJob = NewSessionEventsJob(app) @@ -107,8 +108,6 @@ func (a *App) WaitReady(ctx context.Context) (bool, error) { // SendEvent sends an event to fluentd. Shared method used by jobs. func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error { - log := logger.Get(ctx) - if !a.Config.DryRun { backoff := backoff.NewDecorr(sendBackoffBase, sendBackoffMax, clockwork.NewRealClock()) backoffCount := sendBackoffNumTries @@ -119,7 +118,7 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error break } - log.Debug("Error sending event to fluentd: ", err) + a.log.DebugContext(ctx, "Error sending event to fluentd", "error", err) bErr := backoff.Do(ctx) if bErr != nil { @@ -131,36 +130,45 @@ func (a *App) SendEvent(ctx context.Context, url string, e *TeleportEvent) error if lib.IsCanceled(err) { return nil } - log.WithFields(logrus.Fields{ - "error": err.Error(), // omitting the stack trace (too verbose) - "attempts": sendBackoffNumTries, - }).Error("failed to send event to fluentd") + a.log.ErrorContext( + ctx, + "Failed to send event to fluentd", + "error", err, + "attempts", sendBackoffNumTries, + ) return trace.Wrap(err) } } } - fields := logrus.Fields{"id": e.ID, "type": e.Type, "ts": e.Time, "index": e.Index} + fields := []slog.Attr{ + slog.String("id", e.ID), + slog.String("type", e.Type), + slog.Time("ts", e.Time), + slog.Int64("index", e.Index), + } if e.SessionID != "" { - fields["sid"] = e.SessionID + fields = append(fields, slog.String("sid", e.SessionID)) } - - log.WithFields(fields).Debug("Event sent") + a.log.LogAttrs( + ctx, slog.LevelDebug, "Event sent", + fields..., + ) return nil } // init initializes application state func (a *App) init(ctx context.Context) error { - a.Config.Dump(ctx) + a.Config.Dump(ctx, a.log) var err error - a.client, err = newClient(ctx, a.Config) + a.client, err = newClient(ctx, a.log, a.Config) if err != nil { return trace.Wrap(err) } - a.State, err = NewState(a.Config) + a.State, err = NewState(a.Config, a.log) if err != nil { return trace.Wrap(err) } @@ -170,7 +178,7 @@ func (a *App) init(ctx context.Context) error { return trace.Wrap(err) } - a.Fluentd, err = NewFluentdClient(&a.Config.FluentdConfig) + a.Fluentd, err = NewFluentdClient(&a.Config.FluentdConfig, a.log) if err != nil { return trace.Wrap(err) } @@ -180,15 +188,13 @@ func (a *App) init(ctx context.Context) error { // setStartTime sets start time or fails if start time has changed from the last run func (a *App) setStartTime(ctx context.Context, s *State) error { - log := logger.Get(ctx) - prevStartTime, err := s.GetStartTime() if err != nil { return trace.Wrap(err) } if prevStartTime == nil { - log.WithField("value", a.Config.StartTime).Debug("Setting start time") + a.log.DebugContext(ctx, "Setting start time", "value", a.Config.StartTime) t := a.Config.StartTime if t == nil { @@ -210,14 +216,13 @@ func (a *App) setStartTime(ctx context.Context, s *State) error { // RegisterSession registers new session func (a *App) RegisterSession(ctx context.Context, e *TeleportEvent) { - log := logger.Get(ctx) if err := a.sessionEventsJob.RegisterSession(ctx, e); err != nil { - log.Error("Registering session: ", err) + a.log.ErrorContext(ctx, "Registering session", "error", err) } } func (a *App) Profile() { if err := diagnostics.Profile(filepath.Join(a.Config.StorageDir, "profiles")); err != nil { - logrus.WithError(err).Warn("failed to capture profiles") + a.log.WarnContext(context.TODO(), "Failed to capture profiles", "error", err) } } diff --git a/integrations/event-handler/cli.go b/integrations/event-handler/cli.go index dc04edd4936e1..93b2cfcf78924 100644 --- a/integrations/event-handler/cli.go +++ b/integrations/event-handler/cli.go @@ -18,13 +18,13 @@ package main import ( "context" + "log/slog" "strings" "time" "github.com/alecthomas/kong" "github.com/gravitational/trace" - "github.com/gravitational/teleport/integrations/lib/logger" "github.com/gravitational/teleport/integrations/lib/stringset" "github.com/gravitational/teleport/integrations/event-handler/lib" @@ -242,42 +242,40 @@ func (c *StartCmdConfig) Validate() error { } // Dump dumps configuration values to the log -func (c *StartCmdConfig) Dump(ctx context.Context) { - log := logger.Get(ctx) - +func (c *StartCmdConfig) Dump(ctx context.Context, log *slog.Logger) { // Log configuration variables - log.WithField("batch", c.BatchSize).Info("Using batch size") - log.WithField("types", c.Types).Info("Using type filter") - log.WithField("skip-event-types", c.SkipEventTypes).Info("Using type exclude filter") - log.WithField("types", c.SkipSessionTypes).Info("Skipping session events of type") - log.WithField("value", c.StartTime).Info("Using start time") - log.WithField("timeout", c.Timeout).Info("Using timeout") - log.WithField("url", c.FluentdURL).Info("Using Fluentd url") - log.WithField("url", c.FluentdSessionURL).Info("Using Fluentd session url") - log.WithField("ca", c.FluentdCA).Info("Using Fluentd ca") - log.WithField("cert", c.FluentdCert).Info("Using Fluentd cert") - log.WithField("key", c.FluentdKey).Info("Using Fluentd key") - log.WithField("window-size", c.WindowSize).Info("Using window size") + log.InfoContext(ctx, "Using batch size", "batch", c.BatchSize) + log.InfoContext(ctx, "Using type filter", "types", c.Types) + log.InfoContext(ctx, "Using type exclude filter", "skip_event_types", c.SkipEventTypes) + log.InfoContext(ctx, "Skipping session events of type", "types", c.SkipSessionTypes) + log.InfoContext(ctx, "Using start time", "value", c.StartTime) + log.InfoContext(ctx, "Using timeout", "timeout", c.Timeout) + log.InfoContext(ctx, "Using Fluentd url", "url", c.FluentdURL) + log.InfoContext(ctx, "Using Fluentd session url", "url", c.FluentdSessionURL) + log.InfoContext(ctx, "Using Fluentd ca", "ca", c.FluentdCA) + log.InfoContext(ctx, "Using Fluentd cert", "cert", c.FluentdCert) + log.InfoContext(ctx, "Using Fluentd key", "key", c.FluentdKey) + log.InfoContext(ctx, "Using window size", "window_size", c.WindowSize) if c.TeleportIdentityFile != "" { - log.WithField("file", c.TeleportIdentityFile).Info("Using Teleport identity file") + log.InfoContext(ctx, "Using Teleport identity file", "file", c.TeleportIdentityFile) } if c.TeleportRefreshEnabled { - log.WithField("interval", c.TeleportRefreshInterval).Info("Using Teleport identity file refresh") + log.InfoContext(ctx, "Using Teleport identity file refresh", "interval", c.TeleportRefreshInterval) } if c.TeleportKey != "" { - log.WithField("addr", c.TeleportAddr).Info("Using Teleport addr") - log.WithField("ca", c.TeleportCA).Info("Using Teleport CA") - log.WithField("cert", c.TeleportCert).Info("Using Teleport cert") - log.WithField("key", c.TeleportKey).Info("Using Teleport key") + log.InfoContext(ctx, "Using Teleport addr", "addr", c.TeleportAddr) + log.InfoContext(ctx, "Using Teleport CA", "ca", c.TeleportCA) + log.InfoContext(ctx, "Using Teleport cert", "cert", c.TeleportCert) + log.InfoContext(ctx, "Using Teleport key", "key", c.TeleportKey) } if c.LockEnabled { - log.WithField("count", c.LockFailedAttemptsCount).WithField("period", c.LockPeriod).Info("Auto-locking enabled") + log.InfoContext(ctx, "Auto-locking enabled", "count", c.LockFailedAttemptsCount, "period", c.LockPeriod) } if c.DryRun { - log.Warn("Dry run! Events are not sent to Fluentd. Separate storage is used.") + log.WarnContext(ctx, "Dry run! Events are not sent to Fluentd. Separate storage is used.") } } diff --git a/integrations/event-handler/event_handler_test.go b/integrations/event-handler/event_handler_test.go index 70b2c2a935023..63efd444ce628 100644 --- a/integrations/event-handler/event_handler_test.go +++ b/integrations/event-handler/event_handler_test.go @@ -18,6 +18,7 @@ package main import ( "context" + "log/slog" "strings" "testing" "time" @@ -131,7 +132,7 @@ func (s *EventHandlerSuite) startApp() { t := s.T() t.Helper() - app, err := NewApp(&s.appConfig) + app, err := NewApp(&s.appConfig, slog.Default()) require.NoError(t, err) integration.RunAndWaitReady(s.T(), app) diff --git a/integrations/event-handler/events_job.go b/integrations/event-handler/events_job.go index 645866ec34752..4a26c3cc4e531 100644 --- a/integrations/event-handler/events_job.go +++ b/integrations/event-handler/events_job.go @@ -22,13 +22,11 @@ import ( "github.com/gravitational/trace" limiter "github.com/sethvargo/go-limiter" "github.com/sethvargo/go-limiter/memorystore" - log "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/timestamppb" auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/integrations/lib" - "github.com/gravitational/teleport/integrations/lib/logger" "github.com/gravitational/teleport/lib/events/export" ) @@ -50,8 +48,6 @@ func NewEventsJob(app *App) *EventsJob { // run runs the event consumption logic func (j *EventsJob) run(ctx context.Context) error { - log := logger.Get(ctx) - // Create cancellable context which handles app termination ctx, cancel := context.WithCancel(ctx) j.app.Process.OnTerminate(func(_ context.Context) error { @@ -67,11 +63,11 @@ func (j *EventsJob) run(ctx context.Context) error { for { select { case <-logTicker.C: - ll := log.WithField("events_per_minute", j.eventsProcessed.Swap(0)) + ll := j.app.log.With("events_per_minute", j.eventsProcessed.Swap(0)) if td := j.targetDate.Load(); td != nil { - ll = ll.WithField("date", td.Format(time.DateOnly)) + ll = ll.With("date", td.Format(time.DateOnly)) } - ll.Info("event processing") + ll.InfoContext(ctx, "Event processing") case <-ctx.Done(): return } @@ -94,11 +90,14 @@ func (j *EventsJob) run(ctx context.Context) error { for { err := j.runPolling(ctx) if err == nil || ctx.Err() != nil { - log.Debug("watch loop exiting") + j.app.log.DebugContext(ctx, "Watch loop exiting") return trace.Wrap(err) } - log.WithError(err).Error("unexpected error in watch loop. reconnecting in 5s...") + j.app.log.ErrorContext( + ctx, "Unexpected error in watch loop. Reconnecting in 5s...", + "error", err, + ) select { case <-time.After(time.Second * 5): @@ -187,7 +186,7 @@ func (j *EventsJob) runPolling(ctx context.Context) error { date := exporter.GetCurrentDate() j.targetDate.Store(&date) if err := j.app.State.SetCursorV2State(exporter.GetState()); err != nil { - log.WithError(err).Error("Failed to save cursor_v2 values, will retry") + j.app.log.ErrorContext(ctx, "Failed to save cursor_v2 values, will retry", "error", err) } case <-ctx.Done(): exporter.Close() @@ -213,8 +212,6 @@ func (j *EventsJob) runLegacyPolling(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - log := logger.Get(ctx) - lc, err := j.app.State.GetLegacyCursorValues() if err != nil { return trace.Wrap(err) @@ -228,7 +225,9 @@ func (j *EventsJob) runLegacyPolling(ctx context.Context) error { lc.WindowStartTime = *st } - eventWatcher := NewLegacyEventsWatcher(j.app.Config, j.app.client, *lc, j.handleEvent) + eventWatcher := NewLegacyEventsWatcher( + j.app.Config, j.app.client, *lc, j.handleEvent, j.app.log, + ) // periodically sync cursor values to disk go func() { @@ -246,7 +245,7 @@ func (j *EventsJob) runLegacyPolling(ctx context.Context) error { continue } if err := j.app.State.SetLegacyCursorValues(currentCursorValues); err != nil { - log.WithError(err).Error("Failed to save cursor values, will retry") + j.app.log.ErrorContext(ctx, "Failed to save cursor values, will retry", "error", err) continue } lastCursorValues = currentCursorValues @@ -320,8 +319,6 @@ func (j *EventsJob) TryLockUser(ctx context.Context, evt *TeleportEvent) error { return nil } - log := logger.Get(ctx) - _, _, _, ok, err := j.rl.Take(ctx, evt.FailedLoginData.Login) if err != nil { return trace.Wrap(err) @@ -335,7 +332,7 @@ func (j *EventsJob) TryLockUser(ctx context.Context, evt *TeleportEvent) error { return trace.Wrap(err) } - log.WithField("data", evt.FailedLoginData).Info("User login is locked") + j.app.log.InfoContext(ctx, "User login is locked", "data", evt.FailedLoginData) return nil } diff --git a/integrations/event-handler/fluentd_client.go b/integrations/event-handler/fluentd_client.go index 92015bef4d312..c3bea94ce97ee 100644 --- a/integrations/event-handler/fluentd_client.go +++ b/integrations/event-handler/fluentd_client.go @@ -21,12 +21,12 @@ import ( "context" "crypto/tls" "crypto/x509" + "log/slog" "net/http" "os" "time" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" tlib "github.com/gravitational/teleport/integrations/lib" ) @@ -40,10 +40,11 @@ const ( type FluentdClient struct { // client HTTP client to send requests client *http.Client + log *slog.Logger } // NewFluentdClient creates new FluentdClient -func NewFluentdClient(c *FluentdConfig) (*FluentdClient, error) { +func NewFluentdClient(c *FluentdConfig, log *slog.Logger) (*FluentdClient, error) { var certs []tls.Certificate if c.FluentdCert != "" && c.FluentdKey != "" { cert, err := tls.LoadX509KeyPair(c.FluentdCert, c.FluentdKey) @@ -70,7 +71,7 @@ func NewFluentdClient(c *FluentdConfig) (*FluentdClient, error) { Timeout: httpTimeout, } - return &FluentdClient{client: client}, nil + return &FluentdClient{client: client, log: log}, nil } // getCertPool reads CA certificate and returns CA cert pool if passed @@ -91,7 +92,7 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) { // Send sends event to fluentd func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error { - log.WithField("payload", string(b)).Debug("Sending event to Fluentd") + f.log.DebugContext(ctx, "Sending event to Fluentd", "payload", string(b)) req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b)) if err != nil { diff --git a/integrations/event-handler/go.mod b/integrations/event-handler/go.mod index 2e8dcc7c26f60..f1fe2ca44cdff 100644 --- a/integrations/event-handler/go.mod +++ b/integrations/event-handler/go.mod @@ -14,7 +14,6 @@ require ( github.com/pelletier/go-toml v1.9.5 github.com/peterbourgon/diskv/v3 v3.0.1 github.com/sethvargo/go-limiter v1.0.0 - github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 golang.org/x/time v0.6.0 google.golang.org/protobuf v1.35.1 @@ -249,6 +248,7 @@ require ( github.com/scim2/filter-parser/v2 v2.2.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/sijms/go-ora/v2 v2.8.22 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/cast v1.7.0 // indirect github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/integrations/event-handler/helpers.go b/integrations/event-handler/helpers.go index 793b4c00262e3..d584c5de4770d 100644 --- a/integrations/event-handler/helpers.go +++ b/integrations/event-handler/helpers.go @@ -19,10 +19,10 @@ package main import ( "context" "fmt" + "log/slog" "time" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/api/client/proto" @@ -52,7 +52,7 @@ type TeleportSearchEventsClient interface { // newClient performs teleport api client setup, including credentials loading, validation, and // setup of credentials refresh if needed. -func newClient(ctx context.Context, c *StartCmdConfig) (*client.Client, error) { +func newClient(ctx context.Context, log *slog.Logger, c *StartCmdConfig) (*client.Client, error) { var creds []client.Credentials switch { case c.TeleportIdentityFile != "" && !c.TeleportRefreshEnabled: @@ -70,13 +70,13 @@ func newClient(ctx context.Context, c *StartCmdConfig) (*client.Client, error) { } if validCred, err := credentials.CheckIfExpired(creds); err != nil { - log.Warn(err) + log.WarnContext(ctx, "Encountered error when checking credentials", "error", err) if !validCred { return nil, trace.BadParameter( "No valid credentials found, this likely means credentials are expired. In this case, please sign new credentials and increase their TTL if needed.", ) } - log.Info("At least one non-expired credential has been found, continuing startup") + log.InfoContext(ctx, "At least one non-expired credential has been found, continuing startup") } clientConfig := client.Config{ diff --git a/integrations/event-handler/legacy_events_watcher.go b/integrations/event-handler/legacy_events_watcher.go index 2279bd779d9b6..29addd7b2397d 100644 --- a/integrations/event-handler/legacy_events_watcher.go +++ b/integrations/event-handler/legacy_events_watcher.go @@ -18,17 +18,16 @@ package main import ( "context" + "log/slog" "sync" "sync/atomic" "time" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" "golang.org/x/time/rate" auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/integrations/lib/logger" ) const ( @@ -73,6 +72,8 @@ type LegacyEventsWatcher struct { // until it returns nil. export func(context.Context, *TeleportEvent) error + log *slog.Logger + // exportedCursor is a pointer to the cursor values of the most recently // exported event. the values mirror above fields, but those are only accessible // to the main event processing goroutine. these values are meant to be read @@ -90,6 +91,7 @@ func NewLegacyEventsWatcher( client TeleportSearchEventsClient, cursorValues LegacyCursorValues, export func(context.Context, *TeleportEvent) error, + log *slog.Logger, ) *LegacyEventsWatcher { w := &LegacyEventsWatcher{ client: client, @@ -99,6 +101,7 @@ func NewLegacyEventsWatcher( export: export, id: cursorValues.ID, windowStartTime: cursorValues.WindowStartTime, + log: log, } w.exportedCursor.Store(&cursorValues) @@ -131,7 +134,6 @@ func (t *LegacyEventsWatcher) flipPage() bool { // fetch fetches the page and sets the position to the event after latest known func (t *LegacyEventsWatcher) fetch(ctx context.Context) error { - log := logger.Get(ctx) // Zero batch t.batch = make([]*LegacyTeleportEvent, 0, t.config.BatchSize) nextCursor, err := t.getEvents(ctx) @@ -145,7 +147,12 @@ func (t *LegacyEventsWatcher) fetch(ctx context.Context) error { // Mark position as unresolved (the page is empty) t.pos = -1 - log.WithField("cursor", t.cursor).WithField("next", nextCursor).WithField("len", len(t.batch)).Debug("Fetched page") + t.log.DebugContext( + ctx, "Fetched page", + "cursor", t.cursor, + "next", nextCursor, + "len", len(t.batch), + ) // Page is empty: do nothing, return if len(t.batch) == 0 { @@ -167,7 +174,7 @@ func (t *LegacyEventsWatcher) fetch(ctx context.Context) error { // Set the position of the last known event t.pos = pos - log.WithField("id", t.id).WithField("new_pos", t.pos).Debug("Skipping last known event") + t.log.DebugContext(ctx, "Skipped last known event", "id", t.id, "pos", t.pos) return nil } @@ -182,7 +189,7 @@ func (t *LegacyEventsWatcher) getEvents(ctx context.Context) (string, error) { for i := 1; i < len(rangeSplitByDay); i++ { startTime := rangeSplitByDay[i-1] endTime := rangeSplitByDay[i] - log.Debugf("Fetching events from %v to %v", startTime, endTime) + t.log.DebugContext(ctx, "Fetching events", "from", startTime, "to", endTime) evts, cursor, err := t.getEventsInWindow(ctx, startTime, endTime) if err != nil { return "", trace.Wrap(err) @@ -191,7 +198,7 @@ func (t *LegacyEventsWatcher) getEvents(ctx context.Context) (string, error) { // Convert batch to TeleportEvent for _, e := range evts { if _, ok := t.config.SkipEventTypes[e.Type]; ok { - log.WithField("event", e).Debug("Skipping event") + t.log.DebugContext(ctx, "Skipping event", "event", e) continue } evt, err := NewLegacyTeleportEvent(e, t.cursor, wst) @@ -205,7 +212,11 @@ func (t *LegacyEventsWatcher) getEvents(ctx context.Context) (string, error) { // if no events are found, the cursor is out of the range [startTime, endTime] // and it's the last complete day, update start time to the next day. if t.canSkipToNextWindow(i, rangeSplitByDay, cursor) { - log.Infof("No new events found for the range %v to %v", startTime, endTime) + t.log.InfoContext( + ctx, "No new events found for the range", + "from", startTime, + "to", endTime, + ) t.setWindowStartTime(endTime) continue } @@ -221,7 +232,11 @@ func (t *LegacyEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []time. } if len(t.batch) == 0 && i < len(rangeSplitByDay)-1 { - log.Infof("No events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i]) + t.log.InfoContext( + context.TODO(), "No events found for the range", + "from", rangeSplitByDay[i-1], + "to", rangeSplitByDay[i], + ) return true } pos := 0 @@ -236,7 +251,13 @@ func (t *LegacyEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []time. } if i < len(rangeSplitByDay)-1 && pos >= len(t.batch) { - log.WithField("pos", pos).WithField("len", len(t.batch)).Infof("No new events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i]) + t.log.InfoContext( + context.TODO(), "No new events found for the range", + "from", rangeSplitByDay[i-1], + "to", rangeSplitByDay[i], + "pos", pos, + "len", len(t.batch), + ) return true } return false @@ -269,8 +290,10 @@ func splitRangeByDay(from, to time.Time, windowSize time.Duration) []time.Time { // pause sleeps for timeout seconds func (t *LegacyEventsWatcher) pause(ctx context.Context) error { - log := logger.Get(ctx) - log.Debugf("No new events, pause for %v seconds", t.config.Timeout) + t.log.DebugContext( + ctx, "No new events, pausing", + "pause_time", t.config.Timeout, + ) select { case <-ctx.Done(): @@ -303,7 +326,7 @@ func (t *LegacyEventsWatcher) ExportEvents(ctx context.Context) error { // If there is still nothing, sleep if len(t.batch) == 0 && t.nextCursor == "" { if t.config.ExitOnLastEvent { - log.Info("All events are processed, exiting...") + t.log.InfoContext(ctx, "All events are processed, exiting...") break } @@ -334,7 +357,7 @@ func (t *LegacyEventsWatcher) ExportEvents(ctx context.Context) error { // If there is still nothing new on current page, sleep if t.pos >= len(t.batch) { if t.config.ExitOnLastEvent && t.nextCursor == "" { - log.Info("All events are processed, exiting...") + t.log.InfoContext(ctx, "All events are processed, exiting...") break } @@ -361,7 +384,7 @@ func (t *LegacyEventsWatcher) ExportEvents(ctx context.Context) error { } if logLimiter.Allow() { - log.Warn("encountering backpressure from outbound event processing") + t.log.WarnContext(ctx, "Encountering backpressure from outbound event processing") } select { @@ -385,7 +408,7 @@ func (t *LegacyEventsWatcher) ExportEvents(ctx context.Context) error { break Export } - log.WithError(err).Error("Failed to export event, retrying...") + t.log.ErrorContext(ctx, "Failed to export event, retrying...", "error", err) select { case <-ctx.Done(): return trace.Wrap(ctx.Err()) diff --git a/integrations/event-handler/legacy_events_watcher_test.go b/integrations/event-handler/legacy_events_watcher_test.go index e106b03815234..bf02fb967066f 100644 --- a/integrations/event-handler/legacy_events_watcher_test.go +++ b/integrations/event-handler/legacy_events_watcher_test.go @@ -18,6 +18,7 @@ package main import ( "context" + "log/slog" "strconv" "sync" "testing" @@ -153,7 +154,7 @@ func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClie SkipSessionTypesRaw: skipEventTypesRaw, WindowSize: 24 * time.Hour, }, - }, eventsClient, cursor, exportFn) + }, eventsClient, cursor, exportFn, slog.Default()) } func TestEvents(t *testing.T) { diff --git a/integrations/event-handler/main.go b/integrations/event-handler/main.go index 85874915e6256..859f6544c1e06 100644 --- a/integrations/event-handler/main.go +++ b/integrations/event-handler/main.go @@ -19,6 +19,7 @@ package main import ( "context" "fmt" + "log/slog" "os" "strings" "time" @@ -45,6 +46,8 @@ const ( ) func main() { + // This initializes the legacy logrus logger. This has been kept in place + // in case any of the dependencies are still using logrus. logger.Init() ctx := kong.Parse( @@ -55,9 +58,23 @@ func main() { kong.Description(pluginDescription), ) + logCfg := logger.Config{ + Severity: "info", + Output: "stderr", + Format: "text", + } if cli.Debug { enableLogDebug() + logCfg.Severity = "debug" + } + log, err := logCfg.NewSLogLogger() + if err != nil { + fmt.Println(trace.DebugReport(trace.Wrap(err, "initializing logger"))) + os.Exit(-1) } + // Whilst this package mostly dependency injects slog, upstream dependencies + // may still use the default slog logger. + slog.SetDefault(log) switch { case ctx.Command() == "version": @@ -69,12 +86,12 @@ func main() { os.Exit(-1) } case ctx.Command() == "start": - err := start() + err := start(log) if err != nil { lib.Bail(err) } else { - logger.Standard().Info("Successfully shut down") + log.InfoContext(context.TODO(), "Successfully shut down") } } } @@ -89,8 +106,8 @@ func enableLogDebug() { } // start spawns the main process -func start() error { - app, err := NewApp(&cli.Start) +func start(log *slog.Logger) error { + app, err := NewApp(&cli.Start, log) if err != nil { return trace.Wrap(err) } diff --git a/integrations/event-handler/session_events_job.go b/integrations/event-handler/session_events_job.go index 8a009333bc3d9..4bf51a8f4e34a 100644 --- a/integrations/event-handler/session_events_job.go +++ b/integrations/event-handler/session_events_job.go @@ -16,18 +16,17 @@ package main import ( "context" + "log/slog" "sync/atomic" "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "golang.org/x/time/rate" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/integrations/lib" "github.com/gravitational/teleport/integrations/lib/backoff" - "github.com/gravitational/teleport/integrations/lib/logger" ) const ( @@ -76,8 +75,6 @@ func NewSessionEventsJob(app *App) *SessionEventsJob { // run runs session consuming process func (j *SessionEventsJob) run(ctx context.Context) error { - log := logger.Get(ctx) - // Create cancellable context which handles app termination process := lib.MustGetProcess(ctx) ctx, cancel := context.WithCancel(ctx) @@ -94,7 +91,10 @@ func (j *SessionEventsJob) run(ctx context.Context) error { for { select { case <-logTicker.C: - log.WithField("sessions_per_minute", j.sessionsProcessed.Swap(0)).Info("session processing") + j.app.log.InfoContext( + ctx, "Session processing", + "sessions_per_minute", j.sessionsProcessed.Swap(0), + ) case <-ctx.Done(): return } @@ -102,7 +102,7 @@ func (j *SessionEventsJob) run(ctx context.Context) error { }() if err := j.restartPausedSessions(); err != nil { - log.WithError(err).Error("Restarting paused sessions") + j.app.log.ErrorContext(ctx, "Restarting paused sessions", "error", err) } j.SetReady(true) @@ -110,20 +110,23 @@ func (j *SessionEventsJob) run(ctx context.Context) error { for { select { case s := <-j.sessions: - logger := log.WithField("id", s.ID).WithField("index", s.Index) + log := j.app.log.With( + "id", s.ID, + "index", s.Index, + ) if j.logLimiter.Allow() { - logger.Debug("Starting session ingest") + log.DebugContext(ctx, "Starting session ingest") } select { case j.semaphore <- struct{}{}: case <-ctx.Done(): - logger.WithError(ctx.Err()).Error("Failed to acquire semaphore") + log.ErrorContext(ctx, "Failed to acquire semaphore", "error", ctx.Err()) return nil } - func(s session, log logrus.FieldLogger) { + func(s session, log *slog.Logger) { j.app.SpawnCritical(func(ctx context.Context) error { defer func() { <-j.semaphore }() @@ -133,7 +136,7 @@ func (j *SessionEventsJob) run(ctx context.Context) error { return nil }) - }(s, logger) + }(s, log) case <-ctx.Done(): if lib.IsCanceled(ctx.Err()) { return nil @@ -159,8 +162,10 @@ func (j *SessionEventsJob) processSession(ctx context.Context, s session, proces // sessionBackoffNumTries is the maximum number of backoff tries sessionBackoffNumTries = 3 ) - - log := logger.Get(ctx).WithField("id", s.ID).WithField("index", s.Index) + log := j.app.log.With( + "id", s.ID, + "index", s.Index, + ) backoff := backoff.NewDecorr(sessionBackoffBase, sessionBackoffMax, clockwork.NewRealClock()) attempt := sessionBackoffNumTries @@ -187,11 +192,18 @@ func (j *SessionEventsJob) processSession(ctx context.Context, s session, proces // abort processing the session any further. attempt-- if attempt <= 0 { - log.WithField("limit", sessionBackoffNumTries).Error("Session ingestion exceeded attempt limit, aborting") + log.ErrorContext( + ctx, "Session ingestion exceeded attempt limit, aborting", + "limit", sessionBackoffNumTries, + ) return trace.LimitExceeded("Session ingestion exceeded attempt limit") } - log.WithError(err).WithField("n", attempt).Error("Session ingestion error, retrying") + log.ErrorContext( + ctx, "Session ingestion error, retrying", + "error", err, + "attempt", attempt, + ) // Perform backoff before retrying the session again. if err := backoff.Do(ctx); err != nil { @@ -200,7 +212,7 @@ func (j *SessionEventsJob) processSession(ctx context.Context, s session, proces case err != nil: // Abort on any errors that don't require a retry. if !lib.IsCanceled(err) { - log.WithField("err", err).Error("Session ingestion failed") + log.ErrorContext(ctx, "Session ingestion failed", "error", err) } return trace.Wrap(err) default: @@ -250,14 +262,14 @@ func (j *SessionEventsJob) processMissingRecordings(ctx context.Context) error { defer func() { <-semaphore }() if err := j.processSession(ctx, sess, attempts); err != nil { - logger.Get(ctx).WithError(err).Debug("Failed processing session recording") + j.app.log.DebugContext(ctx, "Failed processing session recording", "error", err) } }() return nil }) if err != nil && !lib.IsCanceled(err) { - logger.Get(ctx).WithError(err).Warn("Unable to load previously failed sessions for processing") + j.app.log.WarnContext(ctx, "Unable to load previously failed sessions for processing", "error", err) } timer.Reset(jitter(processingInterval)) @@ -275,12 +287,19 @@ func (j *SessionEventsJob) restartPausedSessions() error { return nil } - logrus.WithField("count", len(sessions)).Debug("Restarting paused sessions") + j.app.log.DebugContext( + context.TODO(), "Restarting paused sessions", + "count", len(sessions), + ) for id, idx := range sessions { func(id string, idx int64) { j.app.SpawnCritical(func(ctx context.Context) error { - logrus.WithField("id", id).WithField("index", idx).Debug("Restarting session ingestion") + j.app.log.DebugContext( + ctx, "Restarting session ingestion", + "id", id, + "index", idx, + ) s := session{ID: id, Index: idx} @@ -317,7 +336,7 @@ Loop: case evt, ok := <-chEvt: if !ok { if j.logLimiter.Allow() { - logrus.WithField("id", s.ID).Debug("Finished session events ingest") + j.app.log.DebugContext(ctx, "Finished session events ingest", "id", s.ID) } break Loop // Break the main loop } @@ -376,7 +395,10 @@ func (j *SessionEventsJob) RegisterSession(ctx context.Context, e *TeleportEvent } if j.backpressureLogLimiter.Allow() { - logrus.Warn("backpressure in session processing, consider increasing concurrency if this issue persists") + j.app.log.WarnContext( + ctx, + "Backpressure in session processing, consider increasing concurrency if this issue persists", + ) } select { @@ -384,7 +406,10 @@ func (j *SessionEventsJob) RegisterSession(ctx context.Context, e *TeleportEvent return nil case <-ctx.Done(): if !lib.IsCanceled(ctx.Err()) { - logrus.Error(ctx.Err()) + j.app.log.ErrorContext( + ctx, "Encountered context error that was not a cancellation", + "error", ctx.Err(), + ) } // from the caller's perspective this isn't really an error since we did // successfully sync session index to disk... session will be ingested diff --git a/integrations/event-handler/session_events_job_test.go b/integrations/event-handler/session_events_job_test.go index b0ae8caca8e43..79ec5bd497dec 100644 --- a/integrations/event-handler/session_events_job_test.go +++ b/integrations/event-handler/session_events_job_test.go @@ -16,6 +16,7 @@ package main import ( "context" + "log/slog" "testing" "github.com/peterbourgon/diskv/v3" @@ -37,6 +38,7 @@ func TestConsumeSessionNoEventsFound(t *testing.T) { }), }, client: &mockClient{}, + log: slog.Default(), }) _, err := j.consumeSession(context.Background(), session{ID: sessionID}) require.NoError(t, err) diff --git a/integrations/event-handler/state.go b/integrations/event-handler/state.go index 9a76497108048..07b825be72943 100644 --- a/integrations/event-handler/state.go +++ b/integrations/event-handler/state.go @@ -17,10 +17,12 @@ limitations under the License. package main import ( + "context" "encoding/binary" "encoding/json" "errors" "io/fs" + "log/slog" "net" "os" "path/filepath" @@ -30,9 +32,7 @@ import ( "github.com/gravitational/trace" "github.com/peterbourgon/diskv/v3" - "github.com/sirupsen/logrus" - "github.com/gravitational/teleport/integrations/lib/logger" "github.com/gravitational/teleport/lib/events/export" "github.com/gravitational/teleport/integrations/event-handler/lib" @@ -77,14 +77,16 @@ type State struct { // implement the newer bulk export apis, the v1 cursor stored in the above // dv may be the source of truth still. cursorV2 *export.Cursor + + log *slog.Logger } // NewCursor creates new cursor instance -func NewState(c *StartCmdConfig) (*State, error) { +func NewState(c *StartCmdConfig, log *slog.Logger) (*State, error) { // Simplest transform function: put all the data files into the base dir. flatTransform := func(s string) []string { return []string{} } - dir, err := createStorageDir(c) + dir, err := createStorageDir(c, log) if err != nil { return nil, trace.Wrap(err) } @@ -105,15 +107,14 @@ func NewState(c *StartCmdConfig) (*State, error) { s := State{ dv: dv, cursorV2: cursorV2, + log: log, } return &s, nil } // createStorageDir is used to calculate storage dir path and create dir if it does not exits -func createStorageDir(c *StartCmdConfig) (string, error) { - log := logger.Standard() - +func createStorageDir(c *StartCmdConfig, log *slog.Logger) (string, error) { host, port, err := net.SplitHostPort(c.TeleportAddr) if err != nil { return "", trace.Wrap(err) @@ -142,9 +143,9 @@ func createStorageDir(c *StartCmdConfig) (string, error) { return "", trace.Errorf("Can not create storage directory %v : %w", dir, err) } - log.WithField("dir", dir).Info("Created storage directory") + log.InfoContext(context.TODO(), "Created storage directory", "dir", dir) } else { - log.WithField("dir", dir).Info("Using existing storage directory") + log.InfoContext(context.TODO(), "Using existing storage directory", "dir", dir) } return dir, nil @@ -362,7 +363,12 @@ func (s *State) IterateMissingRecordings(callback func(s session, attempts int) var m missingRecording if err := json.Unmarshal(b, &m); err != nil { - logrus.WithError(err).Warnf("Failed to unmarshal missing recording %s from persisted state", key) + s.log.WarnContext( + context.TODO(), + "Failed to unmarshal missing recording from persisted state", + "key", key, + "error", err, + ) continue } diff --git a/integrations/event-handler/state_test.go b/integrations/event-handler/state_test.go index 11ee192b95051..ae1965f9a10cc 100644 --- a/integrations/event-handler/state_test.go +++ b/integrations/event-handler/state_test.go @@ -18,6 +18,7 @@ package main import ( "cmp" + "log/slog" "slices" "strconv" "testing" @@ -52,7 +53,7 @@ func newStartCmdConfig(t *testing.T) *StartCmdConfig { // TestStatePersist checks that state is persisted when StartTime stays constant func TestStatePersist(t *testing.T) { config := newStartCmdConfig(t) - state, err := NewState(config) + state, err := NewState(config, slog.Default()) require.NoError(t, err) startTime, errt := state.GetStartTime() @@ -74,7 +75,7 @@ func TestStatePersist(t *testing.T) { require.NoError(t, erri) require.NoError(t, errt) - state, err = NewState(config) + state, err = NewState(config, slog.Default()) require.NoError(t, err) startTime, errt = state.GetStartTime() @@ -94,7 +95,7 @@ func TestStatePersist(t *testing.T) { func TestStateMissingRecordings(t *testing.T) { config := newStartCmdConfig(t) - state, err := NewState(config) + state, err := NewState(config, slog.Default()) require.NoError(t, err) // Iterating should find no records if nothing has been stored yet.