Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use time-based event index for app session events #38495

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,9 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod
return nil, trace.Wrap(err)
}
return &dynamodb.PutItemInput{
Item: av,
TableName: aws.String(l.Tablename),
Item: av,
TableName: aws.String(l.Tablename),
ConditionExpression: aws.String("attribute_not_exists(SessionID) AND attribute_not_exists(EventIndex)"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this break existing environments when auth is upgraded but app server is not yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as soon as they update their auth servers and events from older app servers, they will hit this, at least for the first app.session.chunk (which conflicts with app.session.start). As mentioned in the PR description, if the app server serving the requests changes, there will be more conflicts (generating more errors).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we release this check with the patch, customers's auth will be "spammed" with this before app servers are upgraded. As discussed offline, it is preferred to handle this separately and ideally release it in a new major version. By that time, app servers should have been upgraded with this patch already and we can likely catch other events that have the same problem during the release testing.

Though, since this check does not block using the app, and we are losing events before app servers are upgraded anyway, I don't mind if we really want to release this together.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer having an error log to silently overwriting audit events (current behavior).

@codingllama, Any thoughts on this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid I don't have enough context to have a strong opinion here.

}, nil
}

Expand Down
38 changes: 38 additions & 0 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import (
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/test"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
)

Expand Down Expand Up @@ -434,6 +436,42 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
}
}

// TestEmitSessionEventsSameIndex given events that share the same session ID
// and index, the emit should fail, avoiding any event to get overwritten.
func TestEmitSessionEventsSameIndex(t *testing.T) {
ctx := context.Background()
tt := setupDynamoContext(t)
sessionID := session.NewID()

require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0)))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1)))
require.Error(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1)))
}

