-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use time-based event index for app session events #38495
Changes from 5 commits
9dcd990
4a4465e
b4bbae6
e8ba6da
3ba4322
84b0819
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
// Teleport | ||
// Copyright (C) 2024 Gravitational, Inc. | ||
// | ||
// This program is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU Affero General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
// | ||
// This program is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU Affero General Public License for more details. | ||
// | ||
// You should have received a copy of the GNU Affero General Public License | ||
// along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
package events | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/jonboulle/clockwork" | ||
"github.com/stretchr/testify/require" | ||
|
||
apidefaults "github.com/gravitational/teleport/api/defaults" | ||
apievents "github.com/gravitational/teleport/api/types/events" | ||
"github.com/gravitational/teleport/lib/session" | ||
) | ||
|
||
func TestPreparerIncrementalIndex(t *testing.T) { | ||
sessionID := session.NewID() | ||
preparer, err := NewPreparer(PreparerConfig{ | ||
SessionID: sessionID, | ||
ClusterName: "root", | ||
}) | ||
require.NoError(t, err) | ||
|
||
for i := 0; i < 10; i++ { | ||
e, err := preparer.PrepareSessionEvent(generateEvent()) | ||
require.NoError(t, err) | ||
require.Equal(t, int64(i), e.GetAuditEvent().GetIndex(), "unexpected event index") | ||
} | ||
} | ||
|
||
func TestPreparerTimeBasedIndex(t *testing.T) { | ||
clock := clockwork.NewFakeClock() | ||
preparer, err := NewPreparer(PreparerConfig{ | ||
SessionID: session.NewID(), | ||
ServerID: uuid.New().String(), | ||
ClusterName: "root", | ||
Clock: clock, | ||
StartTime: clock.Now(), | ||
}) | ||
require.NoError(t, err) | ||
|
||
var lastIndex int64 | ||
for i := 0; i < 9; i++ { | ||
clock.Advance(time.Second) | ||
e, err := preparer.PrepareSessionEvent(generateEvent()) | ||
require.NoError(t, err) | ||
require.Greater(t, e.GetAuditEvent().GetIndex(), lastIndex, "expected a larger index") | ||
lastIndex = e.GetAuditEvent().GetIndex() | ||
} | ||
} | ||
|
||
func TestPreparerTimeBasedIndexCollisions(t *testing.T) { | ||
serverID := uuid.New().String() | ||
sessionID := session.NewID() | ||
clusterName := "root" | ||
clock := clockwork.NewFakeClock() | ||
loginTime := clock.Now() | ||
|
||
preparerOne, err := NewPreparer(PreparerConfig{ | ||
SessionID: sessionID, | ||
ServerID: serverID, | ||
ClusterName: clusterName, | ||
Clock: clock, | ||
StartTime: loginTime, | ||
}) | ||
require.NoError(t, err) | ||
|
||
preparerTwo, err := NewPreparer(PreparerConfig{ | ||
SessionID: sessionID, | ||
ServerID: serverID, | ||
ClusterName: clusterName, | ||
Clock: clock, | ||
StartTime: loginTime, | ||
}) | ||
require.NoError(t, err) | ||
|
||
for i := 0; i < 9; i++ { | ||
clock.Advance(time.Second) | ||
evtOne, err := preparerOne.PrepareSessionEvent(generateEvent()) | ||
require.NoError(t, err) | ||
idxOne := evtOne.GetAuditEvent().GetIndex() | ||
|
||
clock.Advance(time.Second) | ||
evtTwo, err := preparerTwo.PrepareSessionEvent(generateEvent()) | ||
require.NoError(t, err) | ||
idxTwo := evtTwo.GetAuditEvent().GetIndex() | ||
|
||
require.NotEqual(t, idxOne, idxTwo) | ||
gabrielcorado marked this conversation as resolved.
Show resolved
Hide resolved
|
||
require.Greater(t, idxTwo, idxOne) | ||
} | ||
} | ||
|
||
func generateEvent() apievents.AuditEvent { | ||
return &apievents.AppSessionChunk{ | ||
Metadata: apievents.Metadata{ | ||
Type: AppSessionChunkEvent, | ||
Code: AppSessionChunkCode, | ||
ClusterName: "root", | ||
}, | ||
ServerMetadata: apievents.ServerMetadata{ | ||
ServerID: uuid.NewString(), | ||
ServerNamespace: apidefaults.Namespace, | ||
}, | ||
AppMetadata: apievents.AppMetadata{ | ||
AppURI: "nginx", | ||
AppPublicAddr: "https://nginx", | ||
AppName: "nginx", | ||
}, | ||
SessionChunkID: uuid.NewString(), | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ package app | |
import ( | ||
"context" | ||
"crypto/tls" | ||
"crypto/x509" | ||
"errors" | ||
"net" | ||
"net/http" | ||
|
@@ -774,6 +775,10 @@ func (s *Server) handleConnection(conn net.Conn) (func(), error) { | |
} | ||
} | ||
|
||
// Add user certificate into the context after the monitor connection | ||
// initialization to ensure value is present on the context. | ||
ctx = authz.ContextWithUserCertificate(ctx, leafCertFromConn(tlsConn)) | ||
|
||
// Application access supports plain TCP connections which are handled | ||
// differently than HTTP requests from web apps. | ||
if app.IsTCP() { | ||
|
@@ -912,7 +917,7 @@ func (s *Server) serveSession(w http.ResponseWriter, r *http.Request, identity * | |
// minutes. Used to stream session chunks to the Audit Log. | ||
ttl := min(identity.Expires.Sub(s.c.Clock.Now()), 5*time.Minute) | ||
session, err := utils.FnCacheGetWithTTL(r.Context(), s.cache, identity.RouteToApp.SessionID, ttl, func(ctx context.Context) (*sessionChunk, error) { | ||
session, err := s.newSessionChunk(ctx, identity, app, opts...) | ||
session, err := s.newSessionChunk(ctx, identity, app, s.sessionStartTime(r.Context()), opts...) | ||
return session, trace.Wrap(err) | ||
}) | ||
if err != nil { | ||
|
@@ -1101,10 +1106,10 @@ func (s *Server) newHTTPServer(clusterName string) *http.Server { | |
// newTCPServer creates a server that proxies TCP applications. | ||
func (s *Server) newTCPServer() (*tcpServer, error) { | ||
return &tcpServer{ | ||
newAudit: func(sessionID string) (common.Audit, error) { | ||
newAudit: func(ctx context.Context, sessionID string) (common.Audit, error) { | ||
// Audit stream is using server context, not session context, | ||
// to make sure that session is uploaded even after it is closed. | ||
rec, err := s.newSessionRecorder(s.closeContext, sessionID) | ||
rec, err := s.newSessionRecorder(s.closeContext, s.sessionStartTime(ctx), sessionID) | ||
if err != nil { | ||
return nil, trace.Wrap(err) | ||
} | ||
|
@@ -1139,6 +1144,17 @@ func (s *Server) getProxyPort() string { | |
return port | ||
} | ||
|
||
// sessionStartTime fetches the session start time based on the the certificate | ||
// valid date. | ||
func (s *Server) sessionStartTime(ctx context.Context) time.Time { | ||
if userCert, err := authz.UserCertificateFromContext(ctx); err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume we use the certificate NotBefore because we want this to be deterministic for a user, correct? It would be nice if we could just draw a random number - it's simple, less prone to time clashes (although with nanos precision that is quite difficult) and we could use that to guarantee that this number always exists, dropping the old logic. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed interesting approach. Chunks are every five minutes if I recall. So using nanoseconds ensures chunks are relatively in order. However in HA case, time on different app servers may not be perfectly in sync. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The idea is to keep the index characteristics as an always-growing number. Since the app sessions are stateless, I relied on the certificate
The random numbers would work here because our events storage doesn't rely on the index to sort (they use the event timestamps). However, it is not guaranteed that the index will always increase within the session. If we're okay with changing this index characteristic, I don't see any problem using a random number.
We can still have collisions in that case, although they will be rare (given that we're using nanoseconds elapsed time). The solution for this would be to consider the ServerID while generating the index, which is similar to what Snowflake IDs do, or set the index on the proxy/auth servers. |
||
return userCert.NotBefore | ||
} | ||
|
||
s.log.Warn("Unable to retrieve session start time from certificate.") | ||
return time.Time{} | ||
} | ||
|
||
// CopyAndConfigureTLS can be used to copy and modify an existing *tls.Config | ||
// for Teleport application proxy servers. | ||
func CopyAndConfigureTLS(log logrus.FieldLogger, client auth.AccessCache, config *tls.Config) *tls.Config { | ||
|
@@ -1187,3 +1203,13 @@ func newGetConfigForClientFn(log logrus.FieldLogger, client auth.AccessCache, tl | |
return tlsCopy, nil | ||
} | ||
} | ||
|
||
// leafCertFromConn returns the leaf certificate from the connection. | ||
func leafCertFromConn(tlsConn *tls.Conn) *x509.Certificate { | ||
state := tlsConn.ConnectionState() | ||
if len(state.PeerCertificates) == 0 { | ||
return nil | ||
} | ||
|
||
return state.PeerCertificates[0] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this break existing environments when auth is upgraded but app server is not yet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as soon as they update their auth servers and events from older app servers, they will hit this, at least for the first
app.session.chunk
(which conflicts withapp.session.start
). As mentioned in the PR description, if the app server serving the requests changes, there will be more conflicts (generating more errors).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we release this check with the patch, customers's auth will be "spammed" with this before app servers are upgraded. As discussed offline, it is preferred to handle this separately and ideally release it in a new major version. By that time, app servers should have been upgraded with this patch already and we can likely catch other events that have the same problem during the release testing.
Though, since this check does not block using the app, and we are losing events before app servers are upgraded anyway, I don't mind if we really want to release this together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer having an error log to silently overwriting audit events (current behavior).
@codingllama, Any thoughts on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid I don't have enough context to have a strong opinion here.