Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a retry mechanism to UploadCompleter's session.end checker #41113

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ const (
// valuable info.
CorruptedSessionsDir = "corrupted"

// SessionsWithUnconfirmedSessionEnd is a subdirectory of sessions (/var/lib/teleport/log/upload/unconfirmed_session_ends)
// where session ids where it wasn't possible to confirm the session.end events are placed.
// An incomplete session is a session that was correctly
// uploaded to Auth server or final destination but it wasn't possible to ensure the session
// correctly had the session end event. Upload completer ensures the session.end event when it
// finishes uploading it but if the audit events storage is down, e.g. when database is failing
// but auth is still operational, the session recording will never be available because session
// completer never retries after the first session.end creation failure.
// Teleport uses this directory to track sessions that it couldn't ensure the proper session.end
// event.
SessionsWithUnconfirmedSessionEnd = "unconfirmed_session_ends"

// RecordsDir is an auth server subdirectory with session recordings that is used
// when the auth server is not configured for external cloud storage. It is not
// used by nodes, proxies, or other Teleport services.
Expand Down
81 changes: 76 additions & 5 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package events

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"time"

"github.com/google/uuid"
Expand All @@ -37,6 +39,7 @@ import (
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/observability/metrics"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/interval"
)
Expand All @@ -63,6 +66,10 @@ type UploadCompleterConfig struct {
Clock clockwork.Clock
// ClusterName identifies the originating teleport cluster
ClusterName string
// UnconfirmedSessionEndDir is the directory where the upload completer stores
// the session ids for sessions where it wasn't possible to ensure
// the session.end event because Audit Logs storage wasn't available.
UnconfirmedSessionEndDir string
}

// CheckAndSetDefaults checks and sets default values
Expand All @@ -88,6 +95,9 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error {
if cfg.Clock == nil {
cfg.Clock = clockwork.NewRealClock()
}
if cfg.UnconfirmedSessionEndDir == "" {
return trace.BadParameter("missing parameter UnconfirmedSessionEndDir")
}
return nil
}

Expand Down Expand Up @@ -209,6 +219,8 @@ func (u *UploadCompleter) PerformPeriodicCheck(ctx context.Context) {

// CheckUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
u.ensureAllPendingsessionsHaveEndEvts(ctx)

uploads, err := u.cfg.Uploader.ListUploads(ctx)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -288,8 +300,15 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
return
case <-u.cfg.Clock.After(2 * time.Minute):
log.Debug("checking for session end event")
if err := u.ensureSessionEndEvent(ctx, uploadData); err != nil {
switch err := u.ensureSessionEndEvent(ctx, uploadData.SessionID); {
case trace.IsNotFound(err), errors.Is(err, errEmptySession):
log.WithError(err).Warning("failed to ensure session end event")
case err != nil:
log.WithError(err).Warning("failed to ensure session end event. Retrying later.")
if err := u.createIncompleteSession(uploadData.SessionID); err != nil {
log.WithError(err).Warning("failed to create pivot file")
}

}
}
}()
Expand All @@ -315,7 +334,24 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
return nil
}

func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData UploadMetadata) error {
func (u *UploadCompleter) ensureAllPendingsessionsHaveEndEvts(ctx context.Context) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (u *UploadCompleter) ensureAllPendingsessionsHaveEndEvts(ctx context.Context) {
func (u *UploadCompleter) ensureAllPendingSessionsHaveEndEvents(ctx context.Context) {

pendingSessionIDs, err := u.scanDirectoryForIncompleteSessions()
if err != nil {
u.log.WithError(err).Warn("Failed to list directory objects")
}

for _, pendingSessionID := range pendingSessionIDs {
if err := u.ensureSessionEndEvent(ctx, pendingSessionID); err == nil {
u.removeIncompleteSession(pendingSessionID)
} else {
u.log.WithError(err).Warning("failed to ensure session end event. Retrying later.")
}
}
}

var errEmptySession = errors.New("empty session")

func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, sessionID session.ID) error {
// at this point, we don't know whether we'll need to emit a session.end or a
// windows.desktop.session.end, but as soon as we see the session start we'll
// be able to start filling in the details
Expand All @@ -326,7 +362,7 @@ func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData
// for both Desktop and SSH sessions, where as the GetSessionEvents API relies on downloading
// a copy of the session and using the SSH-specific index to iterate through events.
var lastEvent events.AuditEvent
evts, errors := u.cfg.AuditLog.StreamSessionEvents(ctx, uploadData.SessionID, 0)
evts, errors := u.cfg.AuditLog.StreamSessionEvents(ctx, sessionID, 0)

loop:
for {
Expand Down Expand Up @@ -386,7 +422,7 @@ loop:
}

if lastEvent == nil {
return trace.Errorf("could not find any events for session %v", uploadData.SessionID)
return trace.Wrap(errEmptySession, "could not find any events for session %v", sessionID)
}

sshSessionEnd.Participants = apiutils.Deduplicate(sshSessionEnd.Participants)
Expand All @@ -403,7 +439,7 @@ loop:
return trace.BadParameter("invalid session, could not find session start")
}

u.log.Infof("emitting %T event for completed session %v", sessionEndEvent, uploadData.SessionID)
u.log.Infof("emitting %T event for completed session %v", sessionEndEvent, sessionID)

sessionEndEvent.SetTime(lastEvent.GetTime())

Expand All @@ -416,3 +452,38 @@ loop:
}
return nil
}

