diff --git a/lib/events/export/cursor.go b/lib/events/export/cursor.go new file mode 100644 index 0000000000000..8dff088d00831 --- /dev/null +++ b/lib/events/export/cursor.go @@ -0,0 +1,276 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package export + +import ( + "bytes" + "fmt" + "os" + "path/filepath" + "slices" + "strings" + "sync" + "time" + + "github.com/gravitational/trace" + + "github.com/gravitational/teleport" +) + +const ( + // completedName is the completed file name + completedName = "completed-chunks" + + // chunkSuffix is the suffix for per-chunk cursor files + chunkSuffix = ".chunk" +) + +// CursorConfig configures a cursor. +type CursorConfig struct { + // Dir is the cursor directory. This directory will be created if it does not exist + // and should not be used for any other purpose. + Dir string +} + +// CheckAndSetDefaults validates configuration and sets default values for optional parameters. +func (c *CursorConfig) CheckAndSetDefaults() error { + if c.Dir == "" { + return trace.BadParameter("missing required parameter Dir in CursorConfig") + } + + return nil +} + +// Cursor manages an event export cursor directory and keeps a copy of its state in-memory, +// improving the efficiency of updates by only writing diffs to disk. the cursor directory +// contains a sub-directory per date. each date's state is tracked using an append-only list +// of completed chunks, along with a per-chunk cursor file. cursor directories are not intended +// for true concurrent use, but concurrent use in the context of a graceful restart won't have +// any consequences more dire than duplicate events. +type Cursor struct { + cfg CursorConfig + mu sync.Mutex + state ExporterState +} + +// NewCursor creates a new cursor, loading any preexisting state from disk. +func NewCursor(cfg CursorConfig) (*Cursor, error) { + if err := cfg.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err) + } + + state, err := loadInitialState(cfg.Dir) + if err != nil { + return nil, trace.Wrap(err) + } + return &Cursor{ + cfg: cfg, + state: *state, + }, nil +} + +// GetState gets the current state as seen by this cursor. +func (c *Cursor) GetState() ExporterState { + c.mu.Lock() + defer c.mu.Unlock() + + return c.state.Clone() +} + +// Sync synchronizes the cursor's in-memory state with the provided state, writing any diffs to disk. +func (c *Cursor) Sync(newState ExporterState) error { + c.mu.Lock() + defer c.mu.Unlock() + + for d, s := range newState.Dates { + if err := c.syncDate(d, s); err != nil { + return trace.Wrap(err) + } + } + + for d := range c.state.Dates { + if _, ok := newState.Dates[d]; !ok { + if err := c.deleteDate(d); err != nil { + return trace.Wrap(err) + } + } + } + + return nil +} + +func (c *Cursor) syncDate(date time.Time, state DateExporterState) error { + // ensure date directory exists. the existence of the date directory + // is meaningful even if it contains no files. + dateDir := filepath.Join(c.cfg.Dir, date.Format(time.DateOnly)) + if err := os.MkdirAll(dateDir, teleport.SharedDirMode); err != nil { + return trace.ConvertSystemError(err) + } + + // open completed file in append mode + completedFile, err := os.OpenFile(filepath.Join(dateDir, completedName), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return trace.ConvertSystemError(err) + } + defer completedFile.Close() + + current, ok := c.state.Dates[date] + if !ok { + current = DateExporterState{ + Cursors: make(map[string]string), + } + } + defer func() { + c.state.Dates[date] = current + }() + + for _, chunk := range state.Completed { + if slices.Contains(current.Completed, chunk) { + // already written to disk + continue + } + + // add chunk to completed file + if _, err := fmt.Fprintln(completedFile, chunk); err != nil { + return trace.ConvertSystemError(err) + } + + // ensure chunk is flushed to disk successfully before removing the cursor file + // and updating in-memory state. + if err := completedFile.Sync(); err != nil { + return trace.ConvertSystemError(err) + } + + // delete cursor file if it exists + if err := os.Remove(filepath.Join(dateDir, chunk+chunkSuffix)); err != nil && !os.IsNotExist(err) { + return trace.ConvertSystemError(err) + } + + // update current state + current.Completed = append(current.Completed, chunk) + delete(current.Cursors, chunk) + } + + for chunk, cursor := range state.Cursors { + if current.Cursors[chunk] == cursor { + continue + } + + // write cursor file + if err := os.WriteFile(filepath.Join(dateDir, chunk+chunkSuffix), []byte(cursor), 0644); err != nil { + return trace.ConvertSystemError(err) + } + + // update current state + current.Cursors[chunk] = cursor + } + + return nil +} + +func (c *Cursor) deleteDate(date time.Time) error { + if _, ok := c.state.Dates[date]; !ok { + return nil + } + + // delete the date directory and all its contents + if err := os.RemoveAll(filepath.Join(c.cfg.Dir, date.Format(time.DateOnly))); err != nil { + return trace.ConvertSystemError(err) + } + + delete(c.state.Dates, date) + + return nil +} + +func loadInitialState(dir string) (*ExporterState, error) { + state := ExporterState{ + Dates: make(map[time.Time]DateExporterState), + } + // list subdirectories of the cursors v2 directory + entries, err := os.ReadDir(dir) + if err != nil { + if os.IsNotExist(err) { + return &state, nil + } + return nil, trace.ConvertSystemError(err) + } + + for _, entry := range entries { + if !entry.IsDir() { + // ignore non-directories + continue + } + + // attempt to parse dir name as date + date, err := time.Parse(time.DateOnly, entry.Name()) + if err != nil { + // ignore non-date directories + continue + } + + dateState := DateExporterState{ + Cursors: make(map[string]string), + } + + dateEntries, err := os.ReadDir(filepath.Join(dir, entry.Name())) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + + for _, dateEntry := range dateEntries { + if dateEntry.IsDir() { + continue + } + + if dateEntry.Name() == completedName { + // load the completed file + b, err := os.ReadFile(filepath.Join(dir, entry.Name(), completedName)) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + + // split the completed file into whitespace-separated chunks + dateState.Completed = strings.Fields(string(b)) + continue + } + + if !strings.HasSuffix(dateEntry.Name(), chunkSuffix) { + continue + } + + chunk := strings.TrimSuffix(dateEntry.Name(), chunkSuffix) + b, err := os.ReadFile(filepath.Join(dir, entry.Name(), dateEntry.Name())) + if err != nil { + return nil, trace.ConvertSystemError(err) + } + + if cc := bytes.TrimSpace(b); len(cc) != 0 { + dateState.Cursors[chunk] = string(cc) + } + } + + // note that some dates may not contain any chunks. we still want to track the + // fact that these dates have had their dirs initialized since that still indicates + // how far we've gotten in the export process. + state.Dates[date] = dateState + } + + return &state, nil +} diff --git a/lib/events/export/cursor_test.go b/lib/events/export/cursor_test.go new file mode 100644 index 0000000000000..9853141139345 --- /dev/null +++ b/lib/events/export/cursor_test.go @@ -0,0 +1,243 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package export + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/gravitational/teleport" +) + +// testState is a helper for easily/cleanly representing a cursor/exporter state as a literal when +// writing tests. the values of the inner mapping are generally strings, but some helpers support +// using []string for the completed key. +type testState map[string]map[string]any + +// writeRawState writes the test state to a directory structure. unlike `newState`, this helper +// is designed to inject non-conforming data, so it will accept non-date dir names and expects a string +// value for the completed key. +func writeRawState(t *testing.T, dir string, ts testState) { + for d, s := range ts { + dateDir := filepath.Join(dir, d) + require.NoError(t, os.MkdirAll(dateDir, teleport.SharedDirMode)) + + for k, v := range s { + fileName := filepath.Join(dateDir, k) + require.NoError(t, os.WriteFile(fileName, []byte(v.(string)), 0644)) + } + } +} + +// newState converts a testState into a real ExporterState. this helper only works +// with a well-formed testState. +func newState(t *testing.T, ts testState) ExporterState { + state := ExporterState{ + Dates: make(map[time.Time]DateExporterState), + } + for d, s := range ts { + date, err := time.Parse(time.DateOnly, d) + require.NoError(t, err) + + dateState := DateExporterState{ + Cursors: make(map[string]string), + Completed: []string{}, // avoids require.Equal rejecting nil slices as unequal to empty slices + } + + Entries: + for k, v := range s { + if k == completedName { + dateState.Completed = v.([]string) + continue Entries + } + dateState.Cursors[k] = v.(string) + } + + state.Dates[date] = dateState + } + return state +} + +func syncAndVerifyState(t *testing.T, cursor *Cursor, ts testState) { + state := newState(t, ts) + + // sync the state + require.NoError(t, cursor.Sync(state)) + + // verify in-memory state is as expected + require.Equal(t, state, cursor.GetState()) + + // attempt to load the state from disk + loaded, err := NewCursor(CursorConfig{ + Dir: cursor.cfg.Dir, + }) + require.NoError(t, err) + + // verify that the loaded state is the same as the original state + require.Equal(t, state, loaded.GetState()) +} + +// verifyRawState asserts that the raw state on disk matches the provided test state. chunk names +// need to be suffixed to match and the completed key should be a string. +func verifyRawState(t *testing.T, dir string, ts testState) { + for d, s := range ts { + dateDir := filepath.Join(dir, d) + for k, v := range s { + fileName := filepath.Join(dateDir, k) + data, err := os.ReadFile(fileName) + require.NoError(t, err) + + require.Equal(t, v.(string), string(data)) + } + } +} + +// TestCursorBasics verifies basic syncing/loading of cursor state. +func TestCursorBasics(t *testing.T) { + dir := t.TempDir() + + cursor, err := NewCursor(CursorConfig{ + Dir: dir, + }) + require.NoError(t, err) + state := cursor.GetState() + require.True(t, state.IsEmpty()) + + // sync and verify a typical state + syncAndVerifyState(t, cursor, testState{ + "2021-01-01": { + completedName: []string{"chunk1", "chunk2"}, + }, + "2021-01-02": { + completedName: []string{"chunk1", "chunk2"}, + "chunk3": "cursor1", + "chunk4": "cursor2", + }, + "2021-01-03": { + "chunk3": "cursor1", + "chunk4": "cursor2", + }, + "2021-01-04": {}, + }) + + verifyRawState(t, dir, testState{ + "2021-01-01": { + completedName: "chunk1\nchunk2\n", + }, + "2021-01-02": { + completedName: "chunk1\nchunk2\n", + "chunk3" + chunkSuffix: "cursor1", + "chunk4" + chunkSuffix: "cursor2", + }, + "2021-01-03": { + "chunk3" + chunkSuffix: "cursor1", + "chunk4" + chunkSuffix: "cursor2", + }, + "2021-01-04": {}, + }) + + // sync and verify updated state + syncAndVerifyState(t, cursor, testState{ + "2021-01-01": { + completedName: []string{"chunk1", "chunk2", "chunk3"}, + }, + "2021-01-02": { + completedName: []string{"chunk1", "chunk2"}, + "chunk3": "cursor1", + "chunk4": "cursor2", + "chunk5": "cursor4", + }, + "2021-01-03": { + "chunk3": "cursor2", + "chunk4": "cursor3", + }, + "2021-01-04": {}, + "2021-01-05": {}, + }) + + verifyRawState(t, dir, testState{ + "2021-01-01": { + completedName: "chunk1\nchunk2\nchunk3\n", + }, + "2021-01-02": { + completedName: "chunk1\nchunk2\n", + "chunk3" + chunkSuffix: "cursor1", + "chunk4" + chunkSuffix: "cursor2", + "chunk5" + chunkSuffix: "cursor4", + }, + "2021-01-03": { + "chunk3" + chunkSuffix: "cursor2", + "chunk4" + chunkSuffix: "cursor3", + }, + "2021-01-04": {}, + "2021-01-05": {}, + }) + + // sync & verify heavily truncated state + syncAndVerifyState(t, cursor, testState{ + "2021-01-05": {}, + }) + + verifyRawState(t, dir, testState{ + "2021-01-05": {}, + }) +} + +func TestCursorBadState(t *testing.T) { + dir := t.TempDir() + + writeRawState(t, dir, testState{ + "2021-01-01": { + completedName: "\n\nchunk1\n\n", // extra whitespace should be ignored + }, + "not-a-date": { + completedName: "chunk1", + "no-suffix": "cursor1", // unknown suffix should be ignored + }, + "2021-01-02": { + completedName: "\n", // whitespace-only completed file should count as empty + "chunk1" + chunkSuffix: "cursor1", + "chunk2" + chunkSuffix: "cursor2\n", // extra whitespace should be ignored + "chunk3" + chunkSuffix: " ", // whitespace-only cursor file should count as empty + "no-suffix": "cursor3", // unknown suffix should be ignored + }, + }) + + expected := newState(t, testState{ + "2021-01-01": { + completedName: []string{"chunk1"}, + }, + "2021-01-02": { + "chunk1": "cursor1", + "chunk2": "cursor2", + }, + }) + + loaded, err := NewCursor(CursorConfig{ + Dir: dir, + }) + require.NoError(t, err) + + // verify that the loaded state is the same as expected state + require.Equal(t, expected, loaded.GetState()) +} diff --git a/lib/events/export/date_exporter.go b/lib/events/export/date_exporter.go index cee3e3a36278c..698c67d8c156e 100644 --- a/lib/events/export/date_exporter.go +++ b/lib/events/export/date_exporter.go @@ -117,6 +117,24 @@ type DateExporterState struct { Cursors map[string]string } +// IsEmpty returns true if no state is defined. +func (s *DateExporterState) IsEmpty() bool { + return len(s.Completed) == 0 && len(s.Cursors) == 0 +} + +// Clone returns a deep copy of the date exporter state. +func (s *DateExporterState) Clone() DateExporterState { + cloned := DateExporterState{ + Completed: make([]string, len(s.Completed)), + Cursors: make(map[string]string, len(s.Cursors)), + } + copy(cloned.Completed, s.Completed) + for chunk, cursor := range s.Cursors { + cloned.Cursors[chunk] = cursor + } + return cloned +} + // DateExporter is a utility for exporting events for a given date using the chunked event export APIs. Note that // it is specifically designed to prioritize performance and ensure that events aren't missed. It may not yield events // in time order, and does not provide a mechanism to decide when export for a given date should be considered complete, diff --git a/lib/events/export/exporter.go b/lib/events/export/exporter.go new file mode 100644 index 0000000000000..9506a80b81adc --- /dev/null +++ b/lib/events/export/exporter.go @@ -0,0 +1,440 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package export + +import ( + "context" + "log/slog" + "slices" + "sync" + "time" + + "github.com/gravitational/trace" + "golang.org/x/time/rate" + + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/utils/retryutils" + "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/utils/interval" +) + +type ExporterState struct { + // Dates is a map of dates to their respective state. Note that an empty + // state for a date is still meaningful and either indicates that the date + // itself contains no events, or that no progress has been made against that + // date yet. + Dates map[time.Time]DateExporterState +} + +// IsEmpty returns true if no state is defined. +func (s *ExporterState) IsEmpty() bool { + return len(s.Dates) == 0 +} + +// Clone creates a deep copy of the exporter state. +func (s *ExporterState) Clone() ExporterState { + out := ExporterState{ + Dates: make(map[time.Time]DateExporterState, len(s.Dates)), + } + for date, state := range s.Dates { + out.Dates[date] = state.Clone() + } + return out +} + +// ExporterConfig configured an exporter. +type ExporterConfig struct { + // Client is the audit event client used to fetch and export events. + Client Client + // StartDate is the date from which to start exporting events. + StartDate time.Time + // Export is the callback used to export events. Must be safe for concurrent use if + // the Concurrency parameter is greater than 1. + Export func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error + // OnIdle is an optional callback that gets invoked periodically when the exporter is idle. Note that it is + // safe to close the exporter or inspect its state from within this callback, but waiting on the exporter's + // Done channel within this callback will deadlock. This callback is an asynchronous signal and additional + // events may be discovered concurrently with its invocation. + OnIdle func(ctx context.Context) + // PreviousState is an optional parameter used to resume from a previous date export run. + PreviousState ExporterState + // Concurrency sets the maximum number of event chunks that will be processed concurrently + // for a given date (defaults to 1). Note that the total number of inflight chunk processing + // may be up to Conurrency * (BacklogSize + 1). + Concurrency int + // BacklogSize optionally overrides the default size of the export backlog (i.e. the number of + // previous dates for which polling continues after initial idleness). default is 1. + BacklogSize int + // MaxBackoff optionally overrides the default maximum backoff applied when errors are hit. + MaxBackoff time.Duration + // PollInterval optionally overrides the default poll interval used to fetch event chunks. + PollInterval time.Duration +} + +// CheckAndSetDefaults validates configuration and sets default values for optional parameters. +func (cfg *ExporterConfig) CheckAndSetDefaults() error { + if cfg.Client == nil { + return trace.BadParameter("missing required parameter Client in ExporterConfig") + } + if cfg.StartDate.IsZero() { + return trace.BadParameter("missing required parameter StartDate in ExporterConfig") + } + if cfg.Export == nil { + return trace.BadParameter("missing required parameter Export in ExporterConfig") + } + if cfg.Concurrency == 0 { + cfg.Concurrency = 1 + } + if cfg.BacklogSize == 0 { + cfg.BacklogSize = 1 + } + if cfg.MaxBackoff == 0 { + cfg.MaxBackoff = 90 * time.Second + } + if cfg.PollInterval == 0 { + cfg.PollInterval = 16 * time.Second + } + return nil +} + +// Exporter is a utility for exporting events starting from a given date using the chunked event export APIs. Note that +// it is specifically designed to prioritize performance and ensure that events aren't missed. Events may not be yielded +// in time order. Export of events is performed by consuming all currently available events for a given date, then moving +// to the next date. In order to account for replication delays, a backlog of previous dates are also polled. +type Exporter struct { + cfg ExporterConfig + mu sync.Mutex + current *DateExporter + currentDate time.Time + previous map[time.Time]*DateExporter + cancel context.CancelFunc + idle chan struct{} + done chan struct{} +} + +// NewExporter creates a new exporter and begins background processing of events. Processing will continue indefinitely +// until Exporter.Close is called. +func NewExporter(cfg ExporterConfig) (*Exporter, error) { + if err := cfg.CheckAndSetDefaults(); err != nil { + return nil, trace.Wrap(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + e := &Exporter{ + cfg: cfg, + cancel: cancel, + idle: make(chan struct{}, 1), + done: make(chan struct{}), + previous: make(map[time.Time]*DateExporter, len(cfg.PreviousState.Dates)), + } + + // start initial event processing + var initError error + e.withLock(func() { + var resumed int + for date, state := range cfg.PreviousState.Dates { + date = normalizeDate(date) + if cfg.StartDate.After(date) { + // skip dates that are older than the start date + continue + } + if err := e.resumeExportLocked(ctx, date, state); err != nil { + initError = err + return + } + slog.InfoContext(ctx, "resumed event export", "date", date.Format(time.DateOnly)) + resumed++ + } + + if resumed == 0 { + // no previous state at/after start date, start at the beginning + if err := e.startExportLocked(ctx, cfg.StartDate); err != nil { + initError = err + return + } + slog.InfoContext(ctx, "started event export", "date", cfg.StartDate.Format(time.DateOnly)) + } + }) + if initError != nil { + e.Close() + return nil, trace.Wrap(initError) + } + + go e.run(ctx) + return e, nil + +} + +// Close terminates all event processing. Note that shutdown is asynchronous. Any operation that needs to wait for export to fully +// terminate should wait on Done after calling Close. +func (e *Exporter) Close() { + e.cancel() +} + +// Done provides a channel that will be closed when the exporter has completed processing all inflight dates. When saving the +// final state of the exporter for future resumption, this channel must be waited upon before state is loaded. Note that the date +// exporter never termiantes unless Close is called, so waiting on Done is only meaningful after Close has been called. +func (e *Exporter) Done() <-chan struct{} { + return e.done +} + +// GetState loads the current state of the exporter. Note that there may be concurrent export operations +// in progress, meaning that by the time state is observed it may already be outdated. +func (e *Exporter) GetState() ExporterState { + e.mu.Lock() + defer e.mu.Unlock() + state := ExporterState{ + Dates: make(map[time.Time]DateExporterState, len(e.previous)+1), + } + + // Add the current date state. + state.Dates[e.currentDate] = e.current.GetState() + + for date, exporter := range e.previous { + state.Dates[date] = exporter.GetState() + } + + return state +} + +func (e *Exporter) run(ctx context.Context) { + defer func() { + // on exit we close all date exporters and block on their completion + // before signaling that we are done. + var doneChans []<-chan struct{} + e.withLock(func() { + doneChans = make([]<-chan struct{}, 0, len(e.previous)+1) + e.current.Close() + doneChans = append(doneChans, e.current.Done()) + for _, exporter := range e.previous { + exporter.Close() + doneChans = append(doneChans, exporter.Done()) + } + }) + + for _, done := range doneChans { + <-done + } + close(e.done) + }() + + poll := interval.New(interval.Config{ + Duration: e.cfg.PollInterval, + FirstDuration: utils.FullJitter(e.cfg.PollInterval / 2), + Jitter: retryutils.NewSeventhJitter(), + }) + defer poll.Stop() + + logLimiter := rate.NewLimiter(rate.Every(time.Minute), 1) + + for { + idle, err := e.poll(ctx) + if err != nil && logLimiter.Allow() { + var dates []string + e.withLock(func() { + dates = make([]string, 0, len(e.previous)+1) + dates = append(dates, e.currentDate.Format(time.DateOnly)) + for date := range e.previous { + dates = append(dates, date.Format(time.DateOnly)) + } + }) + slices.Sort(dates) + slog.WarnContext(ctx, "event export poll failed", "error", err, "dates", dates) + } + + if idle && e.cfg.OnIdle != nil { + e.cfg.OnIdle(ctx) + } + + select { + case <-e.idle: + case <-poll.Next(): + case <-ctx.Done(): + return + } + } +} + +// poll advances the exporter to the next date if the current date is idle and in the past, and prunes any idle exporters that +// are outside of the target backlog range. if the exporter is caught up with the current date and all sub-exporters are idle, +// poll returns true. otherwise, poll returns false. +func (e *Exporter) poll(ctx context.Context) (bool, error) { + e.mu.Lock() + defer e.mu.Unlock() + + var caughtUp bool + if e.current.IsIdle() { + if normalizeDate(time.Now()).After(e.currentDate) { + nextDate := e.currentDate.AddDate(0, 0, 1) + // current date is idle and in the past, advance to the next date + if err := e.startExportLocked(ctx, nextDate); err != nil { + return false, trace.Wrap(err) + } + slog.InfoContext(ctx, "advanced to next event export target", "date", nextDate.Format(time.DateOnly)) + } else { + caughtUp = true + } + } + + // prune any dangling exporters that appear idle + e.pruneBacklogLocked(ctx) + + if !caughtUp { + return false, nil + } + + // check if all backlog exporters are idle + for _, exporter := range e.previous { + if !exporter.IsIdle() { + return false, nil + } + } + + // all exporters are idle and we are caught up with the current date + return true, nil +} + +// pruneBacklogLocked prunes any idle exporters that are outside of the target backlog range. +func (e *Exporter) pruneBacklogLocked(ctx context.Context) { + if len(e.previous) <= e.cfg.BacklogSize { + return + } + + dates := make([]time.Time, 0, len(e.previous)) + for date := range e.previous { + dates = append(dates, date) + } + + // sort dates with most recent first + slices.SortFunc(dates, func(a, b time.Time) int { + if a.After(b) { + return -1 + } + if b.After(a) { + return 1 + } + return 0 + }) + + // close any idle exporters that are older than the backlog size + for _, date := range dates[e.cfg.BacklogSize:] { + if !e.previous[date].IsIdle() { + continue + } + + e.previous[date].Close() + + doneC := e.previous[date].Done() + + var closing bool + e.withoutLock(func() { + select { + case <-doneC: + case <-ctx.Done(): + closing = true + } + }) + + if closing { + return + } + + delete(e.previous, date) + + slog.InfoContext(ctx, "halted historical event export", "date", date.Format(time.DateOnly)) + } +} + +// startExport starts export of events for the given date. +func (e *Exporter) startExportLocked(ctx context.Context, date time.Time) error { + return e.resumeExportLocked(ctx, date, DateExporterState{}) +} + +// resumeExport resumes export of events for the given date with the given state. +func (e *Exporter) resumeExportLocked(ctx context.Context, date time.Time, state DateExporterState) error { + date = normalizeDate(date) + + // check if the date is already being exported + if _, ok := e.previous[date]; ok || e.currentDate.Equal(date) { + return nil + } + + onIdle := func(ctx context.Context) { + var isCurrent bool + e.withLock(func() { + isCurrent = e.currentDate.Equal(date) + }) + if !isCurrent { + // idle callbacks from an exporter in the backlog + // can be ignored. + return + } + + // current date is idle, wake up the poll loop + select { + case e.idle <- struct{}{}: + default: + } + } + + // set up exporter + exporter, err := NewDateExporter(DateExporterConfig{ + Client: e.cfg.Client, + Date: date, + Export: e.cfg.Export, + OnIdle: onIdle, + PreviousState: state, + Concurrency: e.cfg.Concurrency, + MaxBackoff: e.cfg.MaxBackoff, + PollInterval: e.cfg.PollInterval, + }) + if err != nil { + return trace.Wrap(err) + } + + // if a current export is in progress and is newer than this export, + // add this export to the backlog. + if e.current != nil && e.currentDate.After(date) { + // historical export is being started, add to backlog + e.previous[date] = exporter + return nil + } + + // bump previous export to backlog + if e.current != nil { + e.previous[e.currentDate] = e.current + } + e.current = exporter + e.currentDate = date + + return nil +} + +func (e *Exporter) withLock(fn func()) { + e.mu.Lock() + defer e.mu.Unlock() + fn() +} + +func (e *Exporter) withoutLock(fn func()) { + e.mu.Unlock() + defer e.mu.Lock() + fn() +} diff --git a/lib/events/export/exporter_test.go b/lib/events/export/exporter_test.go new file mode 100644 index 0000000000000..436cd3ecccf52 --- /dev/null +++ b/lib/events/export/exporter_test.go @@ -0,0 +1,163 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package export + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" +) + +const day = time.Hour * 24 + +// TestExporterBasics tests the basic functionality of the exporter with and without random flake. +func TestExporterBasics(t *testing.T) { + t.Parallel() + + now := normalizeDate(time.Now()) + startDate := now.Add(-7 * day) + + for _, randomFlake := range []bool{false, true} { + + // empty case verified export of a time range larger than backlog size with no events in it. + t.Run(fmt.Sprintf("case=empty,randomFlake=%v", randomFlake), func(t *testing.T) { + t.Parallel() + clt := newFakeClient() + clt.setRandomFlake(randomFlake) + + testExportAll(t, exportTestCase{ + clt: clt, + startDate: startDate, + expected: []*auditlogpb.ExportEventUnstructured{}, + }) + }) + + // sparse case verifies export of a time range with gaps larger than the backlog size. + t.Run(fmt.Sprintf("case=sparse,randomFlake=%v", randomFlake), func(t *testing.T) { + t.Parallel() + + clt := newFakeClient() + clt.setRandomFlake(randomFlake) + + var allEvents []*auditlogpb.ExportEventUnstructured + allEvents = append(allEvents, addEvents(t, clt, startDate, 1, 1)...) + allEvents = append(allEvents, addEvents(t, clt, startDate.Add(4*day), 3, 2)...) + + testExportAll(t, exportTestCase{ + clt: clt, + startDate: startDate, + expected: allEvents, + }) + }) + + // dense case verifies export of a time range with many events in every date. + t.Run(fmt.Sprintf("case=dense,randomFlake=%v", randomFlake), func(t *testing.T) { + t.Parallel() + + clt := newFakeClient() + clt.setRandomFlake(randomFlake) + + var allEvents []*auditlogpb.ExportEventUnstructured + allEvents = append(allEvents, addEvents(t, clt, startDate, 100, 1)...) + allEvents = append(allEvents, addEvents(t, clt, startDate.Add(day), 50, 2)...) + allEvents = append(allEvents, addEvents(t, clt, startDate.Add(2*day), 5, 20)...) + allEvents = append(allEvents, addEvents(t, clt, startDate.Add(3*day), 20, 5)...) + allEvents = append(allEvents, addEvents(t, clt, startDate.Add(4*day), 14, 7)...) + allEvents = append(allEvents, addEvents(t, clt, startDate.Add(5*day), 7, 14)...) + allEvents = append(allEvents, addEvents(t, clt, startDate.Add(6*day), 1, 100)...) + + testExportAll(t, exportTestCase{ + clt: clt, + startDate: startDate, + expected: allEvents, + }) + }) + } +} + +// addEvents is a helper for generating events in tests. It both inserts the specified event chunks/counts into the fake client +// and returns the generated events for comparison. +func addEvents(t *testing.T, clt *fakeClient, date time.Time, chunks, eventsPerChunk int) []*auditlogpb.ExportEventUnstructured { + var allEvents []*auditlogpb.ExportEventUnstructured + for i := 0; i < chunks; i++ { + chunk := makeEventChunk(t, date, eventsPerChunk) + allEvents = append(allEvents, chunk...) + clt.addChunk(date.Format(time.DateOnly), uuid.NewString(), chunk) + } + + return allEvents +} + +type exportTestCase struct { + clt Client + startDate time.Time + expected []*auditlogpb.ExportEventUnstructured +} + +// testExportAll verifies that the expected events are exported by the exporter given +// the supplied client state. +func testExportAll(t *testing.T, tc exportTestCase) { + var exportedMu sync.Mutex + var exported []*auditlogpb.ExportEventUnstructured + + exportFn := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error { + exportedMu.Lock() + defer exportedMu.Unlock() + exported = append(exported, event) + return nil + } + + getExported := func() []*auditlogpb.ExportEventUnstructured { + exportedMu.Lock() + defer exportedMu.Unlock() + return append([]*auditlogpb.ExportEventUnstructured(nil), exported...) + } + + var idleOnce sync.Once + idleCh := make(chan struct{}) + + exporter, err := NewExporter(ExporterConfig{ + Client: tc.clt, + StartDate: tc.startDate, + Export: exportFn, + OnIdle: func(_ context.Context) { idleOnce.Do(func() { close(idleCh) }) }, + Concurrency: 2, + BacklogSize: 2, + MaxBackoff: 600 * time.Millisecond, + PollInterval: 200 * time.Millisecond, + }) + require.NoError(t, err) + defer exporter.Close() + + timeout := time.After(30 * time.Second) + select { + case <-idleCh: + case <-timeout: + require.FailNow(t, "timeout waiting for exporter to become idle") + } + + require.ElementsMatch(t, tc.expected, getExported()) +} diff --git a/lib/events/export/helpers.go b/lib/events/export/helpers.go new file mode 100644 index 0000000000000..35f7fe0997fd5 --- /dev/null +++ b/lib/events/export/helpers.go @@ -0,0 +1,27 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package export + +import "time" + +// normalizeDate normalizes a timestamp to the beginning of the day in UTC. +func normalizeDate(t time.Time) time.Time { + t = t.UTC() + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) +} diff --git a/tool/tctl/common/loadtest_command.go b/tool/tctl/common/loadtest_command.go index 2aa72c816dee1..a764d954103db 100644 --- a/tool/tctl/common/loadtest_command.go +++ b/tool/tctl/common/loadtest_command.go @@ -19,6 +19,7 @@ package common import ( "context" "fmt" + "log/slog" "os" "os/signal" "runtime" @@ -31,12 +32,12 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" log "github.com/sirupsen/logrus" - "google.golang.org/protobuf/types/known/timestamppb" auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/cache" + "github.com/gravitational/teleport/lib/events/export" "github.com/gravitational/teleport/lib/service/servicecfg" "github.com/gravitational/teleport/lib/utils" ) @@ -83,7 +84,7 @@ func (c *LoadtestCommand) Initialize(app *kingpin.Application, config *servicecf c.auditEvents = loadtest.Command("export-audit-events", "Bulk export audit events").Hidden() c.auditEvents.Flag("date", "Date to dump events for").StringVar(&c.date) - c.auditEvents.Flag("cursor", "Cursor to start from").StringVar(&c.cursor) + c.auditEvents.Flag("cursor", "Specify an optional cursor directory").StringVar(&c.cursor) } // TryRun takes the CLI command as an argument (like "loadtest node-heartbeats") and executes it. @@ -303,6 +304,9 @@ func printEvent(ekind string, rsc types.Resource) { } func (c *LoadtestCommand) AuditEvents(ctx context.Context, client *authclient.Client) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + date := time.Now() if c.date != "" { var err error @@ -315,6 +319,8 @@ func (c *LoadtestCommand) AuditEvents(ctx context.Context, client *authclient.Cl outch := make(chan *auditlogpb.ExportEventUnstructured, 1024) defer close(outch) + var eventsProcessed atomic.Uint64 + go func() { for event := range outch { s, err := utils.FastMarshal(event.Event.Unstructured) @@ -322,96 +328,77 @@ func (c *LoadtestCommand) AuditEvents(ctx context.Context, client *authclient.Cl panic(err) } fmt.Println(string(s)) + eventsProcessed.Add(1) } }() - chunksProcessed := make(map[string]struct{}) + exportFn := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error { + select { + case outch <- event: + case <-ctx.Done(): + return ctx.Err() + } + return nil + } -Outer: - for { - chunks := client.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{ - Date: timestamppb.New(date), + var cursor *export.Cursor + if c.cursor != "" { + var err error + cursor, err = export.NewCursor(export.CursorConfig{ + Dir: c.cursor, }) + if err != nil { + return trace.Wrap(err) + } + } - Chunks: - for chunks.Next() { - if _, ok := chunksProcessed[chunks.Item().Chunk]; ok { - log.WithFields(log.Fields{ - "date": date.Format(time.DateOnly), - "chunk": chunks.Item().Chunk, - }).Info("skipping already processed chunk") - continue Chunks - } - - var cursor string - ProcessChunk: - for { - - eventStream := client.ExportUnstructuredEvents(ctx, &auditlogpb.ExportUnstructuredEventsRequest{ - Date: timestamppb.New(date), - Chunk: chunks.Item().Chunk, - Cursor: cursor, - }) + var state export.ExporterState + if cursor != nil { + state = cursor.GetState() + } - Events: - for eventStream.Next() { - cursor = eventStream.Item().Cursor - select { - case outch <- eventStream.Item(): - continue Events - default: - log.Warn("backpressure in event stream") - } + exporter, err := export.NewExporter(export.ExporterConfig{ + Client: client, + StartDate: date, + PreviousState: state, + Export: exportFn, + Concurrency: 3, + BacklogSize: 1, + }) + if err != nil { + return trace.Wrap(err) + } + defer exporter.Close() - select { - case outch <- eventStream.Item(): - case <-ctx.Done(): - return nil - } - } + if cursor != nil { + go func() { + syncTicker := time.NewTicker(time.Millisecond * 666) + defer syncTicker.Stop() - if err := eventStream.Done(); err != nil { - log.WithFields(log.Fields{ - "date": date.Format(time.DateOnly), - "chunk": chunks.Item().Chunk, - "error": err, - }).Error("event stream failed, will attempt to reestablish") - continue ProcessChunk + for { + select { + case <-ctx.Done(): + return + case <-syncTicker.C: + cursor.Sync(exporter.GetState()) } - - chunksProcessed[chunks.Item().Chunk] = struct{}{} - break ProcessChunk } - } + }() + } - if err := chunks.Done(); err != nil { - log.WithFields(log.Fields{ - "date": date.Format(time.DateOnly), - "error": err, - }).Error("event chunk stream failed, will attempt to reestablish") - continue Outer - } + logTicker := time.NewTicker(time.Minute) - nextDate := date.AddDate(0, 0, 1) - if nextDate.After(time.Now()) { - delay := utils.SeventhJitter(time.Second * 7) - log.WithFields(log.Fields{ - "date": date.Format(time.DateOnly), - "delay": delay, - }).Info("finished processing known event chunks for current date, will re-poll after delay") - select { - case <-time.After(delay): - case <-ctx.Done(): - return nil - } - continue Outer + var prevEventsProcessed uint64 + for { + select { + case <-ctx.Done(): + return nil + case <-logTicker.C: + processed := eventsProcessed.Load() + slog.InfoContext(ctx, "event processing", "total", processed, "per_minute", processed-prevEventsProcessed) + prevEventsProcessed = processed + case <-exporter.Done(): + return trace.Errorf("exporter exited unexpected with state: %+v", exporter.GetState()) } - - log.WithFields(log.Fields{ - "date": date.Format(time.DateOnly), - "next": nextDate.Format(time.DateOnly), - }).Info("finished processing known event chunks for historical date, moving to next") - date = nextDate - clear(chunksProcessed) } }