diff --git a/lib/backend/firestore/firestorebk.go b/lib/backend/firestore/firestorebk.go index 3f406b863aa77..4790e4ae08b27 100644 --- a/lib/backend/firestore/firestorebk.go +++ b/lib/backend/firestore/firestorebk.go @@ -46,6 +46,8 @@ import ( "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/defaults" + "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/utils/interval" ) func init() { @@ -190,6 +192,7 @@ func newRecord(from backend.Item, clock clockwork.Clock) record { return r } +// TODO(tigrato|rosstimothy): Simplify this function by removing the brokenRecord and legacyRecord struct in 19.0.0 func newRecordFromDoc(doc *firestore.DocumentSnapshot) (*record, error) { k, err := doc.DataAt(keyDocProperty) if err != nil { @@ -413,7 +416,28 @@ func New(ctx context.Context, params backend.Params, options Options) (*Backend, go RetryingAsyncFunctionRunner(b.clientContext, linearConfig, b.logger.With("task_name", "purged_expired_documents"), b.purgeExpiredDocuments) } + // Migrate incorrect key types to the correct type. 
+ // TODO(tigrato|rosstimothy): DELETE in 19.0.0 + go func() { + migrationInterval := interval.New(interval.Config{ + Duration: time.Hour * 12, + FirstDuration: utils.FullJitter(time.Minute * 5), + Jitter: retryutils.NewSeventhJitter(), + Clock: b.clock, + }) + defer migrationInterval.Stop() + for { + select { + case <-migrationInterval.Next(): + b.migrateIncorrectKeyTypes() + case <-b.clientContext.Done(): + return + } + } + }() + l.InfoContext(b.clientContext, "Backend created.") + return b, nil } @@ -640,7 +664,6 @@ func (b *Backend) CompareAndSwap(ctx context.Context, expected backend.Item, rep return nil }, firestore.MaxAttempts(maxTxnAttempts)) - if err != nil { if status.Code(err) == codes.Aborted { // RunTransaction does not officially document what error is returned if MaxAttempts is exceeded, @@ -705,7 +728,6 @@ func (b *Backend) ConditionalDelete(ctx context.Context, key backend.Key, rev st return nil }, firestore.MaxAttempts(maxTxnAttempts)) - if err != nil { if status.Code(err) == codes.Aborted { // RunTransaction does not officially document what error is returned if MaxAttempts is exceeded, @@ -772,7 +794,6 @@ func (b *Backend) ConditionalUpdate(ctx context.Context, item backend.Item) (*ba return nil }, firestore.MaxAttempts(maxTxnAttempts)) - if err != nil { if status.Code(err) == codes.Aborted { // RunTransaction does not officially document what error is returned if MaxAttempts is exceeded, diff --git a/lib/backend/firestore/firestorebk_test.go b/lib/backend/firestore/firestorebk_test.go index 1fa013661053f..315d4d8f2688f 100644 --- a/lib/backend/firestore/firestorebk_test.go +++ b/lib/backend/firestore/firestorebk_test.go @@ -20,6 +20,7 @@ package firestore import ( "context" + "encoding/base64" "errors" "fmt" "log/slog" @@ -454,3 +455,59 @@ func TestDeleteDocuments(t *testing.T) { } } + +// TestFirestoreMigration tests the migration of incorrect key types in Firestore. 
+// TODO(tigrato|rosstimothy): DELETE in 19.0.0.
+func TestFirestoreMigration(t *testing.T) {
+	cfg := firestoreParams()
+	ensureTestsEnabled(t)
+	ensureEmulatorRunning(t, cfg)
+
+	clock := clockwork.NewRealClock()
+
+	uut, err := New(context.Background(), cfg, Options{Clock: clock})
+	require.NoError(t, err)
+
+	type byteAlias []byte
+	type badRecord struct {
+		Key        byteAlias `firestore:"key,omitempty"`
+		Timestamp  int64     `firestore:"timestamp,omitempty"`
+		Expires    int64     `firestore:"expires,omitempty"`
+		Value      []byte    `firestore:"value,omitempty"`
+		RevisionV2 string    `firestore:"revision,omitempty"`
+		RevisionV1 string    `firestore:"-"`
+	}
+
+	// Seed 301 documents whose key is stored with the wrong Firestore type
+	// (byteAlias maps to Bytes-with-alias here; pre-migration records used
+	// Array/String). 301 forces the migration to run more than one 300-doc batch.
+	for i := 0; i < 301; i++ {
+		key := []byte(fmt.Sprintf("test-%d", i))
+		_, err = uut.svc.Collection(uut.CollectionName).
+			Doc(base64.URLEncoding.EncodeToString(key)).
+			Set(context.Background(), &badRecord{
+				Key:        key,
+				Timestamp:  clock.Now().UTC().Unix(),
+				Expires:    clock.Now().Add(time.Minute).UTC().Unix(),
+				Value:      key,
+				RevisionV2: "v2",
+			})
+		require.NoError(t, err)
+	}
+
+	// Migrate the collection
+	uut.migrateIncorrectKeyTypes()
+
+	// Ensure that all incorrect key types have been migrated
+	docs, err := uut.svc.Collection(uut.CollectionName).
+		Where(keyDocProperty, ">", byteAlias("/")).
+		Limit(100).
+		Documents(context.Background()).GetAll()
+	require.NoError(t, err)
+
+	require.Empty(t, docs, "expected all incorrect key types to be migrated")
+
+	// Ensure that all incorrect key types have been migrated to the correct key type []byte
+	docs, err = uut.svc.Collection(uut.CollectionName).
+		Where(keyDocProperty, ">", []byte("/")).
+		Documents(context.Background()).GetAll()
+	require.NoError(t, err)
+	require.Len(t, docs, 301)
+}
diff --git a/lib/backend/firestore/migration.go b/lib/backend/firestore/migration.go
new file mode 100644
index 0000000000000..23701053b0b97
--- /dev/null
+++ b/lib/backend/firestore/migration.go
@@ -0,0 +1,166 @@
+/*
+ * Teleport
+ * Copyright (C) 2023 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package firestore
+
+import (
+	"context"
+	"log/slog"
+	"time"
+
+	"cloud.google.com/go/firestore"
+	"github.com/gravitational/trace"
+
+	"github.com/gravitational/teleport/lib/backend"
+)
+
+// migrateIncorrectKeyTypes migrates incorrect key types (backend.Key and string) to the correct type (bytes)
+// in the backend. This is necessary because the backend was incorrectly storing keys as strings and backend.Key
+// types and Firestore clients mapped them to different database types. This forces calling ReadRange 3 times.
+// This migration will fix the issue by converting all keys to the correct type (bytes).
+// TODO(tigrato|rosstimothy): DELETE in 19.0.0.
+func (b *Backend) migrateIncorrectKeyTypes() {
+	var (
+		numberOfDocsMigrated int
+		duration             time.Duration
+	)
+	// Take a cluster-wide lock so only one auth instance runs the migration at a time.
+	err := backend.RunWhileLocked(
+		b.clientContext,
+		backend.RunWhileLockedConfig{
+			LockConfiguration: backend.LockConfiguration{
+				LockNameComponents: []string{"firestore_migrate_incorrect_key_types"},
+				Backend:            b,
+				TTL:                5 * time.Minute,
+				RetryInterval:      time.Minute,
+			},
+			ReleaseCtxTimeout:   10 * time.Second,
+			RefreshLockInterval: time.Minute,
+		},
+		func(ctx context.Context) error {
+			start := time.Now()
+			defer func() {
+				duration = time.Since(start)
+			}()
+			// backend.Key is converted to array of ints when sending to the db.
+			toArray := func(key []byte) []any {
+				arrKey := make([]any, len(key))
+				for i, b := range key {
+					arrKey[i] = int(b)
+				}
+				return arrKey
+			}
+			nDocs, err := migrateKeyType[[]any](ctx, b, toArray)
+			numberOfDocsMigrated += nDocs
+			if err != nil {
+				return trace.Wrap(err, "failed to migrate backend key")
+			}
+
+			stringKey := func(key []byte) string {
+				return string(key)
+			}
+			nDocs, err = migrateKeyType[string](ctx, b, stringKey)
+			numberOfDocsMigrated += nDocs
+			if err != nil {
+				return trace.Wrap(err, "failed to migrate legacy key")
+			}
+			return nil
+		})
+
+	entry := b.logger.With(
+		slog.Duration("duration", duration),
+		slog.Int("migrated", numberOfDocsMigrated),
+	)
+	if err != nil {
+		entry.ErrorContext(b.clientContext, "Failed to migrate incorrect key types.", "error", err)
+		return
+	}
+	entry.InfoContext(b.clientContext, "Successfully migrated incorrect key types.")
+}
+
+// migrateKeyType converts every document whose key is stored as type T (as
+// produced by newKey) to the canonical []byte representation, returning the
+// number of documents migrated. It paginates in batches of `limit` documents,
+// rate-limited to one batch per second.
+func migrateKeyType[T any](ctx context.Context, b *Backend, newKey func([]byte) T) (int, error) {
+	limit := 300
+	startKey := newKey([]byte("/"))
+
+	bulkWriter := b.svc.BulkWriter(b.clientContext)
+	defer bulkWriter.End()
+
+	nDocs := 0
+	// handle the migration in batches of 300 documents per second
+	t := time.NewTimer(time.Second)
+	defer t.Stop()
+	for {
+
+		select {
+		case <-ctx.Done():
+			// Return the count of documents migrated so far so the caller's
+			// progress accounting stays accurate on cancellation.
+			return nDocs, trace.Wrap(ctx.Err())
+		case <-t.C:
+		}
+
+		docs, err := b.svc.Collection(b.CollectionName).
+			// passing the key type here forces the client to map the key to the underlying type
+			// and return all the keys that share the same underlying type.
+			// backend.Key is mapped to Array in Firestore.
+			// []byte is mapped to Bytes in Firestore.
+			// string is mapped to String in Firestore.
+			// Searching for keys with the same underlying type will return all keys with the same type.
+			Where(keyDocProperty, ">", startKey).
+			Limit(limit).
+			Documents(ctx).GetAll()
+		if err != nil {
+			return nDocs, trace.Wrap(err)
+		}
+
+		jobs := make([]*firestore.BulkWriterJob, len(docs))
+		for i, dbDoc := range docs {
+			newDoc, err := newRecordFromDoc(dbDoc)
+			if err != nil {
+				return nDocs, trace.Wrap(err, "failed to convert document")
+			}
+
+			// use conditional update to ensure that the document has not been updated since the read
+			jobs[i], err = bulkWriter.Update(
+				b.svc.Collection(b.CollectionName).
+					Doc(b.keyToDocumentID(newDoc.Key)),
+				newDoc.updates(),
+				firestore.LastUpdateTime(dbDoc.UpdateTime),
+			)
+			if err != nil {
+				return nDocs, trace.Wrap(err, "failed to stream bulk action")
+			}
+
+			startKey = newKey(newDoc.Key) // update start key
+		}
+
+		bulkWriter.Flush() // flush the buffer
+
+		for _, job := range jobs {
+			if _, err := job.Results(); err != nil {
+				// log the error and continue
+				b.logger.ErrorContext(ctx, "failed to write bulk action", "error", err)
+			}
+		}
+
+		nDocs += len(docs)
+		if len(docs) < limit {
+			break
+		}
+
+		t.Reset(time.Second)
+	}
+	return nDocs, nil
+}