From b5ca16aca2faf584bb480a0fadb8b439d1b0d578 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Tue, 21 May 2024 14:32:18 -0600 Subject: [PATCH 01/22] backend: add migration tool to migrate between any two backends --- lib/backend/migration/migration.go | 155 ++++++++++++++++++++++++ lib/backend/migration/migration_test.go | 56 +++++++++ tool/teleport/common/migrate.go | 31 +++++ tool/teleport/common/teleport.go | 8 ++ 4 files changed, 250 insertions(+) create mode 100644 lib/backend/migration/migration.go create mode 100644 lib/backend/migration/migration_test.go create mode 100644 tool/teleport/common/migrate.go diff --git a/lib/backend/migration/migration.go b/lib/backend/migration/migration.go new file mode 100644 index 0000000000000..9f84deb81604e --- /dev/null +++ b/lib/backend/migration/migration.go @@ -0,0 +1,155 @@ +package migration + +import ( + "context" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" + + "github.com/gravitational/teleport" + "github.com/gravitational/teleport/api/utils/retryutils" + "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/trace" +) + +// Migration manages a migration between two [backend.Backend] interfaces. +type Migration struct { + src backend.Backend + dst backend.Backend + parallel int + total int + migrated atomic.Int64 + log logrus.FieldLogger +} + +// MigrationConfig configures a [Migration] with a source and destination backend. +// All items from the source are copied to the destination. All Teleport Auth +// Service instances should be stopped when running a migration to avoid data +// inconsistencies. +type MigrationConfig struct { + // Source is the backend [backend.Config] items are migrated from. + Source backend.Config `yaml:"src"` + // Destination is the [backend.Config] items are migrated to. + Destination backend.Config `yaml:"dst"` + // Parallel is the number of items that will be migraated in parallel. + Parallel int `yaml:"parallel"` + // Log logs the progress of a [Migration] + Log logrus.FieldLogger +} + +// New returns a [Migration] based on the provided [MigrationConfig]. +func New(ctx context.Context, config MigrationConfig) (*Migration, error) { + src, err := backend.New(ctx, config.Source.Type, config.Source.Params) + if err != nil { + return nil, trace.Wrap(err, "failed to create source backend") + } + dst, err := backend.New(ctx, config.Destination.Type, config.Destination.Params) + if err != nil { + return nil, trace.Wrap(err, "failed to create destination backend") + } + migration := &Migration{ + src: src, + dst: dst, + parallel: config.Parallel, + log: config.Log, + } + if migration.parallel == 0 { + migration.parallel = 1 + } + if migration.log == nil { + migration.log = logrus.WithField(teleport.ComponentKey, "migration") + } + return nil, nil +} + +// Run runs a [Migration] until complete. +func (m *Migration) Run(ctx context.Context) error { + var all []backend.Item + start := backend.Key("") + err := retry(ctx, 3, func() error { + result, err := m.src.GetRange(ctx, start, backend.RangeEnd(start), 0) + if err != nil { + return trace.Wrap(err) + } + all = result.Items + return nil + }) + if err != nil { + return trace.Wrap(err) + } + m.total = len(all) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + group, ctx := errgroup.WithContext(ctx) + group.SetLimit(m.parallel) + + logProgress := func() { + m.log.Info("Migrated %d/%d", m.migrated.Load(), m.total) + } + defer logProgress() + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + select { + case <-ticker.C: + logProgress() + case <-ctx.Done(): + return + } + }() + + for _, item := range all { + item := item + group.Go(func() error { + if err := retry(ctx, 3, func() error { + if _, err := m.dst.Put(ctx, item); err != nil { + return trace.Wrap(err) + } + return nil + }); err != nil { + return trace.Wrap(err) + } + m.migrated.Add(1) + return nil + }) + if err := ctx.Err(); err != nil { + break + } + } + + if err := group.Wait(); err != nil { + return trace.Wrap(err) + } + return nil +} + +func retry(ctx context.Context, attempts int, fn func() error) error { + retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{ + Driver: retryutils.NewExponentialDriver(time.Millisecond * 100), + Max: time.Second * 2, + }) + if err != nil { + return trace.Wrap(err) + } + if attempts <= 0 { + return trace.Errorf("retry attempts must be > 0") + } + + for i := 0; i < attempts; i++ { + err = fn() + if err == nil { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-retry.After(): + retry.Inc() + } + } + return trace.Wrap(err) +} diff --git a/lib/backend/migration/migration_test.go b/lib/backend/migration/migration_test.go new file mode 100644 index 0000000000000..717b9921590ef --- /dev/null +++ b/lib/backend/migration/migration_test.go @@ -0,0 +1,56 @@ +package migration + +import ( + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + + "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/teleport/lib/backend/memory" +) + +func TestMigration(t *testing.T) { + ctx := context.Background() + src, err := memory.New(memory.Config{}) + require.NoError(t, err) + + dst, err := memory.New(memory.Config{}) + require.NoError(t, err) + + itemCount := 1111 + items := make([]backend.Item, itemCount) + + for i := 0; i < itemCount; i++ { + item := backend.Item{ + Key: backend.Key(fmt.Sprintf("key-%05d", i)), + Value: []byte(fmt.Sprintf("value-%d", i)), + } + _, err := src.Put(ctx, item) + require.NoError(t, err) + items[i] = item + } + + migration := Migration{ + src: src, + dst: dst, + parallel: 10, + log: logrus.New(), + } + + err = migration.Run(ctx) + require.NoError(t, err) + + start := backend.Key("") + result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0) + require.NoError(t, err) + + diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID")) + require.Empty(t, diff) + require.Equal(t, itemCount, migration.total) + require.Equal(t, itemCount, int(migration.migrated.Load())) +} diff --git a/tool/teleport/common/migrate.go b/tool/teleport/common/migrate.go new file mode 100644 index 0000000000000..3f999fd146a48 --- /dev/null +++ b/tool/teleport/common/migrate.go @@ -0,0 +1,31 @@ +package common + +import ( + "context" + "os" + + "gopkg.in/yaml.v3" + + "github.com/gravitational/teleport/lib/backend/migration" + "github.com/gravitational/trace" +) + +func runMigration(ctx context.Context, path string) error { + data, err := os.ReadFile(path) + if err != nil { + return trace.Wrap(err) + } + config := migration.MigrationConfig{} + if err := yaml.Unmarshal(data, &config); err != nil { + return trace.Wrap(err) + } + + migration, err := migration.New(ctx, config) + if err != nil { + return trace.Wrap(err) + } + if err := migration.Run(ctx); err != nil { + return trace.Wrap(err) + } + return nil +} diff --git a/tool/teleport/common/teleport.go b/tool/teleport/common/teleport.go index 59d90d41dbf24..91ae0b8d0568c 100644 --- a/tool/teleport/common/teleport.go +++ b/tool/teleport/common/teleport.go @@ -107,8 +107,13 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con remoteForward := app.Command(teleport.RemoteForwardSubCommand, "Used internally by Teleport to re-exec itself to remote port forward.").Hidden() checkHomeDir := app.Command(teleport.CheckHomeDirSubCommand, "Used internally by Teleport to re-exec itself to check access to a directory.").Hidden() park := app.Command(teleport.ParkSubCommand, "Used internally by Teleport to re-exec itself to do nothing.").Hidden() + migrate := app.Command("migrate", "Migrate data between two backends.").Hidden() app.HelpFlag.Short('h') + migrate.Flag("config", "Path to migration config file."). + Short('c'). + StringVar(&ccf.ConfigFile) + // define start flags: start.Flag("debug", "Enable verbose logging to stderr"). Short('d'). @@ -663,6 +668,9 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con err = onGetLogLevel(ccf.ConfigFile) case collectProfilesCmd.FullCommand(): err = onCollectProfiles(ccf.ConfigFile, ccf.Profiles, ccf.ProfileSeconds) + err = onIntegrationConfSAMLIdPGCPWorkforce(ccf.IntegrationConfSAMLIdPGCPWorkforceArguments) + case migrate.FullCommand(): + err = runMigration(context.Background(), ccf.ConfigFile) } if err != nil { utils.FatalError(err) From 7647979240153630f59eabedc7aa4496930ae4b2 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Tue, 21 May 2024 14:47:51 -0600 Subject: [PATCH 02/22] cleanup bad rebase --- tool/teleport/common/teleport.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tool/teleport/common/teleport.go b/tool/teleport/common/teleport.go index 91ae0b8d0568c..3efd2dc8d86f7 100644 --- a/tool/teleport/common/teleport.go +++ b/tool/teleport/common/teleport.go @@ -668,7 +668,6 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con err = onGetLogLevel(ccf.ConfigFile) case collectProfilesCmd.FullCommand(): err = onCollectProfiles(ccf.ConfigFile, ccf.Profiles, ccf.ProfileSeconds) - err = onIntegrationConfSAMLIdPGCPWorkforce(ccf.IntegrationConfSAMLIdPGCPWorkforceArguments) case migrate.FullCommand(): err = runMigration(context.Background(), ccf.ConfigFile) } From 5d0e833ce0bf4a34ef8e1ea5191de65d6f12af0a Mon Sep 17 00:00:00 2001 From: David Boslee Date: Tue, 21 May 2024 14:53:16 -0600 Subject: [PATCH 03/22] missing return value Co-authored-by: Stephen Levine --- lib/backend/migration/migration.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/backend/migration/migration.go b/lib/backend/migration/migration.go index 9f84deb81604e..a29bcb429cec6 100644 --- a/lib/backend/migration/migration.go +++ b/lib/backend/migration/migration.go @@ -61,7 +61,7 @@ func New(ctx context.Context, config MigrationConfig) (*Migration, error) { if migration.log == nil { migration.log = logrus.WithField(teleport.ComponentKey, "migration") } - return nil, nil + return migration, nil } // Run runs a [Migration] until complete. From 177b4d073be089a1fc33a6c9d40e896408ea2297 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Wed, 22 May 2024 11:47:14 -0600 Subject: [PATCH 04/22] close backends --- lib/backend/migration/migration.go | 17 +++++++++++++++++ tool/teleport/common/migrate.go | 2 ++ 2 files changed, 19 insertions(+) diff --git a/lib/backend/migration/migration.go b/lib/backend/migration/migration.go index a29bcb429cec6..8687928a45055 100644 --- a/lib/backend/migration/migration.go +++ b/lib/backend/migration/migration.go @@ -64,6 +64,23 @@ func New(ctx context.Context, config MigrationConfig) (*Migration, error) { return migration, nil } +func (m *Migration) Close() error { + var errs []error + if m.src != nil { + err := m.src.Close() + if err != nil { + errs = append(errs, err) + } + } + if m.dst != nil { + err := m.dst.Close() + if err != nil { + errs = append(errs, err) + } + } + return trace.NewAggregate(errs...) +} + // Run runs a [Migration] until complete. func (m *Migration) Run(ctx context.Context) error { var all []backend.Item diff --git a/tool/teleport/common/migrate.go b/tool/teleport/common/migrate.go index 3f999fd146a48..d8d5eb5a3f816 100644 --- a/tool/teleport/common/migrate.go +++ b/tool/teleport/common/migrate.go @@ -24,6 +24,8 @@ func runMigration(ctx context.Context, path string) error { if err != nil { return trace.Wrap(err) } + defer migration.Close() + if err := migration.Run(ctx); err != nil { return trace.Wrap(err) } From c8d3cb23c8d803dd24d5a227e888795015850e52 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Thu, 23 May 2024 15:50:38 -0600 Subject: [PATCH 05/22] Use workers and fixed size channel to limit in memory items --- lib/backend/migration/migration.go | 78 ++++++++++++++++--------- lib/backend/migration/migration_test.go | 4 +- 2 files changed, 53 insertions(+), 29 deletions(-) diff --git a/lib/backend/migration/migration.go b/lib/backend/migration/migration.go index 8687928a45055..8f7f54d52265e 100644 --- a/lib/backend/migration/migration.go +++ b/lib/backend/migration/migration.go @@ -14,12 +14,16 @@ import ( "github.com/gravitational/trace" ) +const ( + // bufferSize is the number of backend items that are queried at a time. + bufferSize = 10000 +) + // Migration manages a migration between two [backend.Backend] interfaces. type Migration struct { src backend.Backend dst backend.Backend parallel int - total int migrated atomic.Int64 log logrus.FieldLogger } @@ -55,7 +59,7 @@ func New(ctx context.Context, config MigrationConfig) (*Migration, error) { parallel: config.Parallel, log: config.Log, } - if migration.parallel == 0 { + if migration.parallel <= 0 { migration.parallel = 1 } if migration.log == nil { @@ -83,29 +87,50 @@ func (m *Migration) Close() error { // Run runs a [Migration] until complete. func (m *Migration) Run(ctx context.Context) error { - var all []backend.Item + itemC := make(chan backend.Item, bufferSize) start := backend.Key("") - err := retry(ctx, 3, func() error { - result, err := m.src.GetRange(ctx, start, backend.RangeEnd(start), 0) - if err != nil { - return trace.Wrap(err) - } - all = result.Items - return nil - }) - if err != nil { - return trace.Wrap(err) - } - m.total = len(all) ctx, cancel := context.WithCancel(ctx) defer cancel() - group, ctx := errgroup.WithContext(ctx) - group.SetLimit(m.parallel) + putGroup, putCtx := errgroup.WithContext(ctx) + putGroup.SetLimit(m.parallel) + + getGroup, getCtx := errgroup.WithContext(ctx) + getGroup.Go(func() error { + for { + var result *backend.GetResult + defer close(itemC) + err := retry(getCtx, 3, func() error { + var err error + result, err = m.src.GetRange(getCtx, start, backend.RangeEnd(start), bufferSize) + if err != nil { + return trace.Wrap(err) + } + return nil + }) + if err != nil { + return trace.Wrap(err) + } + for _, item := range result.Items { + select { + case itemC <- item: + case <-getCtx.Done(): + return trace.Wrap(getCtx.Err()) + // This case indicates no consumers are pulling items + // from the channel. Return to avoid deadlock. + case <-putCtx.Done(): + return trace.Wrap(putCtx.Err()) + } + } + if len(result.Items) < bufferSize { + return nil + } + } + }) logProgress := func() { - m.log.Info("Migrated %d/%d", m.migrated.Load(), m.total) + m.log.Infof("Migrated %d", m.migrated.Load()) } defer logProgress() go func() { @@ -119,11 +144,11 @@ func (m *Migration) Run(ctx context.Context) error { } }() - for _, item := range all { + for item := range itemC { item := item - group.Go(func() error { - if err := retry(ctx, 3, func() error { - if _, err := m.dst.Put(ctx, item); err != nil { + putGroup.Go(func() error { + if err := retry(putCtx, 3, func() error { + if _, err := m.dst.Put(putCtx, item); err != nil { return trace.Wrap(err) } return nil @@ -133,15 +158,14 @@ func (m *Migration) Run(ctx context.Context) error { m.migrated.Add(1) return nil }) - if err := ctx.Err(); err != nil { + if err := putCtx.Err(); err != nil { break } } - if err := group.Wait(); err != nil { - return trace.Wrap(err) - } - return nil + getErr := getGroup.Wait() + putErr := putGroup.Wait() + return trace.NewAggregate(getErr, putErr) } func retry(ctx context.Context, attempts int, fn func() error) error { diff --git a/lib/backend/migration/migration_test.go b/lib/backend/migration/migration_test.go index 717b9921590ef..c85ce551b949b 100644 --- a/lib/backend/migration/migration_test.go +++ b/lib/backend/migration/migration_test.go @@ -22,7 +22,7 @@ func TestMigration(t *testing.T) { dst, err := memory.New(memory.Config{}) require.NoError(t, err) - itemCount := 1111 + itemCount := 11111 items := make([]backend.Item, itemCount) for i := 0; i < itemCount; i++ { @@ -51,6 +51,6 @@ func TestMigration(t *testing.T) { diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID")) require.Empty(t, diff) - require.Equal(t, itemCount, migration.total) require.Equal(t, itemCount, int(migration.migrated.Load())) + require.NoError(t, migration.Close()) } From 01f1e1c76bb2c74110bdc4e6977a41afc3b9d088 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Thu, 23 May 2024 16:49:14 -0600 Subject: [PATCH 06/22] Update tool/teleport/common/migrate.go Co-authored-by: Edoardo Spadolini --- tool/teleport/common/migrate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tool/teleport/common/migrate.go b/tool/teleport/common/migrate.go index d8d5eb5a3f816..de4d9fea45378 100644 --- a/tool/teleport/common/migrate.go +++ b/tool/teleport/common/migrate.go @@ -15,7 +15,7 @@ func runMigration(ctx context.Context, path string) error { if err != nil { return trace.Wrap(err) } - config := migration.MigrationConfig{} + var config migration.MigrationConfig if err := yaml.Unmarshal(data, &config); err != nil { return trace.Wrap(err) } From aad68881329614a84f03aacc464d845288dd48a5 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Fri, 24 May 2024 09:35:08 -0600 Subject: [PATCH 07/22] use a single errgroup for both get and put operations --- lib/backend/migration/migration.go | 44 ++++++++++++++---------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/lib/backend/migration/migration.go b/lib/backend/migration/migration.go index 8f7f54d52265e..3cd891dab3218 100644 --- a/lib/backend/migration/migration.go +++ b/lib/backend/migration/migration.go @@ -93,17 +93,18 @@ func (m *Migration) Run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - putGroup, putCtx := errgroup.WithContext(ctx) - putGroup.SetLimit(m.parallel) - - getGroup, getCtx := errgroup.WithContext(ctx) - getGroup.Go(func() error { + group, ctx := errgroup.WithContext(ctx) + // Add 1 to ensure a goroutine exists for getting items. + group.SetLimit(m.parallel + 1) + + group.Go(func() error { + var result *backend.GetResult + pageKey := start + defer close(itemC) for { - var result *backend.GetResult - defer close(itemC) - err := retry(getCtx, 3, func() error { + err := retry(ctx, 3, func() error { var err error - result, err = m.src.GetRange(getCtx, start, backend.RangeEnd(start), bufferSize) + result, err = m.src.GetRange(ctx, pageKey, backend.RangeEnd(start), bufferSize) if err != nil { return trace.Wrap(err) } @@ -115,17 +116,14 @@ func (m *Migration) Run(ctx context.Context) error { for _, item := range result.Items { select { case itemC <- item: - case <-getCtx.Done(): - return trace.Wrap(getCtx.Err()) - // This case indicates no consumers are pulling items - // from the channel. Return to avoid deadlock. - case <-putCtx.Done(): - return trace.Wrap(putCtx.Err()) + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) } } if len(result.Items) < bufferSize { return nil } + pageKey = backend.RangeEnd(result.Items[len(result.Items)-1].Key) } }) @@ -146,9 +144,9 @@ func (m *Migration) Run(ctx context.Context) error { for item := range itemC { item := item - putGroup.Go(func() error { - if err := retry(putCtx, 3, func() error { - if _, err := m.dst.Put(putCtx, item); err != nil { + group.Go(func() error { + if err := retry(ctx, 3, func() error { + if _, err := m.dst.Put(ctx, item); err != nil { return trace.Wrap(err) } return nil @@ -158,14 +156,14 @@ func (m *Migration) Run(ctx context.Context) error { m.migrated.Add(1) return nil }) - if err := putCtx.Err(); err != nil { + if err := ctx.Err(); err != nil { break } } - - getErr := getGroup.Wait() - putErr := putGroup.Wait() - return trace.NewAggregate(getErr, putErr) + if err := group.Wait(); err != nil { + return trace.Wrap(err) + } + return nil } func retry(ctx context.Context, attempts int, fn func() error) error { From 38810671d676a609dabdd689eb6e1aa9a0aa1334 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Thu, 30 May 2024 16:40:53 -0600 Subject: [PATCH 08/22] refactor migration to clone --- .../migration.go => clone/clone.go} | 68 ++++++++++--------- .../migration_test.go => clone/clone_test.go} | 14 ++-- .../common/{migrate.go => backend.go} | 14 ++-- tool/teleport/common/teleport.go | 17 +++-- 4 files changed, 59 insertions(+), 54 deletions(-) rename lib/backend/{migration/migration.go => clone/clone.go} (69%) rename lib/backend/{migration/migration_test.go => clone/clone_test.go} (81%) rename tool/teleport/common/{migrate.go => backend.go} (51%) diff --git a/lib/backend/migration/migration.go b/lib/backend/clone/clone.go similarity index 69% rename from lib/backend/migration/migration.go rename to lib/backend/clone/clone.go index 3cd891dab3218..0043735a45cb6 100644 --- a/lib/backend/migration/migration.go +++ b/lib/backend/clone/clone.go @@ -1,16 +1,17 @@ -package migration +package clone import ( "context" + "log/slog" "sync/atomic" "time" - "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/backend" + logutils "github.com/gravitational/teleport/lib/utils/log" "github.com/gravitational/trace" ) @@ -19,32 +20,32 @@ const ( bufferSize = 10000 ) -// Migration manages a migration between two [backend.Backend] interfaces. -type Migration struct { +// Cloner manages cloning data between two [backend.Backend] interfaces. +type Cloner struct { src backend.Backend dst backend.Backend parallel int migrated atomic.Int64 - log logrus.FieldLogger + log *slog.Logger } -// MigrationConfig configures a [Migration] with a source and destination backend. +// Config contains the configuration for cloning a [backend.Backend]. // All items from the source are copied to the destination. All Teleport Auth -// Service instances should be stopped when running a migration to avoid data +// Service instances should be stopped when running clone to avoid data // inconsistencies. -type MigrationConfig struct { - // Source is the backend [backend.Config] items are migrated from. +type Config struct { + // Source is the backend [backend.Config] items are cloned from. Source backend.Config `yaml:"src"` - // Destination is the [backend.Config] items are migrated to. + // Destination is the [backend.Config] items are cloned to. Destination backend.Config `yaml:"dst"` - // Parallel is the number of items that will be migraated in parallel. + // Parallel is the number of items that will be cloned in parallel. Parallel int `yaml:"parallel"` - // Log logs the progress of a [Migration] - Log logrus.FieldLogger + // Log logs the progress of cloning. + Log *slog.Logger } -// New returns a [Migration] based on the provided [MigrationConfig]. -func New(ctx context.Context, config MigrationConfig) (*Migration, error) { +// New returns a [Cloner] based on the provided [Config]. +func New(ctx context.Context, config Config) (*Cloner, error) { src, err := backend.New(ctx, config.Source.Type, config.Source.Params) if err != nil { return nil, trace.Wrap(err, "failed to create source backend") @@ -53,31 +54,32 @@ func New(ctx context.Context, config MigrationConfig) (*Migration, error) { if err != nil { return nil, trace.Wrap(err, "failed to create destination backend") } - migration := &Migration{ + cloner := &Cloner{ src: src, dst: dst, parallel: config.Parallel, log: config.Log, } - if migration.parallel <= 0 { - migration.parallel = 1 + if cloner.parallel <= 0 { + cloner.parallel = 1 } - if migration.log == nil { - migration.log = logrus.WithField(teleport.ComponentKey, "migration") + if cloner.log == nil { + cloner.log = logutils.NewPackageLogger(teleport.ComponentKey, "backend.clone") } - return migration, nil + return cloner, nil } -func (m *Migration) Close() error { +// Close ensures the source and destination backends are closed. +func (c *Cloner) Close() error { var errs []error - if m.src != nil { - err := m.src.Close() + if c.src != nil { + err := c.src.Close() if err != nil { errs = append(errs, err) } } - if m.dst != nil { - err := m.dst.Close() + if c.dst != nil { + err := c.dst.Close() if err != nil { errs = append(errs, err) } @@ -85,8 +87,8 @@ func (m *Migration) Close() error { return trace.NewAggregate(errs...) } -// Run runs a [Migration] until complete. -func (m *Migration) Run(ctx context.Context) error { +// Run runs backend cloning until complete. +func (c *Cloner) Clone(ctx context.Context) error { itemC := make(chan backend.Item, bufferSize) start := backend.Key("") @@ -95,7 +97,7 @@ func (m *Migration) Run(ctx context.Context) error { group, ctx := errgroup.WithContext(ctx) // Add 1 to ensure a goroutine exists for getting items. - group.SetLimit(m.parallel + 1) + group.SetLimit(c.parallel + 1) group.Go(func() error { var result *backend.GetResult @@ -104,7 +106,7 @@ func (m *Migration) Run(ctx context.Context) error { for { err := retry(ctx, 3, func() error { var err error - result, err = m.src.GetRange(ctx, pageKey, backend.RangeEnd(start), bufferSize) + result, err = c.src.GetRange(ctx, pageKey, backend.RangeEnd(start), bufferSize) if err != nil { return trace.Wrap(err) } @@ -128,7 +130,7 @@ func (m *Migration) Run(ctx context.Context) error { }) logProgress := func() { - m.log.Infof("Migrated %d", m.migrated.Load()) + c.log.Info("Migrated %d", c.migrated.Load()) } defer logProgress() go func() { @@ -146,14 +148,14 @@ func (m *Migration) Run(ctx context.Context) error { item := item group.Go(func() error { if err := retry(ctx, 3, func() error { - if _, err := m.dst.Put(ctx, item); err != nil { + if _, err := c.dst.Put(ctx, item); err != nil { return trace.Wrap(err) } return nil }); err != nil { return trace.Wrap(err) } - m.migrated.Add(1) + c.migrated.Add(1) return nil }) if err := ctx.Err(); err != nil { diff --git a/lib/backend/migration/migration_test.go b/lib/backend/clone/clone_test.go similarity index 81% rename from lib/backend/migration/migration_test.go rename to lib/backend/clone/clone_test.go index c85ce551b949b..8cf96c463fa60 100644 --- a/lib/backend/migration/migration_test.go +++ b/lib/backend/clone/clone_test.go @@ -1,4 +1,4 @@ -package migration +package clone import ( "context" @@ -7,11 +7,11 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/memory" + logutils "github.com/gravitational/teleport/lib/utils/log" ) func TestMigration(t *testing.T) { @@ -35,14 +35,14 @@ func TestMigration(t *testing.T) { items[i] = item } - migration := Migration{ + cloner := Cloner{ src: src, dst: dst, parallel: 10, - log: logrus.New(), + log: logutils.NewPackageLogger(), } - err = migration.Run(ctx) + err = cloner.Clone(ctx) require.NoError(t, err) start := backend.Key("") @@ -51,6 +51,6 @@ func TestMigration(t *testing.T) { diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID")) require.Empty(t, diff) - require.Equal(t, itemCount, int(migration.migrated.Load())) - require.NoError(t, migration.Close()) + require.Equal(t, itemCount, int(cloner.migrated.Load())) + require.NoError(t, cloner.Close()) } diff --git a/tool/teleport/common/migrate.go b/tool/teleport/common/backend.go similarity index 51% rename from tool/teleport/common/migrate.go rename to tool/teleport/common/backend.go index de4d9fea45378..3b6f48e115d7b 100644 --- a/tool/teleport/common/migrate.go +++ b/tool/teleport/common/backend.go @@ -6,27 +6,27 @@ import ( "gopkg.in/yaml.v3" - "github.com/gravitational/teleport/lib/backend/migration" + "github.com/gravitational/teleport/lib/backend/clone" "github.com/gravitational/trace" ) -func runMigration(ctx context.Context, path string) error { - data, err := os.ReadFile(path) +func onClone(ctx context.Context, configPath string) error { + data, err := os.ReadFile(configPath) if err != nil { return trace.Wrap(err) } - var config migration.MigrationConfig + var config clone.Config if err := yaml.Unmarshal(data, &config); err != nil { return trace.Wrap(err) } - migration, err := migration.New(ctx, config) + cloner, err := clone.New(ctx, config) if err != nil { return trace.Wrap(err) } - defer migration.Close() + defer cloner.Close() - if err := migration.Run(ctx); err != nil { + if err := cloner.Clone(ctx); err != nil { return trace.Wrap(err) } return nil diff --git a/tool/teleport/common/teleport.go b/tool/teleport/common/teleport.go index 3efd2dc8d86f7..abbc892b511a2 100644 --- a/tool/teleport/common/teleport.go +++ b/tool/teleport/common/teleport.go @@ -107,13 +107,8 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con remoteForward := app.Command(teleport.RemoteForwardSubCommand, "Used internally by Teleport to re-exec itself to remote port forward.").Hidden() checkHomeDir := app.Command(teleport.CheckHomeDirSubCommand, "Used internally by Teleport to re-exec itself to check access to a directory.").Hidden() park := app.Command(teleport.ParkSubCommand, "Used internally by Teleport to re-exec itself to do nothing.").Hidden() - migrate := app.Command("migrate", "Migrate data between two backends.").Hidden() app.HelpFlag.Short('h') - migrate.Flag("config", "Path to migration config file."). - Short('c'). - StringVar(&ccf.ConfigFile) - // define start flags: start.Flag("debug", "Enable verbose logging to stderr"). Short('d'). @@ -546,6 +541,14 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con collectProfilesCmd.Arg("PROFILES", fmt.Sprintf("Comma-separated profile names to be exported. Supported profiles: %s. Default: %s", strings.Join(maps.Keys(debugclient.SupportedProfiles), ","), strings.Join(defaultCollectProfiles, ","))).StringVar(&ccf.Profiles) collectProfilesCmd.Flag("seconds", "For CPU and trace profiles, profile for the given duration (if set to 0, it returns a profile snapshot). For other profiles, return a delta profile. Default: 0").Short('s').Default("0").IntVar(&ccf.ProfileSeconds) + backendCmd := app.Command("backend", "Commands for managing backend data.") + backendCmd.Hidden() + backendCloneCmd := backendCmd.Command("clone", "Clones data from a source to a destination backend.") + backendCloneCmd.Flag("config", "Path to the clone config file."). + Required(). + Short('c'). + StringVar(&ccf.ConfigFile) + // parse CLI commands+flags: utils.UpdateAppUsageTemplate(app, options.Args) command, err := app.Parse(options.Args) @@ -668,8 +671,8 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con err = onGetLogLevel(ccf.ConfigFile) case collectProfilesCmd.FullCommand(): err = onCollectProfiles(ccf.ConfigFile, ccf.Profiles, ccf.ProfileSeconds) - case migrate.FullCommand(): - err = runMigration(context.Background(), ccf.ConfigFile) + case backendCloneCmd.FullCommand(): + err = onClone(context.Background(), ccf.ConfigFile) } if err != nil { utils.FatalError(err) From ca53ad20389edf5c83f21d556b5738a48ae75013 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Thu, 30 May 2024 17:18:56 -0600 Subject: [PATCH 09/22] Add destination check and force config option --- lib/backend/clone/clone.go | 20 ++++++++++++- lib/backend/clone/clone_test.go | 50 ++++++++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/lib/backend/clone/clone.go b/lib/backend/clone/clone.go index 0043735a45cb6..6b2a793ff4565 100644 --- a/lib/backend/clone/clone.go +++ b/lib/backend/clone/clone.go @@ -2,6 +2,7 @@ package clone import ( "context" + "fmt" "log/slog" "sync/atomic" "time" @@ -25,6 +26,7 @@ type Cloner struct { src backend.Backend dst backend.Backend parallel int + force bool migrated atomic.Int64 log *slog.Logger } @@ -40,6 +42,9 @@ type Config struct { Destination backend.Config `yaml:"dst"` // Parallel is the number of items that will be cloned in parallel. Parallel int `yaml:"parallel"` + // Force indicates whether to clone data regardless of whether data already + // exists in the destination [backend.Backend]. + Force bool `yaml:"force"` // Log logs the progress of cloning. Log *slog.Logger } @@ -58,6 +63,7 @@ func New(ctx context.Context, config Config) (*Cloner, error) { src: src, dst: dst, parallel: config.Parallel, + force: config.Force, log: config.Log, } if cloner.parallel <= 0 { @@ -95,6 +101,18 @@ func (c *Cloner) Clone(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() + if !c.force { + result, err := c.dst.GetRange(ctx, start, backend.RangeEnd(start), 1) + if err != nil { + return trace.Wrap(err, "failed to check destination for existing data") + } + if len(result.Items) > 0 { + return trace.Errorf("unable to clone data to destination with existing data; this may be overriden by configuring 'force: true'") + } + } else { + c.log.Warn("Skipping check for existing data in destination.") + } + group, ctx := errgroup.WithContext(ctx) // Add 1 to ensure a goroutine exists for getting items. group.SetLimit(c.parallel + 1) @@ -130,7 +148,7 @@ func (c *Cloner) Clone(ctx context.Context) error { }) logProgress := func() { - c.log.Info("Migrated %d", c.migrated.Load()) + c.log.Info(fmt.Sprintf("Migrated %d", c.migrated.Load())) } defer logProgress() go func() { diff --git a/lib/backend/clone/clone_test.go b/lib/backend/clone/clone_test.go index 8cf96c463fa60..574fe4636f14b 100644 --- a/lib/backend/clone/clone_test.go +++ b/lib/backend/clone/clone_test.go @@ -14,7 +14,7 @@ import ( logutils "github.com/gravitational/teleport/lib/utils/log" ) -func TestMigration(t *testing.T) { +func TestClone(t *testing.T) { ctx := context.Background() src, err := memory.New(memory.Config{}) require.NoError(t, err) @@ -54,3 +54,51 @@ func TestMigration(t *testing.T) { require.Equal(t, itemCount, int(cloner.migrated.Load())) require.NoError(t, cloner.Close()) } + +func TestCloneForce(t *testing.T) { + ctx := context.Background() + src, err := memory.New(memory.Config{}) + require.NoError(t, err) + + dst, err := memory.New(memory.Config{}) + require.NoError(t, err) + + itemCount := 100 + items := make([]backend.Item, itemCount) + + for i := 0; i < itemCount; i++ { + item := backend.Item{ + Key: backend.Key(fmt.Sprintf("key-%05d", i)), + Value: []byte(fmt.Sprintf("value-%d", i)), + } + _, err := src.Put(ctx, item) + require.NoError(t, err) + items[i] = item + } + + _, err = dst.Put(ctx, items[0]) + require.NoError(t, err) + + cloner := Cloner{ + src: src, + dst: dst, + parallel: 10, + log: logutils.NewPackageLogger(), + } + + err = cloner.Clone(ctx) + require.Error(t, err) + + cloner.force = true + err = cloner.Clone(ctx) + require.NoError(t, err) + + start := backend.Key("") + result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0) + require.NoError(t, err) + + diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID")) + require.Empty(t, diff) + require.Equal(t, itemCount, int(cloner.migrated.Load())) + require.NoError(t, cloner.Close()) +} From 73955d8585ea03ca019b60e3c96b4874ff9a33f2 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Fri, 7 Jun 2024 08:51:31 -0600 Subject: [PATCH 10/22] Update lib/backend/clone/clone.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/backend/clone/clone.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/backend/clone/clone.go b/lib/backend/clone/clone.go index 6b2a793ff4565..493e683a8bcd0 100644 --- a/lib/backend/clone/clone.go +++ b/lib/backend/clone/clone.go @@ -148,7 +148,7 @@ func (c *Cloner) Clone(ctx context.Context) error { }) logProgress := func() { - c.log.Info(fmt.Sprintf("Migrated %d", c.migrated.Load())) + c.log.InfoContext(ctx, "Backend clone still in progress", "items_copied" c.migrated.Load())) } defer logProgress() go func() { From 134e3905ccd643421e3296452d0fadfaed97ef74 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Fri, 7 Jun 2024 08:51:40 -0600 Subject: [PATCH 11/22] Update lib/backend/clone/clone.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/backend/clone/clone.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/backend/clone/clone.go b/lib/backend/clone/clone.go index 493e683a8bcd0..f03a3dcd12cf2 100644 --- a/lib/backend/clone/clone.go +++ b/lib/backend/clone/clone.go @@ -70,7 +70,7 @@ func New(ctx context.Context, config Config) (*Cloner, error) { cloner.parallel = 1 } if cloner.log == nil { - cloner.log = logutils.NewPackageLogger(teleport.ComponentKey, "backend.clone") + cloner.log = slog.With(teleport.ComponentKey, "backend.clone") } return cloner, nil } From 0923ebe391ea2a8a65f2db5d30d4a8144b259781 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Fri, 7 Jun 2024 08:52:33 -0600 Subject: [PATCH 12/22] Update lib/backend/clone/clone.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/backend/clone/clone.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/backend/clone/clone.go b/lib/backend/clone/clone.go index f03a3dcd12cf2..dd0e1125abc97 100644 --- a/lib/backend/clone/clone.go +++ b/lib/backend/clone/clone.go @@ -110,7 +110,7 @@ func (c *Cloner) Clone(ctx context.Context) error { return trace.Errorf("unable to clone data to destination with existing data; this may be overriden by configuring 'force: true'") } } else { - c.log.Warn("Skipping check for existing data in destination.") + c.log.WarnContext(ctx, "Skipping check for existing data in destination.") } group, ctx := errgroup.WithContext(ctx) From b853932df0df690d88a33288703b4590c51bb324 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Fri, 7 Jun 2024 09:56:23 -0600 Subject: [PATCH 13/22] move clone to lib/backend and remove struct --- lib/backend/{clone => }/clone.go | 105 +++++++------------------- lib/backend/{clone => }/clone_test.go | 35 +++------ tool/teleport/common/backend.go | 18 +++-- 3 files changed, 49 insertions(+), 109 deletions(-) rename lib/backend/{clone => }/clone.go (51%) rename lib/backend/{clone => }/clone_test.go (76%) diff --git a/lib/backend/clone/clone.go b/lib/backend/clone.go similarity index 51% rename from lib/backend/clone/clone.go rename to lib/backend/clone.go index dd0e1125abc97..93603c262f286 100644 --- a/lib/backend/clone/clone.go +++ b/lib/backend/clone.go @@ -1,8 +1,7 @@ -package clone +package backend import ( "context" - "fmt" "log/slog" "sync/atomic" "time" @@ -11,8 +10,6 @@ import ( "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/utils/retryutils" - "github.com/gravitational/teleport/lib/backend" - logutils "github.com/gravitational/teleport/lib/utils/log" "github.com/gravitational/trace" ) @@ -21,88 +18,38 @@ const ( bufferSize = 10000 ) -// Cloner manages cloning data between two [backend.Backend] interfaces. -type Cloner struct { - src backend.Backend - dst backend.Backend - parallel int - force bool - migrated atomic.Int64 - log *slog.Logger -} - -// Config contains the configuration for cloning a [backend.Backend]. +// CloneConfig contains the configuration for cloning a [Backend]. // All items from the source are copied to the destination. All Teleport Auth // Service instances should be stopped when running clone to avoid data // inconsistencies. -type Config struct { - // Source is the backend [backend.Config] items are cloned from. - Source backend.Config `yaml:"src"` - // Destination is the [backend.Config] items are cloned to. - Destination backend.Config `yaml:"dst"` +type CloneConfig struct { + // Source is the backend [Config] items are cloned from. + Source Config `yaml:"src"` + // Destination is the [Config] items are cloned to. + Destination Config `yaml:"dst"` // Parallel is the number of items that will be cloned in parallel. Parallel int `yaml:"parallel"` // Force indicates whether to clone data regardless of whether data already - // exists in the destination [backend.Backend]. + // exists in the destination [Backend]. Force bool `yaml:"force"` - // Log logs the progress of cloning. - Log *slog.Logger } -// New returns a [Cloner] based on the provided [Config]. -func New(ctx context.Context, config Config) (*Cloner, error) { - src, err := backend.New(ctx, config.Source.Type, config.Source.Params) - if err != nil { - return nil, trace.Wrap(err, "failed to create source backend") - } - dst, err := backend.New(ctx, config.Destination.Type, config.Destination.Params) - if err != nil { - return nil, trace.Wrap(err, "failed to create destination backend") - } - cloner := &Cloner{ - src: src, - dst: dst, - parallel: config.Parallel, - force: config.Force, - log: config.Log, - } - if cloner.parallel <= 0 { - cloner.parallel = 1 - } - if cloner.log == nil { - cloner.log = slog.With(teleport.ComponentKey, "backend.clone") - } - return cloner, nil -} +// Clone copies all items from a source to a destination [Backend]. +func Clone(ctx context.Context, src, dst Backend, parallel int, force bool) error { + log := slog.With(teleport.ComponentKey, "clone") + itemC := make(chan Item, bufferSize) + start := Key("") + migrated := &atomic.Int32{} -// Close ensures the source and destination backends are closed. -func (c *Cloner) Close() error { - var errs []error - if c.src != nil { - err := c.src.Close() - if err != nil { - errs = append(errs, err) - } - } - if c.dst != nil { - err := c.dst.Close() - if err != nil { - errs = append(errs, err) - } + if parallel <= 0 { + parallel = 1 } - return trace.NewAggregate(errs...) -} - -// Run runs backend cloning until complete. -func (c *Cloner) Clone(ctx context.Context) error { - itemC := make(chan backend.Item, bufferSize) - start := backend.Key("") ctx, cancel := context.WithCancel(ctx) defer cancel() - if !c.force { - result, err := c.dst.GetRange(ctx, start, backend.RangeEnd(start), 1) + if !force { + result, err := dst.GetRange(ctx, start, RangeEnd(start), 1) if err != nil { return trace.Wrap(err, "failed to check destination for existing data") } @@ -110,21 +57,21 @@ func (c *Cloner) Clone(ctx context.Context) error { return trace.Errorf("unable to clone data to destination with existing data; this may be overriden by configuring 'force: true'") } } else { - c.log.WarnContext(ctx, "Skipping check for existing data in destination.") + log.WarnContext(ctx, "Skipping check for existing data in destination.") } group, ctx := errgroup.WithContext(ctx) // Add 1 to ensure a goroutine exists for getting items. - group.SetLimit(c.parallel + 1) + group.SetLimit(parallel + 1) group.Go(func() error { - var result *backend.GetResult + var result *GetResult pageKey := start defer close(itemC) for { err := retry(ctx, 3, func() error { var err error - result, err = c.src.GetRange(ctx, pageKey, backend.RangeEnd(start), bufferSize) + result, err = src.GetRange(ctx, pageKey, RangeEnd(start), bufferSize) if err != nil { return trace.Wrap(err) } @@ -143,12 +90,12 @@ func (c *Cloner) Clone(ctx context.Context) error { if len(result.Items) < bufferSize { return nil } - pageKey = backend.RangeEnd(result.Items[len(result.Items)-1].Key) + pageKey = RangeEnd(result.Items[len(result.Items)-1].Key) } }) logProgress := func() { - c.log.InfoContext(ctx, "Backend clone still in progress", "items_copied" c.migrated.Load())) + log.InfoContext(ctx, "Backend clone still in progress", "items_copied", migrated.Load()) } defer logProgress() go func() { @@ -166,14 +113,14 @@ func (c *Cloner) Clone(ctx context.Context) error { item := item group.Go(func() error { if err := retry(ctx, 3, func() error { - if _, err := c.dst.Put(ctx, item); err != nil { + if _, err := dst.Put(ctx, item); err != nil { return trace.Wrap(err) } return nil }); err != nil { return trace.Wrap(err) } - c.migrated.Add(1) + migrated.Add(1) return nil }) if err := ctx.Err(); err != nil { diff --git a/lib/backend/clone/clone_test.go b/lib/backend/clone_test.go similarity index 76% rename from lib/backend/clone/clone_test.go rename to lib/backend/clone_test.go index 574fe4636f14b..1a81e459d783f 100644 --- a/lib/backend/clone/clone_test.go +++ b/lib/backend/clone_test.go @@ -1,4 +1,4 @@ -package clone +package backend_test import ( "context" @@ -11,16 +11,17 @@ import ( "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/memory" - logutils "github.com/gravitational/teleport/lib/utils/log" ) func TestClone(t *testing.T) { ctx := context.Background() src, err := memory.New(memory.Config{}) require.NoError(t, err) + defer src.Close() dst, err := memory.New(memory.Config{}) require.NoError(t, err) + defer dst.Close() itemCount := 11111 items := make([]backend.Item, itemCount) @@ -35,14 +36,7 @@ func TestClone(t *testing.T) { items[i] = item } - cloner := Cloner{ - src: src, - dst: dst, - parallel: 10, - log: logutils.NewPackageLogger(), - } - - err = cloner.Clone(ctx) + err = backend.Clone(ctx, src, dst, 10, false) require.NoError(t, err) start := backend.Key("") @@ -51,17 +45,19 @@ func TestClone(t *testing.T) { diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID")) require.Empty(t, diff) - require.Equal(t, itemCount, int(cloner.migrated.Load())) - require.NoError(t, cloner.Close()) + require.NoError(t, err) + require.Equal(t, itemCount, len(result.Items)) } func TestCloneForce(t *testing.T) { ctx := context.Background() src, err := memory.New(memory.Config{}) require.NoError(t, err) + defer src.Close() dst, err := memory.New(memory.Config{}) require.NoError(t, err) + defer dst.Close() itemCount := 100 items := make([]backend.Item, itemCount) @@ -79,18 +75,10 @@ func TestCloneForce(t *testing.T) { _, err = dst.Put(ctx, items[0]) require.NoError(t, err) - cloner := Cloner{ - src: src, - dst: dst, - parallel: 10, - log: logutils.NewPackageLogger(), - } - - err = cloner.Clone(ctx) + err = backend.Clone(ctx, src, dst, 10, false) require.Error(t, err) - cloner.force = true - err = cloner.Clone(ctx) + err = backend.Clone(ctx, src, dst, 10, true) require.NoError(t, err) start := backend.Key("") @@ -99,6 +87,5 @@ func TestCloneForce(t *testing.T) { diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID")) require.Empty(t, diff) - require.Equal(t, itemCount, int(cloner.migrated.Load())) - require.NoError(t, cloner.Close()) + require.Equal(t, itemCount, len(result.Items)) } diff --git a/tool/teleport/common/backend.go b/tool/teleport/common/backend.go index 3b6f48e115d7b..b414dce4bbc9c 100644 --- a/tool/teleport/common/backend.go +++ b/tool/teleport/common/backend.go @@ -6,7 +6,7 @@ import ( "gopkg.in/yaml.v3" - "github.com/gravitational/teleport/lib/backend/clone" + "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/trace" ) @@ -15,18 +15,24 @@ func onClone(ctx context.Context, configPath string) error { if err != nil { return trace.Wrap(err) } - var config clone.Config + var config backend.CloneConfig if err := yaml.Unmarshal(data, &config); err != nil { return trace.Wrap(err) } - cloner, err := clone.New(ctx, config) + src, err := backend.New(ctx, config.Source.Type, config.Source.Params) if err != nil { - return trace.Wrap(err) + return trace.Wrap(err, "failed to create source backend") + } + defer src.Close() + + dst, err := backend.New(ctx, config.Destination.Type, config.Destination.Params) + if err != nil { + return trace.Wrap(err, "failed to create destination backend") } - defer cloner.Close() + defer dst.Close() - if err := cloner.Clone(ctx); err != nil { + if err := backend.Clone(ctx, src, dst, config.Parallel, config.Force); err != nil { return trace.Wrap(err) } return nil From b4dfd2458ee834e16cace7ae8f4ad893989fc74b Mon Sep 17 00:00:00 2001 From: David Boslee Date: Mon, 10 Jun 2024 10:38:58 -0600 Subject: [PATCH 14/22] Update tool/teleport/common/backend.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- tool/teleport/common/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tool/teleport/common/backend.go b/tool/teleport/common/backend.go index b414dce4bbc9c..0e2ef7d087d76 100644 --- a/tool/teleport/common/backend.go +++ b/tool/teleport/common/backend.go @@ -4,10 +4,10 @@ import ( "context" "os" + "github.com/gravitational/trace" "gopkg.in/yaml.v3" "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/trace" ) func onClone(ctx context.Context, configPath string) error { From 6a464a9397febf938d04aa818d7b36170ca701bc Mon Sep 17 00:00:00 2001 From: David Boslee Date: Mon, 10 Jun 2024 10:39:05 -0600 Subject: [PATCH 15/22] Update lib/backend/clone.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/backend/clone.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/backend/clone.go b/lib/backend/clone.go index 93603c262f286..ed04f626b9aea 100644 --- a/lib/backend/clone.go +++ b/lib/backend/clone.go @@ -54,7 +54,7 @@ func Clone(ctx context.Context, src, dst Backend, parallel int, force bool) erro return trace.Wrap(err, "failed to check destination for existing data") } if len(result.Items) > 0 { - return trace.Errorf("unable to clone data to destination with existing data; this may be overriden by configuring 'force: true'") + return trace.Errorf("unable to clone data to destination with existing data; this may be overridden by configuring 'force: true'") } } else { log.WarnContext(ctx, "Skipping check for existing data in destination.") From fa77cda5c216d74dbb50f2a3b5ef40f4abc99bea Mon Sep 17 00:00:00 2001 From: David Boslee Date: Mon, 10 Jun 2024 10:39:12 -0600 Subject: [PATCH 16/22] Update lib/backend/clone.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/backend/clone.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/backend/clone.go b/lib/backend/clone.go index ed04f626b9aea..8e7655405fdf6 100644 --- a/lib/backend/clone.go +++ b/lib/backend/clone.go @@ -6,11 +6,11 @@ import ( "sync/atomic" "time" + "github.com/gravitational/trace" "golang.org/x/sync/errgroup" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/utils/retryutils" - "github.com/gravitational/trace" ) const ( From c58af4d90b56404a069e1c43b10feb7068ff2a3f Mon Sep 17 00:00:00 2001 From: David Boslee Date: Mon, 10 Jun 2024 10:39:20 -0600 Subject: [PATCH 17/22] Update lib/backend/clone_test.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/backend/clone_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/backend/clone_test.go b/lib/backend/clone_test.go index 1a81e459d783f..ee895962c0e42 100644 --- a/lib/backend/clone_test.go +++ b/lib/backend/clone_test.go @@ -43,7 +43,7 @@ func TestClone(t *testing.T) { result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0) require.NoError(t, err) - diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID")) + diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision")) require.Empty(t, diff) require.NoError(t, err) require.Equal(t, itemCount, len(result.Items)) From 9f156719be4c684b0329a6973e3aaaa26acb9c65 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Mon, 10 Jun 2024 10:39:27 -0600 Subject: [PATCH 18/22] Update lib/backend/clone.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/backend/clone.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/backend/clone.go b/lib/backend/clone.go index 8e7655405fdf6..323201007dc4b 100644 --- a/lib/backend/clone.go +++ b/lib/backend/clone.go @@ -13,10 +13,8 @@ import ( "github.com/gravitational/teleport/api/utils/retryutils" ) -const ( - // bufferSize is the number of backend items that are queried at a time. - bufferSize = 10000 -) +// bufferSize is the number of backend items that are queried at a time. +const bufferSize = 10000 // CloneConfig contains the configuration for cloning a [Backend]. // All items from the source are copied to the destination. All Teleport Auth From eb4988c115805f701b23e7c6824d89db08f9bf20 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Mon, 10 Jun 2024 10:39:37 -0600 Subject: [PATCH 19/22] Update lib/backend/clone_test.go Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/backend/clone_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/backend/clone_test.go b/lib/backend/clone_test.go index ee895962c0e42..f837f5507641a 100644 --- a/lib/backend/clone_test.go +++ b/lib/backend/clone_test.go @@ -85,7 +85,7 @@ func TestCloneForce(t *testing.T) { result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0) require.NoError(t, err) - diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID")) + diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision")) require.Empty(t, diff) require.Equal(t, itemCount, len(result.Items)) } From 505a6469f7aad37677a1660789fcd15160ac7cd0 Mon Sep 17 00:00:00 2001 From: David Boslee Date: Tue, 11 Jun 2024 14:37:01 -0600 Subject: [PATCH 20/22] fix lint --- lib/backend/clone_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/backend/clone_test.go b/lib/backend/clone_test.go index f837f5507641a..5e6adda534ecc 100644 --- a/lib/backend/clone_test.go +++ b/lib/backend/clone_test.go @@ -46,7 +46,7 @@ func TestClone(t *testing.T) { diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision")) require.Empty(t, diff) require.NoError(t, err) - require.Equal(t, itemCount, len(result.Items)) + require.Len(t, result.Items, itemCount) } func TestCloneForce(t *testing.T) { @@ -87,5 +87,5 @@ func TestCloneForce(t *testing.T) { diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision")) require.Empty(t, diff) - require.Equal(t, itemCount, len(result.Items)) + require.Len(t, result.Items, itemCount) } From 8a74e2a73e0766ca0ddc2e217b63b826e3aaaf8e Mon Sep 17 00:00:00 2001 From: David Boslee Date: Thu, 20 Jun 2024 10:16:17 -0600 Subject: [PATCH 21/22] fix license --- lib/backend/clone.go | 16 ++++++++++++++++ lib/backend/clone_test.go | 16 ++++++++++++++++ tool/teleport/common/backend.go | 16 ++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/lib/backend/clone.go b/lib/backend/clone.go index 323201007dc4b..9579d2571eebc 100644 --- a/lib/backend/clone.go +++ b/lib/backend/clone.go @@ -1,3 +1,19 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package backend import ( diff --git a/lib/backend/clone_test.go b/lib/backend/clone_test.go index 5e6adda534ecc..c4cc492441049 100644 --- a/lib/backend/clone_test.go +++ b/lib/backend/clone_test.go @@ -1,3 +1,19 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package backend_test import ( diff --git a/tool/teleport/common/backend.go b/tool/teleport/common/backend.go index 0e2ef7d087d76..c859af4cc088c 100644 --- a/tool/teleport/common/backend.go +++ b/tool/teleport/common/backend.go @@ -1,3 +1,19 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package common import ( From ad3ecaa07d021a05a609617552df84e2672fe5fe Mon Sep 17 00:00:00 2001 From: David Boslee Date: Thu, 20 Jun 2024 11:08:07 -0600 Subject: [PATCH 22/22] add example config in alias --- tool/teleport/common/teleport.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tool/teleport/common/teleport.go b/tool/teleport/common/teleport.go index abbc892b511a2..4e48e86c40243 100644 --- a/tool/teleport/common/teleport.go +++ b/tool/teleport/common/teleport.go @@ -548,6 +548,32 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con Required(). Short('c'). StringVar(&ccf.ConfigFile) + backendCloneCmd.Alias(` +Examples: + + When cloning a backend you must specify a clone configuration file: + + > teleport backend clone --config clone.yaml + + The following example configuration will clone Teleport's backend + data from sqlite to dynamodb: + + # src is the configuration for the backend where data is cloned from. + src: + type: sqlite + path: /var/lib/teleport_data + # dst is the configuration for the backend where data is cloned to. + dst: + type: dynamodb + region: us-east-1 + table: teleport_backend + # parallel is the amount of backend data cloned in parallel. + # If a clone operation is taking too long consider increasing this value. + parallel: 100 + # force, if set to true, will continue cloning data to a destination + # regardless of whether data is already present. By default this is false + # to protect against overwriting the data of an existing Teleport cluster. + force: false`) // parse CLI commands+flags: utils.UpdateAppUsageTemplate(app, options.Args)