From ce0fe10e9ec40acf3c1706557cbc40cc4320ee92 Mon Sep 17 00:00:00 2001
From: Forrest <30576607+fspmarshall@users.noreply.github.com>
Date: Tue, 15 Oct 2024 10:08:15 -0700
Subject: [PATCH] high level event exporter & cursor helpers (#47370)
---
lib/events/export/cursor.go | 276 +++++++++++++++++
lib/events/export/cursor_test.go | 243 +++++++++++++++
lib/events/export/date_exporter.go | 18 ++
lib/events/export/exporter.go | 440 +++++++++++++++++++++++++++
lib/events/export/exporter_test.go | 163 ++++++++++
lib/events/export/helpers.go | 27 ++
tool/tctl/common/loadtest_command.go | 145 ++++-----
7 files changed, 1233 insertions(+), 79 deletions(-)
create mode 100644 lib/events/export/cursor.go
create mode 100644 lib/events/export/cursor_test.go
create mode 100644 lib/events/export/exporter.go
create mode 100644 lib/events/export/exporter_test.go
create mode 100644 lib/events/export/helpers.go
diff --git a/lib/events/export/cursor.go b/lib/events/export/cursor.go
new file mode 100644
index 0000000000000..8dff088d00831
--- /dev/null
+++ b/lib/events/export/cursor.go
@@ -0,0 +1,276 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package export
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "path/filepath"
+ "slices"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/gravitational/trace"
+
+ "github.com/gravitational/teleport"
+)
+
+const (
+ // completedName is the completed file name
+ completedName = "completed-chunks"
+
+ // chunkSuffix is the suffix for per-chunk cursor files
+ chunkSuffix = ".chunk"
+)
+
+// CursorConfig configures a cursor.
+type CursorConfig struct {
+ // Dir is the cursor directory. This directory will be created if it does not exist
+ // and should not be used for any other purpose.
+ Dir string
+}
+
+// CheckAndSetDefaults validates configuration and sets default values for optional parameters.
+func (c *CursorConfig) CheckAndSetDefaults() error {
+ if c.Dir == "" {
+ return trace.BadParameter("missing required parameter Dir in CursorConfig")
+ }
+
+ return nil
+}
+
+// Cursor manages an event export cursor directory and keeps a copy of its state in-memory,
+// improving the efficiency of updates by only writing diffs to disk. the cursor directory
+// contains a sub-directory per date. each date's state is tracked using an append-only list
+// of completed chunks, along with a per-chunk cursor file. cursor directories are not intended
+// for true concurrent use, but concurrent use in the context of a graceful restart won't have
+// any consequences more dire than duplicate events.
+type Cursor struct {
+ cfg CursorConfig
+ mu sync.Mutex
+ state ExporterState
+}
+
+// NewCursor creates a new cursor, loading any preexisting state from disk.
+func NewCursor(cfg CursorConfig) (*Cursor, error) {
+ if err := cfg.CheckAndSetDefaults(); err != nil {
+ return nil, trace.Wrap(err)
+ }
+
+ state, err := loadInitialState(cfg.Dir)
+ if err != nil {
+ return nil, trace.Wrap(err)
+ }
+ return &Cursor{
+ cfg: cfg,
+ state: *state,
+ }, nil
+}
+
+// GetState gets the current state as seen by this cursor.
+func (c *Cursor) GetState() ExporterState {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ return c.state.Clone()
+}
+
+// Sync synchronizes the cursor's in-memory state with the provided state, writing any diffs to disk.
+func (c *Cursor) Sync(newState ExporterState) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for d, s := range newState.Dates {
+ if err := c.syncDate(d, s); err != nil {
+ return trace.Wrap(err)
+ }
+ }
+
+ for d := range c.state.Dates {
+ if _, ok := newState.Dates[d]; !ok {
+ if err := c.deleteDate(d); err != nil {
+ return trace.Wrap(err)
+ }
+ }
+ }
+
+ return nil
+}
+
+func (c *Cursor) syncDate(date time.Time, state DateExporterState) error {
+ // ensure date directory exists. the existence of the date directory
+ // is meaningful even if it contains no files.
+ dateDir := filepath.Join(c.cfg.Dir, date.Format(time.DateOnly))
+ if err := os.MkdirAll(dateDir, teleport.SharedDirMode); err != nil {
+ return trace.ConvertSystemError(err)
+ }
+
+ // open completed file in append mode
+ completedFile, err := os.OpenFile(filepath.Join(dateDir, completedName), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+ if err != nil {
+ return trace.ConvertSystemError(err)
+ }
+ defer completedFile.Close()
+
+ current, ok := c.state.Dates[date]
+ if !ok {
+ current = DateExporterState{
+ Cursors: make(map[string]string),
+ }
+ }
+ defer func() {
+ c.state.Dates[date] = current
+ }()
+
+ for _, chunk := range state.Completed {
+ if slices.Contains(current.Completed, chunk) {
+ // already written to disk
+ continue
+ }
+
+ // add chunk to completed file
+ if _, err := fmt.Fprintln(completedFile, chunk); err != nil {
+ return trace.ConvertSystemError(err)
+ }
+
+ // ensure chunk is flushed to disk successfully before removing the cursor file
+ // and updating in-memory state.
+ if err := completedFile.Sync(); err != nil {
+ return trace.ConvertSystemError(err)
+ }
+
+ // delete cursor file if it exists
+ if err := os.Remove(filepath.Join(dateDir, chunk+chunkSuffix)); err != nil && !os.IsNotExist(err) {
+ return trace.ConvertSystemError(err)
+ }
+
+ // update current state
+ current.Completed = append(current.Completed, chunk)
+ delete(current.Cursors, chunk)
+ }
+
+ for chunk, cursor := range state.Cursors {
+ if current.Cursors[chunk] == cursor {
+ continue
+ }
+
+ // write cursor file
+ if err := os.WriteFile(filepath.Join(dateDir, chunk+chunkSuffix), []byte(cursor), 0644); err != nil {
+ return trace.ConvertSystemError(err)
+ }
+
+ // update current state
+ current.Cursors[chunk] = cursor
+ }
+
+ return nil
+}
+
+func (c *Cursor) deleteDate(date time.Time) error {
+ if _, ok := c.state.Dates[date]; !ok {
+ return nil
+ }
+
+ // delete the date directory and all its contents
+ if err := os.RemoveAll(filepath.Join(c.cfg.Dir, date.Format(time.DateOnly))); err != nil {
+ return trace.ConvertSystemError(err)
+ }
+
+ delete(c.state.Dates, date)
+
+ return nil
+}
+
+func loadInitialState(dir string) (*ExporterState, error) {
+ state := ExporterState{
+ Dates: make(map[time.Time]DateExporterState),
+ }
+ // list subdirectories of the cursors v2 directory
+ entries, err := os.ReadDir(dir)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return &state, nil
+ }
+ return nil, trace.ConvertSystemError(err)
+ }
+
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ // ignore non-directories
+ continue
+ }
+
+ // attempt to parse dir name as date
+ date, err := time.Parse(time.DateOnly, entry.Name())
+ if err != nil {
+ // ignore non-date directories
+ continue
+ }
+
+ dateState := DateExporterState{
+ Cursors: make(map[string]string),
+ }
+
+ dateEntries, err := os.ReadDir(filepath.Join(dir, entry.Name()))
+ if err != nil {
+ return nil, trace.ConvertSystemError(err)
+ }
+
+ for _, dateEntry := range dateEntries {
+ if dateEntry.IsDir() {
+ continue
+ }
+
+ if dateEntry.Name() == completedName {
+ // load the completed file
+ b, err := os.ReadFile(filepath.Join(dir, entry.Name(), completedName))
+ if err != nil {
+ return nil, trace.ConvertSystemError(err)
+ }
+
+ // split the completed file into whitespace-separated chunks
+ dateState.Completed = strings.Fields(string(b))
+ continue
+ }
+
+ if !strings.HasSuffix(dateEntry.Name(), chunkSuffix) {
+ continue
+ }
+
+ chunk := strings.TrimSuffix(dateEntry.Name(), chunkSuffix)
+ b, err := os.ReadFile(filepath.Join(dir, entry.Name(), dateEntry.Name()))
+ if err != nil {
+ return nil, trace.ConvertSystemError(err)
+ }
+
+ if cc := bytes.TrimSpace(b); len(cc) != 0 {
+ dateState.Cursors[chunk] = string(cc)
+ }
+ }
+
+ // note that some dates may not contain any chunks. we still want to track the
+ // fact that these dates have had their dirs initialized since that still indicates
+ // how far we've gotten in the export process.
+ state.Dates[date] = dateState
+ }
+
+ return &state, nil
+}
diff --git a/lib/events/export/cursor_test.go b/lib/events/export/cursor_test.go
new file mode 100644
index 0000000000000..9853141139345
--- /dev/null
+++ b/lib/events/export/cursor_test.go
@@ -0,0 +1,243 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package export
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/gravitational/teleport"
+)
+
+// testState is a helper for easily/cleanly representing a cursor/exporter state as a literal when
+// writing tests. the values of the inner mapping are generally strings, but some helpers support
+// using []string for the completed key.
+type testState map[string]map[string]any
+
+// writeRawState writes the test state to a directory structure. unlike `newState`, this helper
+// is designed to inject non-conforming data, so it will accept non-date dir names and expects a string
+// value for the completed key.
+func writeRawState(t *testing.T, dir string, ts testState) {
+ for d, s := range ts {
+ dateDir := filepath.Join(dir, d)
+ require.NoError(t, os.MkdirAll(dateDir, teleport.SharedDirMode))
+
+ for k, v := range s {
+ fileName := filepath.Join(dateDir, k)
+ require.NoError(t, os.WriteFile(fileName, []byte(v.(string)), 0644))
+ }
+ }
+}
+
+// newState converts a testState into a real ExporterState. this helper only works
+// with a well-formed testState.
+func newState(t *testing.T, ts testState) ExporterState {
+ state := ExporterState{
+ Dates: make(map[time.Time]DateExporterState),
+ }
+ for d, s := range ts {
+ date, err := time.Parse(time.DateOnly, d)
+ require.NoError(t, err)
+
+ dateState := DateExporterState{
+ Cursors: make(map[string]string),
+ Completed: []string{}, // avoids require.Equal rejecting nil slices as unequal to empty slices
+ }
+
+ Entries:
+ for k, v := range s {
+ if k == completedName {
+ dateState.Completed = v.([]string)
+ continue Entries
+ }
+ dateState.Cursors[k] = v.(string)
+ }
+
+ state.Dates[date] = dateState
+ }
+ return state
+}
+
+func syncAndVerifyState(t *testing.T, cursor *Cursor, ts testState) {
+ state := newState(t, ts)
+
+ // sync the state
+ require.NoError(t, cursor.Sync(state))
+
+ // verify in-memory state is as expected
+ require.Equal(t, state, cursor.GetState())
+
+ // attempt to load the state from disk
+ loaded, err := NewCursor(CursorConfig{
+ Dir: cursor.cfg.Dir,
+ })
+ require.NoError(t, err)
+
+ // verify that the loaded state is the same as the original state
+ require.Equal(t, state, loaded.GetState())
+}
+
+// verifyRawState asserts that the raw state on disk matches the provided test state. chunk names
+// need to be suffixed to match and the completed key should be a string.
+func verifyRawState(t *testing.T, dir string, ts testState) {
+ for d, s := range ts {
+ dateDir := filepath.Join(dir, d)
+ for k, v := range s {
+ fileName := filepath.Join(dateDir, k)
+ data, err := os.ReadFile(fileName)
+ require.NoError(t, err)
+
+ require.Equal(t, v.(string), string(data))
+ }
+ }
+}
+
+// TestCursorBasics verifies basic syncing/loading of cursor state.
+func TestCursorBasics(t *testing.T) {
+ dir := t.TempDir()
+
+ cursor, err := NewCursor(CursorConfig{
+ Dir: dir,
+ })
+ require.NoError(t, err)
+ state := cursor.GetState()
+ require.True(t, state.IsEmpty())
+
+ // sync and verify a typical state
+ syncAndVerifyState(t, cursor, testState{
+ "2021-01-01": {
+ completedName: []string{"chunk1", "chunk2"},
+ },
+ "2021-01-02": {
+ completedName: []string{"chunk1", "chunk2"},
+ "chunk3": "cursor1",
+ "chunk4": "cursor2",
+ },
+ "2021-01-03": {
+ "chunk3": "cursor1",
+ "chunk4": "cursor2",
+ },
+ "2021-01-04": {},
+ })
+
+ verifyRawState(t, dir, testState{
+ "2021-01-01": {
+ completedName: "chunk1\nchunk2\n",
+ },
+ "2021-01-02": {
+ completedName: "chunk1\nchunk2\n",
+ "chunk3" + chunkSuffix: "cursor1",
+ "chunk4" + chunkSuffix: "cursor2",
+ },
+ "2021-01-03": {
+ "chunk3" + chunkSuffix: "cursor1",
+ "chunk4" + chunkSuffix: "cursor2",
+ },
+ "2021-01-04": {},
+ })
+
+ // sync and verify updated state
+ syncAndVerifyState(t, cursor, testState{
+ "2021-01-01": {
+ completedName: []string{"chunk1", "chunk2", "chunk3"},
+ },
+ "2021-01-02": {
+ completedName: []string{"chunk1", "chunk2"},
+ "chunk3": "cursor1",
+ "chunk4": "cursor2",
+ "chunk5": "cursor4",
+ },
+ "2021-01-03": {
+ "chunk3": "cursor2",
+ "chunk4": "cursor3",
+ },
+ "2021-01-04": {},
+ "2021-01-05": {},
+ })
+
+ verifyRawState(t, dir, testState{
+ "2021-01-01": {
+ completedName: "chunk1\nchunk2\nchunk3\n",
+ },
+ "2021-01-02": {
+ completedName: "chunk1\nchunk2\n",
+ "chunk3" + chunkSuffix: "cursor1",
+ "chunk4" + chunkSuffix: "cursor2",
+ "chunk5" + chunkSuffix: "cursor4",
+ },
+ "2021-01-03": {
+ "chunk3" + chunkSuffix: "cursor2",
+ "chunk4" + chunkSuffix: "cursor3",
+ },
+ "2021-01-04": {},
+ "2021-01-05": {},
+ })
+
+ // sync & verify heavily truncated state
+ syncAndVerifyState(t, cursor, testState{
+ "2021-01-05": {},
+ })
+
+ verifyRawState(t, dir, testState{
+ "2021-01-05": {},
+ })
+}
+
+func TestCursorBadState(t *testing.T) {
+ dir := t.TempDir()
+
+ writeRawState(t, dir, testState{
+ "2021-01-01": {
+ completedName: "\n\nchunk1\n\n", // extra whitespace should be ignored
+ },
+ "not-a-date": {
+ completedName: "chunk1",
+ "no-suffix": "cursor1", // unknown suffix should be ignored
+ },
+ "2021-01-02": {
+ completedName: "\n", // whitespace-only completed file should count as empty
+ "chunk1" + chunkSuffix: "cursor1",
+ "chunk2" + chunkSuffix: "cursor2\n", // extra whitespace should be ignored
+ "chunk3" + chunkSuffix: " ", // whitespace-only cursor file should count as empty
+ "no-suffix": "cursor3", // unknown suffix should be ignored
+ },
+ })
+
+ expected := newState(t, testState{
+ "2021-01-01": {
+ completedName: []string{"chunk1"},
+ },
+ "2021-01-02": {
+ "chunk1": "cursor1",
+ "chunk2": "cursor2",
+ },
+ })
+
+ loaded, err := NewCursor(CursorConfig{
+ Dir: dir,
+ })
+ require.NoError(t, err)
+
+ // verify that the loaded state is the same as expected state
+ require.Equal(t, expected, loaded.GetState())
+}
diff --git a/lib/events/export/date_exporter.go b/lib/events/export/date_exporter.go
index cee3e3a36278c..698c67d8c156e 100644
--- a/lib/events/export/date_exporter.go
+++ b/lib/events/export/date_exporter.go
@@ -117,6 +117,24 @@ type DateExporterState struct {
Cursors map[string]string
}
+// IsEmpty returns true if no state is defined.
+func (s *DateExporterState) IsEmpty() bool {
+ return len(s.Completed) == 0 && len(s.Cursors) == 0
+}
+
+// Clone returns a deep copy of the date exporter state.
+func (s *DateExporterState) Clone() DateExporterState {
+ cloned := DateExporterState{
+ Completed: make([]string, len(s.Completed)),
+ Cursors: make(map[string]string, len(s.Cursors)),
+ }
+ copy(cloned.Completed, s.Completed)
+ for chunk, cursor := range s.Cursors {
+ cloned.Cursors[chunk] = cursor
+ }
+ return cloned
+}
+
// DateExporter is a utility for exporting events for a given date using the chunked event export APIs. Note that
// it is specifically designed to prioritize performance and ensure that events aren't missed. It may not yield events
// in time order, and does not provide a mechanism to decide when export for a given date should be considered complete,
diff --git a/lib/events/export/exporter.go b/lib/events/export/exporter.go
new file mode 100644
index 0000000000000..9506a80b81adc
--- /dev/null
+++ b/lib/events/export/exporter.go
@@ -0,0 +1,440 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package export
+
+import (
+ "context"
+ "log/slog"
+ "slices"
+ "sync"
+ "time"
+
+ "github.com/gravitational/trace"
+ "golang.org/x/time/rate"
+
+ auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
+ "github.com/gravitational/teleport/api/utils/retryutils"
+ "github.com/gravitational/teleport/lib/utils"
+ "github.com/gravitational/teleport/lib/utils/interval"
+)
+
+type ExporterState struct {
+ // Dates is a map of dates to their respective state. Note that an empty
+ // state for a date is still meaningful and either indicates that the date
+ // itself contains no events, or that no progress has been made against that
+ // date yet.
+ Dates map[time.Time]DateExporterState
+}
+
+// IsEmpty returns true if no state is defined.
+func (s *ExporterState) IsEmpty() bool {
+ return len(s.Dates) == 0
+}
+
+// Clone creates a deep copy of the exporter state.
+func (s *ExporterState) Clone() ExporterState {
+ out := ExporterState{
+ Dates: make(map[time.Time]DateExporterState, len(s.Dates)),
+ }
+ for date, state := range s.Dates {
+ out.Dates[date] = state.Clone()
+ }
+ return out
+}
+
+// ExporterConfig configures an exporter.
+type ExporterConfig struct {
+ // Client is the audit event client used to fetch and export events.
+ Client Client
+ // StartDate is the date from which to start exporting events.
+ StartDate time.Time
+ // Export is the callback used to export events. Must be safe for concurrent use if
+ // the Concurrency parameter is greater than 1.
+ Export func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error
+ // OnIdle is an optional callback that gets invoked periodically when the exporter is idle. Note that it is
+ // safe to close the exporter or inspect its state from within this callback, but waiting on the exporter's
+ // Done channel within this callback will deadlock. This callback is an asynchronous signal and additional
+ // events may be discovered concurrently with its invocation.
+ OnIdle func(ctx context.Context)
+ // PreviousState is an optional parameter used to resume from a previous date export run.
+ PreviousState ExporterState
+ // Concurrency sets the maximum number of event chunks that will be processed concurrently
+ // for a given date (defaults to 1). Note that the total number of inflight chunk processing
+ // may be up to Concurrency * (BacklogSize + 1).
+ Concurrency int
+ // BacklogSize optionally overrides the default size of the export backlog (i.e. the number of
+ // previous dates for which polling continues after initial idleness). default is 1.
+ BacklogSize int
+ // MaxBackoff optionally overrides the default maximum backoff applied when errors are hit.
+ MaxBackoff time.Duration
+ // PollInterval optionally overrides the default poll interval used to fetch event chunks.
+ PollInterval time.Duration
+}
+
+// CheckAndSetDefaults validates configuration and sets default values for optional parameters.
+func (cfg *ExporterConfig) CheckAndSetDefaults() error {
+ if cfg.Client == nil {
+ return trace.BadParameter("missing required parameter Client in ExporterConfig")
+ }
+ if cfg.StartDate.IsZero() {
+ return trace.BadParameter("missing required parameter StartDate in ExporterConfig")
+ }
+ if cfg.Export == nil {
+ return trace.BadParameter("missing required parameter Export in ExporterConfig")
+ }
+ if cfg.Concurrency == 0 {
+ cfg.Concurrency = 1
+ }
+ if cfg.BacklogSize == 0 {
+ cfg.BacklogSize = 1
+ }
+ if cfg.MaxBackoff == 0 {
+ cfg.MaxBackoff = 90 * time.Second
+ }
+ if cfg.PollInterval == 0 {
+ cfg.PollInterval = 16 * time.Second
+ }
+ return nil
+}
+
+// Exporter is a utility for exporting events starting from a given date using the chunked event export APIs. Note that
+// it is specifically designed to prioritize performance and ensure that events aren't missed. Events may not be yielded
+// in time order. Export of events is performed by consuming all currently available events for a given date, then moving
+// to the next date. In order to account for replication delays, a backlog of previous dates are also polled.
+type Exporter struct {
+ cfg ExporterConfig
+ mu sync.Mutex
+ current *DateExporter
+ currentDate time.Time
+ previous map[time.Time]*DateExporter
+ cancel context.CancelFunc
+ idle chan struct{}
+ done chan struct{}
+}
+
+// NewExporter creates a new exporter and begins background processing of events. Processing will continue indefinitely
+// until Exporter.Close is called.
+func NewExporter(cfg ExporterConfig) (*Exporter, error) {
+ if err := cfg.CheckAndSetDefaults(); err != nil {
+ return nil, trace.Wrap(err)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ e := &Exporter{
+ cfg: cfg,
+ cancel: cancel,
+ idle: make(chan struct{}, 1),
+ done: make(chan struct{}),
+ previous: make(map[time.Time]*DateExporter, len(cfg.PreviousState.Dates)),
+ }
+
+ // start initial event processing
+ var initError error
+ e.withLock(func() {
+ var resumed int
+ for date, state := range cfg.PreviousState.Dates {
+ date = normalizeDate(date)
+ if cfg.StartDate.After(date) {
+ // skip dates that are older than the start date
+ continue
+ }
+ if err := e.resumeExportLocked(ctx, date, state); err != nil {
+ initError = err
+ return
+ }
+ slog.InfoContext(ctx, "resumed event export", "date", date.Format(time.DateOnly))
+ resumed++
+ }
+
+ if resumed == 0 {
+ // no previous state at/after start date, start at the beginning
+ if err := e.startExportLocked(ctx, cfg.StartDate); err != nil {
+ initError = err
+ return
+ }
+ slog.InfoContext(ctx, "started event export", "date", cfg.StartDate.Format(time.DateOnly))
+ }
+ })
+ if initError != nil {
+ e.Close()
+ return nil, trace.Wrap(initError)
+ }
+
+ go e.run(ctx)
+ return e, nil
+
+}
+
+// Close terminates all event processing. Note that shutdown is asynchronous. Any operation that needs to wait for export to fully
+// terminate should wait on Done after calling Close.
+func (e *Exporter) Close() {
+ e.cancel()
+}
+
+// Done provides a channel that will be closed when the exporter has completed processing all inflight dates. When saving the
+// final state of the exporter for future resumption, this channel must be waited upon before state is loaded. Note that the date
+// exporter never terminates unless Close is called, so waiting on Done is only meaningful after Close has been called.
+func (e *Exporter) Done() <-chan struct{} {
+ return e.done
+}
+
+// GetState loads the current state of the exporter. Note that there may be concurrent export operations
+// in progress, meaning that by the time state is observed it may already be outdated.
+func (e *Exporter) GetState() ExporterState {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ state := ExporterState{
+ Dates: make(map[time.Time]DateExporterState, len(e.previous)+1),
+ }
+
+ // Add the current date state.
+ state.Dates[e.currentDate] = e.current.GetState()
+
+ for date, exporter := range e.previous {
+ state.Dates[date] = exporter.GetState()
+ }
+
+ return state
+}
+
+func (e *Exporter) run(ctx context.Context) {
+ defer func() {
+ // on exit we close all date exporters and block on their completion
+ // before signaling that we are done.
+ var doneChans []<-chan struct{}
+ e.withLock(func() {
+ doneChans = make([]<-chan struct{}, 0, len(e.previous)+1)
+ e.current.Close()
+ doneChans = append(doneChans, e.current.Done())
+ for _, exporter := range e.previous {
+ exporter.Close()
+ doneChans = append(doneChans, exporter.Done())
+ }
+ })
+
+ for _, done := range doneChans {
+ <-done
+ }
+ close(e.done)
+ }()
+
+ poll := interval.New(interval.Config{
+ Duration: e.cfg.PollInterval,
+ FirstDuration: utils.FullJitter(e.cfg.PollInterval / 2),
+ Jitter: retryutils.NewSeventhJitter(),
+ })
+ defer poll.Stop()
+
+ logLimiter := rate.NewLimiter(rate.Every(time.Minute), 1)
+
+ for {
+ idle, err := e.poll(ctx)
+ if err != nil && logLimiter.Allow() {
+ var dates []string
+ e.withLock(func() {
+ dates = make([]string, 0, len(e.previous)+1)
+ dates = append(dates, e.currentDate.Format(time.DateOnly))
+ for date := range e.previous {
+ dates = append(dates, date.Format(time.DateOnly))
+ }
+ })
+ slices.Sort(dates)
+ slog.WarnContext(ctx, "event export poll failed", "error", err, "dates", dates)
+ }
+
+ if idle && e.cfg.OnIdle != nil {
+ e.cfg.OnIdle(ctx)
+ }
+
+ select {
+ case <-e.idle:
+ case <-poll.Next():
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// poll advances the exporter to the next date if the current date is idle and in the past, and prunes any idle exporters that
+// are outside of the target backlog range. if the exporter is caught up with the current date and all sub-exporters are idle,
+// poll returns true. otherwise, poll returns false.
+func (e *Exporter) poll(ctx context.Context) (bool, error) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ var caughtUp bool
+ if e.current.IsIdle() {
+ if normalizeDate(time.Now()).After(e.currentDate) {
+ nextDate := e.currentDate.AddDate(0, 0, 1)
+ // current date is idle and in the past, advance to the next date
+ if err := e.startExportLocked(ctx, nextDate); err != nil {
+ return false, trace.Wrap(err)
+ }
+ slog.InfoContext(ctx, "advanced to next event export target", "date", nextDate.Format(time.DateOnly))
+ } else {
+ caughtUp = true
+ }
+ }
+
+ // prune any dangling exporters that appear idle
+ e.pruneBacklogLocked(ctx)
+
+ if !caughtUp {
+ return false, nil
+ }
+
+ // check if all backlog exporters are idle
+ for _, exporter := range e.previous {
+ if !exporter.IsIdle() {
+ return false, nil
+ }
+ }
+
+ // all exporters are idle and we are caught up with the current date
+ return true, nil
+}
+
+// pruneBacklogLocked prunes any idle exporters that are outside of the target backlog range.
+func (e *Exporter) pruneBacklogLocked(ctx context.Context) {
+ if len(e.previous) <= e.cfg.BacklogSize {
+ return
+ }
+
+ dates := make([]time.Time, 0, len(e.previous))
+ for date := range e.previous {
+ dates = append(dates, date)
+ }
+
+ // sort dates with most recent first
+ slices.SortFunc(dates, func(a, b time.Time) int {
+ if a.After(b) {
+ return -1
+ }
+ if b.After(a) {
+ return 1
+ }
+ return 0
+ })
+
+ // close any idle exporters that are older than the backlog size
+ for _, date := range dates[e.cfg.BacklogSize:] {
+ if !e.previous[date].IsIdle() {
+ continue
+ }
+
+ e.previous[date].Close()
+
+ doneC := e.previous[date].Done()
+
+ var closing bool
+ e.withoutLock(func() {
+ select {
+ case <-doneC:
+ case <-ctx.Done():
+ closing = true
+ }
+ })
+
+ if closing {
+ return
+ }
+
+ delete(e.previous, date)
+
+ slog.InfoContext(ctx, "halted historical event export", "date", date.Format(time.DateOnly))
+ }
+}
+
+// startExportLocked starts export of events for the given date.
+func (e *Exporter) startExportLocked(ctx context.Context, date time.Time) error {
+ return e.resumeExportLocked(ctx, date, DateExporterState{})
+}
+
+// resumeExportLocked resumes export of events for the given date with the given state.
+func (e *Exporter) resumeExportLocked(ctx context.Context, date time.Time, state DateExporterState) error {
+ date = normalizeDate(date)
+
+ // check if the date is already being exported
+ if _, ok := e.previous[date]; ok || e.currentDate.Equal(date) {
+ return nil
+ }
+
+ onIdle := func(ctx context.Context) {
+ var isCurrent bool
+ e.withLock(func() {
+ isCurrent = e.currentDate.Equal(date)
+ })
+ if !isCurrent {
+ // idle callbacks from an exporter in the backlog
+ // can be ignored.
+ return
+ }
+
+ // current date is idle, wake up the poll loop
+ select {
+ case e.idle <- struct{}{}:
+ default:
+ }
+ }
+
+ // set up exporter
+ exporter, err := NewDateExporter(DateExporterConfig{
+ Client: e.cfg.Client,
+ Date: date,
+ Export: e.cfg.Export,
+ OnIdle: onIdle,
+ PreviousState: state,
+ Concurrency: e.cfg.Concurrency,
+ MaxBackoff: e.cfg.MaxBackoff,
+ PollInterval: e.cfg.PollInterval,
+ })
+ if err != nil {
+ return trace.Wrap(err)
+ }
+
+ // if a current export is in progress and is newer than this export,
+ // add this export to the backlog.
+ if e.current != nil && e.currentDate.After(date) {
+ // historical export is being started, add to backlog
+ e.previous[date] = exporter
+ return nil
+ }
+
+ // bump previous export to backlog
+ if e.current != nil {
+ e.previous[e.currentDate] = e.current
+ }
+ e.current = exporter
+ e.currentDate = date
+
+ return nil
+}
+
+func (e *Exporter) withLock(fn func()) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ fn()
+}
+
+func (e *Exporter) withoutLock(fn func()) {
+ e.mu.Unlock()
+ defer e.mu.Lock()
+ fn()
+}
diff --git a/lib/events/export/exporter_test.go b/lib/events/export/exporter_test.go
new file mode 100644
index 0000000000000..436cd3ecccf52
--- /dev/null
+++ b/lib/events/export/exporter_test.go
@@ -0,0 +1,163 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package export
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/require"
+
+ auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
+)
+
+const day = time.Hour * 24
+
+// TestExporterBasics tests the basic functionality of the exporter with and without random flake.
+func TestExporterBasics(t *testing.T) {
+ t.Parallel()
+
+ now := normalizeDate(time.Now())
+ startDate := now.Add(-7 * day)
+
+ for _, randomFlake := range []bool{false, true} {
+
+ // empty case verified export of a time range larger than backlog size with no events in it.
+ t.Run(fmt.Sprintf("case=empty,randomFlake=%v", randomFlake), func(t *testing.T) {
+ t.Parallel()
+ clt := newFakeClient()
+ clt.setRandomFlake(randomFlake)
+
+ testExportAll(t, exportTestCase{
+ clt: clt,
+ startDate: startDate,
+ expected: []*auditlogpb.ExportEventUnstructured{},
+ })
+ })
+
+ // sparse case verifies export of a time range with gaps larger than the backlog size.
+ t.Run(fmt.Sprintf("case=sparse,randomFlake=%v", randomFlake), func(t *testing.T) {
+ t.Parallel()
+
+ clt := newFakeClient()
+ clt.setRandomFlake(randomFlake)
+
+ var allEvents []*auditlogpb.ExportEventUnstructured
+ allEvents = append(allEvents, addEvents(t, clt, startDate, 1, 1)...)
+ allEvents = append(allEvents, addEvents(t, clt, startDate.Add(4*day), 3, 2)...)
+
+ testExportAll(t, exportTestCase{
+ clt: clt,
+ startDate: startDate,
+ expected: allEvents,
+ })
+ })
+
+ // dense case verifies export of a time range with many events in every date.
+ t.Run(fmt.Sprintf("case=dense,randomFlake=%v", randomFlake), func(t *testing.T) {
+ t.Parallel()
+
+ clt := newFakeClient()
+ clt.setRandomFlake(randomFlake)
+
+ var allEvents []*auditlogpb.ExportEventUnstructured
+ allEvents = append(allEvents, addEvents(t, clt, startDate, 100, 1)...)
+ allEvents = append(allEvents, addEvents(t, clt, startDate.Add(day), 50, 2)...)
+ allEvents = append(allEvents, addEvents(t, clt, startDate.Add(2*day), 5, 20)...)
+ allEvents = append(allEvents, addEvents(t, clt, startDate.Add(3*day), 20, 5)...)
+ allEvents = append(allEvents, addEvents(t, clt, startDate.Add(4*day), 14, 7)...)
+ allEvents = append(allEvents, addEvents(t, clt, startDate.Add(5*day), 7, 14)...)
+ allEvents = append(allEvents, addEvents(t, clt, startDate.Add(6*day), 1, 100)...)
+
+ testExportAll(t, exportTestCase{
+ clt: clt,
+ startDate: startDate,
+ expected: allEvents,
+ })
+ })
+ }
+}
+
+// addEvents is a helper for generating events in tests. It both inserts the specified event chunks/counts into the fake client
+// and returns the generated events for comparison.
+func addEvents(t *testing.T, clt *fakeClient, date time.Time, chunks, eventsPerChunk int) []*auditlogpb.ExportEventUnstructured {
+ var allEvents []*auditlogpb.ExportEventUnstructured
+ for i := 0; i < chunks; i++ {
+ chunk := makeEventChunk(t, date, eventsPerChunk)
+ allEvents = append(allEvents, chunk...)
+ clt.addChunk(date.Format(time.DateOnly), uuid.NewString(), chunk)
+ }
+
+ return allEvents
+}
+
// exportTestCase bundles the inputs and expected output of a single
// testExportAll run.
type exportTestCase struct {
	clt       Client                                // event source backing the exporter (typically a fakeClient)
	startDate time.Time                             // date from which the export begins
	expected  []*auditlogpb.ExportEventUnstructured // events the exporter is expected to emit (order-insensitive)
}
+
+// testExportAll verifies that the expected events are exported by the exporter given
+// the supplied client state.
+func testExportAll(t *testing.T, tc exportTestCase) {
+ var exportedMu sync.Mutex
+ var exported []*auditlogpb.ExportEventUnstructured
+
+ exportFn := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error {
+ exportedMu.Lock()
+ defer exportedMu.Unlock()
+ exported = append(exported, event)
+ return nil
+ }
+
+ getExported := func() []*auditlogpb.ExportEventUnstructured {
+ exportedMu.Lock()
+ defer exportedMu.Unlock()
+ return append([]*auditlogpb.ExportEventUnstructured(nil), exported...)
+ }
+
+ var idleOnce sync.Once
+ idleCh := make(chan struct{})
+
+ exporter, err := NewExporter(ExporterConfig{
+ Client: tc.clt,
+ StartDate: tc.startDate,
+ Export: exportFn,
+ OnIdle: func(_ context.Context) { idleOnce.Do(func() { close(idleCh) }) },
+ Concurrency: 2,
+ BacklogSize: 2,
+ MaxBackoff: 600 * time.Millisecond,
+ PollInterval: 200 * time.Millisecond,
+ })
+ require.NoError(t, err)
+ defer exporter.Close()
+
+ timeout := time.After(30 * time.Second)
+ select {
+ case <-idleCh:
+ case <-timeout:
+ require.FailNow(t, "timeout waiting for exporter to become idle")
+ }
+
+ require.ElementsMatch(t, tc.expected, getExported())
+}
diff --git a/lib/events/export/helpers.go b/lib/events/export/helpers.go
new file mode 100644
index 0000000000000..35f7fe0997fd5
--- /dev/null
+++ b/lib/events/export/helpers.go
@@ -0,0 +1,27 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package export
+
+import "time"
+
+// normalizeDate normalizes a timestamp to the beginning of the day in UTC.
+func normalizeDate(t time.Time) time.Time {
+ t = t.UTC()
+ return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
+}
diff --git a/tool/tctl/common/loadtest_command.go b/tool/tctl/common/loadtest_command.go
index 2aa72c816dee1..a764d954103db 100644
--- a/tool/tctl/common/loadtest_command.go
+++ b/tool/tctl/common/loadtest_command.go
@@ -19,6 +19,7 @@ package common
import (
"context"
"fmt"
+ "log/slog"
"os"
"os/signal"
"runtime"
@@ -31,12 +32,12 @@ import (
"github.com/google/uuid"
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
- "google.golang.org/protobuf/types/known/timestamppb"
auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/cache"
+ "github.com/gravitational/teleport/lib/events/export"
"github.com/gravitational/teleport/lib/service/servicecfg"
"github.com/gravitational/teleport/lib/utils"
)
@@ -83,7 +84,7 @@ func (c *LoadtestCommand) Initialize(app *kingpin.Application, config *servicecf
c.auditEvents = loadtest.Command("export-audit-events", "Bulk export audit events").Hidden()
c.auditEvents.Flag("date", "Date to dump events for").StringVar(&c.date)
- c.auditEvents.Flag("cursor", "Cursor to start from").StringVar(&c.cursor)
+ c.auditEvents.Flag("cursor", "Specify an optional cursor directory").StringVar(&c.cursor)
}
// TryRun takes the CLI command as an argument (like "loadtest node-heartbeats") and executes it.
@@ -303,6 +304,9 @@ func printEvent(ekind string, rsc types.Resource) {
}
func (c *LoadtestCommand) AuditEvents(ctx context.Context, client *authclient.Client) error {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
date := time.Now()
if c.date != "" {
var err error
@@ -315,6 +319,8 @@ func (c *LoadtestCommand) AuditEvents(ctx context.Context, client *authclient.Cl
outch := make(chan *auditlogpb.ExportEventUnstructured, 1024)
defer close(outch)
+ var eventsProcessed atomic.Uint64
+
go func() {
for event := range outch {
s, err := utils.FastMarshal(event.Event.Unstructured)
@@ -322,96 +328,77 @@ func (c *LoadtestCommand) AuditEvents(ctx context.Context, client *authclient.Cl
panic(err)
}
fmt.Println(string(s))
+ eventsProcessed.Add(1)
}
}()
- chunksProcessed := make(map[string]struct{})
+ exportFn := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error {
+ select {
+ case outch <- event:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ return nil
+ }
-Outer:
- for {
- chunks := client.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{
- Date: timestamppb.New(date),
+ var cursor *export.Cursor
+ if c.cursor != "" {
+ var err error
+ cursor, err = export.NewCursor(export.CursorConfig{
+ Dir: c.cursor,
})
+ if err != nil {
+ return trace.Wrap(err)
+ }
+ }
- Chunks:
- for chunks.Next() {
- if _, ok := chunksProcessed[chunks.Item().Chunk]; ok {
- log.WithFields(log.Fields{
- "date": date.Format(time.DateOnly),
- "chunk": chunks.Item().Chunk,
- }).Info("skipping already processed chunk")
- continue Chunks
- }
-
- var cursor string
- ProcessChunk:
- for {
-
- eventStream := client.ExportUnstructuredEvents(ctx, &auditlogpb.ExportUnstructuredEventsRequest{
- Date: timestamppb.New(date),
- Chunk: chunks.Item().Chunk,
- Cursor: cursor,
- })
+ var state export.ExporterState
+ if cursor != nil {
+ state = cursor.GetState()
+ }
- Events:
- for eventStream.Next() {
- cursor = eventStream.Item().Cursor
- select {
- case outch <- eventStream.Item():
- continue Events
- default:
- log.Warn("backpressure in event stream")
- }
+ exporter, err := export.NewExporter(export.ExporterConfig{
+ Client: client,
+ StartDate: date,
+ PreviousState: state,
+ Export: exportFn,
+ Concurrency: 3,
+ BacklogSize: 1,
+ })
+ if err != nil {
+ return trace.Wrap(err)
+ }
+ defer exporter.Close()
- select {
- case outch <- eventStream.Item():
- case <-ctx.Done():
- return nil
- }
- }
+ if cursor != nil {
+ go func() {
+ syncTicker := time.NewTicker(time.Millisecond * 666)
+ defer syncTicker.Stop()
- if err := eventStream.Done(); err != nil {
- log.WithFields(log.Fields{
- "date": date.Format(time.DateOnly),
- "chunk": chunks.Item().Chunk,
- "error": err,
- }).Error("event stream failed, will attempt to reestablish")
- continue ProcessChunk
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-syncTicker.C:
+ cursor.Sync(exporter.GetState())
}
-
- chunksProcessed[chunks.Item().Chunk] = struct{}{}
- break ProcessChunk
}
- }
+ }()
+ }
- if err := chunks.Done(); err != nil {
- log.WithFields(log.Fields{
- "date": date.Format(time.DateOnly),
- "error": err,
- }).Error("event chunk stream failed, will attempt to reestablish")
- continue Outer
- }
+ logTicker := time.NewTicker(time.Minute)
- nextDate := date.AddDate(0, 0, 1)
- if nextDate.After(time.Now()) {
- delay := utils.SeventhJitter(time.Second * 7)
- log.WithFields(log.Fields{
- "date": date.Format(time.DateOnly),
- "delay": delay,
- }).Info("finished processing known event chunks for current date, will re-poll after delay")
- select {
- case <-time.After(delay):
- case <-ctx.Done():
- return nil
- }
- continue Outer
+ var prevEventsProcessed uint64
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-logTicker.C:
+ processed := eventsProcessed.Load()
+ slog.InfoContext(ctx, "event processing", "total", processed, "per_minute", processed-prevEventsProcessed)
+ prevEventsProcessed = processed
+ case <-exporter.Done():
+ return trace.Errorf("exporter exited unexpected with state: %+v", exporter.GetState())
}
-
- log.WithFields(log.Fields{
- "date": date.Format(time.DateOnly),
- "next": nextDate.Format(time.DateOnly),
- }).Info("finished processing known event chunks for historical date, moving to next")
- date = nextDate
- clear(chunksProcessed)
}
}