func generateEvent(sessionID session.ID, index int64) apievents.AuditEvent {
return &apievents.AppSessionChunk{
Metadata: apievents.Metadata{
Type: events.AppSessionChunkEvent,
Code: events.AppSessionChunkCode,
ClusterName: "root",
Index: index,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: uuid.New().String(),
ServerNamespace: apidefaults.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: sessionID.String(),
},
AppMetadata: apievents.AppMetadata{
AppURI: "nginx",
AppPublicAddr: "https://nginx",
AppName: "nginx",
},
SessionChunkID: uuid.New().String(),
}
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randStringAlpha(n int) string {
Expand Down
5 changes: 5 additions & 0 deletions lib/events/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type Config struct {

// BackoffDuration is a duration of the backoff before the next try.
BackoffDuration time.Duration

// StartTime represents the time the recorder started. If not zero, this
// value is used to generate the events index.
StartTime time.Time
}

// New returns a [events.SessionPreparerRecorder]. If session recording is disabled,
Expand All @@ -112,6 +116,7 @@ func New(cfg Config) (events.SessionPreparerRecorder, error) {
Clock: cfg.Clock,
UID: cfg.UID,
ClusterName: cfg.ClusterName,
StartTime: cfg.StartTime,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
16 changes: 14 additions & 2 deletions lib/events/setter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -60,6 +61,10 @@ type PreparerConfig struct {

// ClusterName defines the name of this teleport cluster.
ClusterName string

// StartTime represents the time the recorder started. If not zero, this
// value is used to generate the events index.
StartTime time.Time
}

// CheckAndSetDefaults checks and sets defaults
Expand Down Expand Up @@ -110,8 +115,7 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr
srv.SetServerID(c.cfg.ServerID)
}

// ensure index is incremented and loaded atomically
event.SetIndex(int64(c.eventIndex.Add(1) - 1))
event.SetIndex(c.nextIndex())

preparedEvent := preparedSessionEvent{
event: event,
Expand All @@ -135,6 +139,14 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr
return preparedEvent, nil
}

func (c *Preparer) nextIndex() int64 {
if !c.cfg.StartTime.IsZero() {
gabrielcorado marked this conversation as resolved.
Show resolved Hide resolved
return c.cfg.Clock.Since(c.cfg.StartTime).Nanoseconds()
}

return int64(c.eventIndex.Add(1) - 1)
}

type preparedSessionEvent struct {
event apievents.AuditEvent
}
Expand Down
123 changes: 123 additions & 0 deletions lib/events/setter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package events

import (
"testing"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"

apidefaults "github.com/gravitational/teleport/api/defaults"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
)

func TestPreparerIncrementalIndex(t *testing.T) {
sessionID := session.NewID()
preparer, err := NewPreparer(PreparerConfig{
SessionID: sessionID,
ClusterName: "root",
})
require.NoError(t, err)

for i := 0; i < 10; i++ {
e, err := preparer.PrepareSessionEvent(generateEvent())
require.NoError(t, err)
require.Equal(t, int64(i), e.GetAuditEvent().GetIndex(), "unexpected event index")
}
}

func TestPreparerTimeBasedIndex(t *testing.T) {
clock := clockwork.NewRealClock()
preparer, err := NewPreparer(PreparerConfig{
SessionID: session.NewID(),
ServerID: uuid.New().String(),
ClusterName: "root",
Clock: clock,
StartTime: clock.Now(),
})
require.NoError(t, err)

var lastIndex int64
for i := 0; i < 9; i++ {
e, err := preparer.PrepareSessionEvent(generateEvent())
require.NoError(t, err)
require.Greater(t, e.GetAuditEvent().GetIndex(), lastIndex, "expected a larger index")
lastIndex = e.GetAuditEvent().GetIndex()
}
}

func TestPreparerTimeBasedIndexCollisions(t *testing.T) {
serverID := uuid.New().String()
sessionID := session.NewID()
clusterName := "root"
clock := clockwork.NewRealClock()
loginTime := clock.Now()

preparerOne, err := NewPreparer(PreparerConfig{
SessionID: sessionID,
ServerID: serverID,
ClusterName: clusterName,
Clock: clock,
StartTime: loginTime,
})
require.NoError(t, err)

preparerTwo, err := NewPreparer(PreparerConfig{
SessionID: sessionID,
ServerID: serverID,
ClusterName: clusterName,
Clock: clock,
StartTime: loginTime,
})
require.NoError(t, err)

for i := 0; i < 9; i++ {
evtOne, err := preparerOne.PrepareSessionEvent(generateEvent())
require.NoError(t, err)
idxOne := evtOne.GetAuditEvent().GetIndex()

evtTwo, err := preparerTwo.PrepareSessionEvent(generateEvent())
require.NoError(t, err)
idxTwo := evtTwo.GetAuditEvent().GetIndex()

require.NotEqual(t, idxOne, idxTwo)
gabrielcorado marked this conversation as resolved.
Show resolved Hide resolved
require.Greater(t, idxTwo, idxOne)
}
}

func generateEvent() apievents.AuditEvent {
return &apievents.AppSessionChunk{
Metadata: apievents.Metadata{
Type: AppSessionChunkEvent,
Code: AppSessionChunkCode,
ClusterName: "root",
},
ServerMetadata: apievents.ServerMetadata{
ServerID: uuid.NewString(),
ServerNamespace: apidefaults.Namespace,
},
AppMetadata: apievents.AppMetadata{
AppURI: "nginx",
AppPublicAddr: "https://nginx",
AppName: "nginx",
},
SessionChunkID: uuid.NewString(),
}
}
32 changes: 29 additions & 3 deletions lib/srv/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package app
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"net"
"net/http"
Expand Down Expand Up @@ -774,6 +775,10 @@ func (s *Server) handleConnection(conn net.Conn) (func(), error) {
}
}

// Add user certificate into the context after the monitor connection
// initialization to ensure value is present on the context.
ctx = authz.ContextWithUserCertificate(ctx, certFromConn(tlsConn))

// Application access supports plain TCP connections which are handled
// differently than HTTP requests from web apps.
if app.IsTCP() {
Expand Down Expand Up @@ -912,7 +917,7 @@ func (s *Server) serveSession(w http.ResponseWriter, r *http.Request, identity *
// minutes. Used to stream session chunks to the Audit Log.
ttl := min(identity.Expires.Sub(s.c.Clock.Now()), 5*time.Minute)
session, err := utils.FnCacheGetWithTTL(r.Context(), s.cache, identity.RouteToApp.SessionID, ttl, func(ctx context.Context) (*sessionChunk, error) {
session, err := s.newSessionChunk(ctx, identity, app, opts...)
session, err := s.newSessionChunk(ctx, identity, app, s.sessionStartTime(r.Context()), opts...)
return session, trace.Wrap(err)
})
if err != nil {
Expand Down Expand Up @@ -1101,10 +1106,10 @@ func (s *Server) newHTTPServer(clusterName string) *http.Server {
// newTCPServer creates a server that proxies TCP applications.
func (s *Server) newTCPServer() (*tcpServer, error) {
return &tcpServer{
newAudit: func(sessionID string) (common.Audit, error) {
newAudit: func(ctx context.Context, sessionID string) (common.Audit, error) {
// Audit stream is using server context, not session context,
// to make sure that session is uploaded even after it is closed.
rec, err := s.newSessionRecorder(s.closeContext, sessionID)
rec, err := s.newSessionRecorder(s.closeContext, s.sessionStartTime(ctx), sessionID)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1139,6 +1144,17 @@ func (s *Server) getProxyPort() string {
return port
}

// sessionStartTime fetches the session start time based on the the certificate
// valid date.
func (s *Server) sessionStartTime(ctx context.Context) time.Time {
if userCert, err := authz.UserCertificateFromContext(ctx); err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we use the certificate NotBefore because we want this to be deterministic for a user, correct?

It would be nice if we could just draw a random number - it's simple, less prone to time clashes (although with nanos precision that is quite difficult) and we could use that to guarantee that this number always exists, dropping the old logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed interesting approach. Chunks are every five minutes if I recall. So using nanoseconds ensures chunks are relatively in order. However in HA case, time on different app servers may not be perfectly in sync.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we use the certificate NotBefore because we want this to be deterministic for a user, correct?

The idea is to keep the index characteristics as an always-growing number. Since the app sessions are stateless, I relied on the certificate NotBefore time since it is consistent during the entire session.

It would be nice if we could just draw a random number

The random numbers would work here because our events storage doesn't rely on the index to sort (they use the event timestamps). However, it is not guaranteed that the index will always increase within the session. If we're okay with changing this index characteristic, I don't see any problem using a random number.

However in HA case, time on different app servers may not be perfectly in sync.

We can still have collisions in that case, although they will be rare (given that we're using nanoseconds elapsed time). The solution for this would be to consider the ServerID while generating the index, which is similar to what Snowflake IDs do, or set the index on the proxy/auth servers.

return userCert.NotBefore
}

s.log.Warn("Unable to retrieve session start time from certificate.")
return time.Time{}
}

// CopyAndConfigureTLS can be used to copy and modify an existing *tls.Config
// for Teleport application proxy servers.
func CopyAndConfigureTLS(log logrus.FieldLogger, client auth.AccessCache, config *tls.Config) *tls.Config {
Expand Down Expand Up @@ -1187,3 +1203,13 @@ func newGetConfigForClientFn(log logrus.FieldLogger, client auth.AccessCache, tl
return tlsCopy, nil
}
}

// certFromConnState returns the leaf certificate from the connection.
func certFromConn(tlsConn *tls.Conn) *x509.Certificate {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func certFromConn(tlsConn *tls.Conn) *x509.Certificate {
func leafCertFromConn(tlsConn *tls.Conn) *x509.Certificate {

state := tlsConn.ConnectionState()
if len(state.PeerCertificates) == 0 {
return nil
}

return state.PeerCertificates[0]
}
8 changes: 3 additions & 5 deletions lib/srv/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,6 @@ func TestRequestAuditEvents(t *testing.T) {
Type: events.AppSessionChunkEvent,
Code: events.AppSessionChunkCode,
ClusterName: "root.example.com",
Index: 0,
},
AppMetadata: apievents.AppMetadata{
AppURI: app.Spec.URI,
Expand All @@ -1023,7 +1022,7 @@ func TestRequestAuditEvents(t *testing.T) {
expectedEvent,
event,
cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"),
cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"),
))
case events.AppSessionRequestEvent:
Expand All @@ -1033,7 +1032,6 @@ func TestRequestAuditEvents(t *testing.T) {
Type: events.AppSessionRequestEvent,
Code: events.AppSessionRequestCode,
ClusterName: "root.example.com",
Index: 1,
},
AppMetadata: apievents.AppMetadata{
AppURI: app.Spec.URI,
Expand All @@ -1048,7 +1046,7 @@ func TestRequestAuditEvents(t *testing.T) {
expectedEvent,
event,
cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"),
cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"),
))
}
Expand Down Expand Up @@ -1101,7 +1099,7 @@ func TestRequestAuditEvents(t *testing.T) {
expectedEvent,
searchEvents[0],
cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"),
cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"),
))
}
Expand Down
Loading
Loading