func (u *UploadCompleter) createIncompleteSession(sessionID session.ID) error {
path := filepath.Join(u.cfg.UnconfirmedSessionEndDir, sessionID.String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so we're creating an empty file on disk named after the session ID as in indicator to go back and retry the session.

But then when we retry we have to stream the entire session again to compute what the end event should look like.

Why don't we just write the JSON-encoded end event to this file, so subsequent retries can emit the end event directly rather than having to compute it again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we don't have the event every time.
If the audit log backend is suffering a temporary failure, streaming can fail and we can't even validate if the session already has the session.end event.

Saying that, we need to retry later to:

  1. pull all session events and check whether we have a session.end or not
  2. if audit log failed, create the file and retry later.
  3. if session doesn't have the session.end event, try creating it. If create audit event operation fails we retry later. For this step we can implement the optimization you mentioned but we still need to deal with step 2.

f, err := os.Create(path)
if os.IsExist(err) {
return nil
} else if err != nil {
return trace.ConvertSystemError(err)
}
f.Close()
return nil
}

func (u *UploadCompleter) removeIncompleteSession(sessionID session.ID) {
path := filepath.Join(u.cfg.UnconfirmedSessionEndDir, sessionID.String())
os.Remove(path)
}

func (u *UploadCompleter) scanDirectoryForIncompleteSessions() ([]session.ID, error) {
entries, err := os.ReadDir(u.cfg.UnconfirmedSessionEndDir)
if err != nil {
return nil, trace.Wrap(err)
}
var sessionIDs []session.ID
for _, entry := range entries {
if entry.IsDir() {
continue
}
sessID := session.ID(entry.Name())
if sessID.Check() == nil {
sessionIDs = append(sessionIDs, sessID)
}
}
return sessionIDs, nil
}
87 changes: 59 additions & 28 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"testing"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/api/client/proto"
Expand Down Expand Up @@ -66,11 +68,12 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
}

uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
UnconfirmedSessionEndDir: t.TempDir(),
})
require.NoError(t, err)

Expand Down Expand Up @@ -100,13 +103,14 @@ func TestUploadCompleterNeedsSemaphore(t *testing.T) {
sessionTrackerService := &mockSessionTrackerService{clock: clock}

uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
CheckPeriod: 3 * time.Minute,
ServerID: "abc123",
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
CheckPeriod: 3 * time.Minute,
ServerID: "abc123",
UnconfirmedSessionEndDir: t.TempDir(),
Semaphores: mockSemaphores{
acquireErr: errors.New("semaphore already taken"),
},
Expand Down Expand Up @@ -147,6 +151,7 @@ func TestUploadCompleterAcquiresSemaphore(t *testing.T) {
},
acquireErr: nil,
},
UnconfirmedSessionEndDir: t.TempDir(),
})
require.NoError(t, err)

Expand All @@ -164,11 +169,13 @@ func TestUploadCompleterAcquiresSemaphore(t *testing.T) {
// that are completed.
func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
for _, test := range []struct {
startEvent apievents.AuditEvent
endEventType string
startEvent apievents.AuditEvent
endEventType string
sessionStreamErr error
}{
{&apievents.SessionStart{}, events.SessionEndEvent},
{&apievents.WindowsDesktopSessionStart{}, events.WindowsDesktopSessionEndEvent},
{startEvent: &apievents.SessionStart{}, endEventType: events.SessionEndEvent},
{startEvent: &apievents.SessionStart{}, endEventType: events.SessionEndEvent, sessionStreamErr: trace.ConnectionProblem(nil, "connection problem")},
{startEvent: &apievents.WindowsDesktopSessionStart{}, endEventType: events.WindowsDesktopSessionEndEvent},
} {
t.Run(test.endEventType, func(t *testing.T) {
clock := clockwork.NewFakeClock()
Expand All @@ -186,17 +193,20 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
&apievents.SessionPrint{Metadata: apievents.Metadata{Time: endTime}},
},
}

log.SetStreamSessionEventsErr(test.sessionStreamErr)
sessionIncomplete := t.TempDir()
uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
Clock: clock,
SessionTracker: &mockSessionTrackerService{},
ClusterName: "teleport-cluster",
Uploader: mu,
AuditLog: log,
Clock: clock,
SessionTracker: &mockSessionTrackerService{},
ClusterName: "teleport-cluster",
UnconfirmedSessionEndDir: sessionIncomplete,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), session.NewID())
sessionID := session.NewID()
upload, err := mu.CreateUpload(context.Background(), sessionID)
require.NoError(t, err)

// session end events are only emitted if there's at least one
Expand All @@ -211,6 +221,21 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
clock.BlockUntil(1)
clock.Advance(3 * time.Minute)

if test.sessionStreamErr != nil {
// if session stream returned an error, we must ensure the unconfirmed session tracking file
// exists.
require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.FileExists(t, filepath.Join(sessionIncomplete, sessionID.String()))
}, 5*time.Second, 100*time.Millisecond)
// reset the error
log.SetStreamSessionEventsErr(nil)
// reset the uploader
mu.Reset()
// re-trigger the upload routine and it will submit the delayed
// session.end event.
err = uc.CheckUploads(context.Background())
assert.NoError(t, err)
}
// expect two events - a session end and a session upload
// the session end is done asynchronously, so wait for that
require.Eventually(t, func() bool { return len(log.Emitter.Events()) == 2 }, 5*time.Second, 1*time.Second,
Expand All @@ -220,6 +245,11 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
require.Equal(t, startTime, log.Emitter.Events()[0].GetTime())
require.Equal(t, test.endEventType, log.Emitter.Events()[1].GetType())
require.Equal(t, endTime, log.Emitter.Events()[1].GetTime())

if test.sessionStreamErr != nil {
// ensure file was removed once session.end event was confirmed.
assert.NoFileExists(t, filepath.Join(sessionIncomplete, sessionID.String()))
}
})
}
}
Expand Down Expand Up @@ -281,11 +311,12 @@ func TestCheckUploadsContinuesOnError(t *testing.T) {
}

uc, err := events.NewUploadCompleter(events.UploadCompleterConfig{
Uploader: uploader,
AuditLog: &eventstest.MockAuditLog{},
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
Uploader: uploader,
AuditLog: &eventstest.MockAuditLog{},
SessionTracker: sessionTrackerService,
Clock: clock,
ClusterName: "teleport-cluster",
UnconfirmedSessionEndDir: t.TempDir(),
})
require.NoError(t, err)

Expand Down
22 changes: 19 additions & 3 deletions lib/events/eventstest/mock_auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,39 @@

import (
"context"
"sync"

Check failure on line 24 in lib/events/eventstest/mock_auditlog.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/gravitational/teleport) -s prefix(github.com/gravitational/teleport/integrations/terraform,github.com/gravitational/teleport/integrations/event-handler) --custom-order (gci)
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"

Check failure on line 28 in lib/events/eventstest/mock_auditlog.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/gravitational/teleport) -s prefix(github.com/gravitational/teleport/integrations/terraform,github.com/gravitational/teleport/integrations/event-handler) --custom-order (gci)
)

type MockAuditLog struct {
*events.DiscardAuditLog

Emitter *MockRecorderEmitter
SessionEvents []apievents.AuditEvent
Emitter *MockRecorderEmitter
SessionEvents []apievents.AuditEvent
streamSessionEventsErr error
mu sync.Mutex
}

func (m *MockAuditLog) SetStreamSessionEventsErr(err error) {
m.mu.Lock()
defer m.mu.Unlock()
m.streamSessionEventsErr = err
}

func (m *MockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) {
errors := make(chan error, 1)
events := make(chan apievents.AuditEvent)

m.mu.Lock()
err := m.streamSessionEventsErr
m.mu.Unlock()
if err != nil {
errors <- trace.Wrap(err)
return events, errors
}
go func() {
defer close(events)

Expand Down
Loading
Loading