diff --git a/lib/backend/clone.go b/lib/backend/clone.go new file mode 100644 index 0000000000000..9579d2571eebc --- /dev/null +++ b/lib/backend/clone.go @@ -0,0 +1,175 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package backend + +import ( + "context" + "log/slog" + "sync/atomic" + "time" + + "github.com/gravitational/trace" + "golang.org/x/sync/errgroup" + + "github.com/gravitational/teleport" + "github.com/gravitational/teleport/api/utils/retryutils" +) + +// bufferSize is the number of backend items that are queried at a time. +const bufferSize = 10000 + +// CloneConfig contains the configuration for cloning a [Backend]. +// All items from the source are copied to the destination. All Teleport Auth +// Service instances should be stopped when running clone to avoid data +// inconsistencies. +type CloneConfig struct { + // Source is the backend [Config] items are cloned from. + Source Config `yaml:"src"` + // Destination is the [Config] items are cloned to. + Destination Config `yaml:"dst"` + // Parallel is the number of items that will be cloned in parallel. + Parallel int `yaml:"parallel"` + // Force indicates whether to clone data regardless of whether data already + // exists in the destination [Backend]. + Force bool `yaml:"force"` +} + +// Clone copies all items from a source to a destination [Backend]. +func Clone(ctx context.Context, src, dst Backend, parallel int, force bool) error { + log := slog.With(teleport.ComponentKey, "clone") + itemC := make(chan Item, bufferSize) + start := Key("") + migrated := &atomic.Int32{} + + if parallel <= 0 { + parallel = 1 + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if !force { + result, err := dst.GetRange(ctx, start, RangeEnd(start), 1) + if err != nil { + return trace.Wrap(err, "failed to check destination for existing data") + } + if len(result.Items) > 0 { + return trace.Errorf("unable to clone data to destination with existing data; this may be overridden by configuring 'force: true'") + } + } else { + log.WarnContext(ctx, "Skipping check for existing data in destination.") + } + + group, ctx := errgroup.WithContext(ctx) + // Add 1 to ensure a goroutine exists for getting items. + group.SetLimit(parallel + 1) + + group.Go(func() error { + var result *GetResult + pageKey := start + defer close(itemC) + for { + err := retry(ctx, 3, func() error { + var err error + result, err = src.GetRange(ctx, pageKey, RangeEnd(start), bufferSize) + if err != nil { + return trace.Wrap(err) + } + return nil + }) + if err != nil { + return trace.Wrap(err) + } + for _, item := range result.Items { + select { + case itemC <- item: + case <-ctx.Done(): + return trace.Wrap(ctx.Err()) + } + } + if len(result.Items) < bufferSize { + return nil + } + pageKey = RangeEnd(result.Items[len(result.Items)-1].Key) + } + }) + + logProgress := func() { + log.InfoContext(ctx, "Backend clone still in progress", "items_copied", migrated.Load()) + } + defer logProgress() + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + select { + case <-ticker.C: + logProgress() + case <-ctx.Done(): + return + } + }() + + for item := range itemC { + item := item + group.Go(func() error { + if err := retry(ctx, 3, func() error { + if _, err := dst.Put(ctx, item); err != nil { + return trace.Wrap(err) + } + return nil + }); err != nil { + return trace.Wrap(err) + } + migrated.Add(1) + return nil + }) + if err := ctx.Err(); err != nil { + break + } + } + if err := group.Wait(); err != nil { + return trace.Wrap(err) + } + return nil +} + +func retry(ctx context.Context, attempts int, fn func() error) error { + retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{ + Driver: retryutils.NewExponentialDriver(time.Millisecond * 100), + Max: time.Second * 2, + }) + if err != nil { + return trace.Wrap(err) + } + if attempts <= 0 { + return trace.Errorf("retry attempts must be > 0") + } + + for i := 0; i < attempts; i++ { + err = fn() + if err == nil { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-retry.After(): + retry.Inc() + } + } + return trace.Wrap(err) +} diff --git a/lib/backend/clone_test.go b/lib/backend/clone_test.go new file mode 100644 index 0000000000000..c4cc492441049 --- /dev/null +++ b/lib/backend/clone_test.go @@ -0,0 +1,107 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package backend_test + +import ( + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + + "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/teleport/lib/backend/memory" +) + +func TestClone(t *testing.T) { + ctx := context.Background() + src, err := memory.New(memory.Config{}) + require.NoError(t, err) + defer src.Close() + + dst, err := memory.New(memory.Config{}) + require.NoError(t, err) + defer dst.Close() + + itemCount := 11111 + items := make([]backend.Item, itemCount) + + for i := 0; i < itemCount; i++ { + item := backend.Item{ + Key: backend.Key(fmt.Sprintf("key-%05d", i)), + Value: []byte(fmt.Sprintf("value-%d", i)), + } + _, err := src.Put(ctx, item) + require.NoError(t, err) + items[i] = item + } + + err = backend.Clone(ctx, src, dst, 10, false) + require.NoError(t, err) + + start := backend.Key("") + result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0) + require.NoError(t, err) + + diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision")) + require.Empty(t, diff) + require.NoError(t, err) + require.Len(t, result.Items, itemCount) +} + +func TestCloneForce(t *testing.T) { + ctx := context.Background() + src, err := memory.New(memory.Config{}) + require.NoError(t, err) + defer src.Close() + + dst, err := memory.New(memory.Config{}) + require.NoError(t, err) + defer dst.Close() + + itemCount := 100 + items := make([]backend.Item, itemCount) + + for i := 0; i < itemCount; i++ { + item := backend.Item{ + Key: backend.Key(fmt.Sprintf("key-%05d", i)), + Value: []byte(fmt.Sprintf("value-%d", i)), + } + _, err := src.Put(ctx, item) + require.NoError(t, err) + items[i] = item + } + + _, err = dst.Put(ctx, items[0]) + require.NoError(t, err) + + err = backend.Clone(ctx, src, dst, 10, false) + require.Error(t, err) + + err = backend.Clone(ctx, src, dst, 10, true) + require.NoError(t, err) + + start := backend.Key("") + result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0) + require.NoError(t, err) + + diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision")) + require.Empty(t, diff) + require.Len(t, result.Items, itemCount) +} diff --git a/tool/teleport/common/backend.go b/tool/teleport/common/backend.go new file mode 100644 index 0000000000000..c859af4cc088c --- /dev/null +++ b/tool/teleport/common/backend.go @@ -0,0 +1,55 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package common + +import ( + "context" + "os" + + "github.com/gravitational/trace" + "gopkg.in/yaml.v3" + + "github.com/gravitational/teleport/lib/backend" +) + +func onClone(ctx context.Context, configPath string) error { + data, err := os.ReadFile(configPath) + if err != nil { + return trace.Wrap(err) + } + var config backend.CloneConfig + if err := yaml.Unmarshal(data, &config); err != nil { + return trace.Wrap(err) + } + + src, err := backend.New(ctx, config.Source.Type, config.Source.Params) + if err != nil { + return trace.Wrap(err, "failed to create source backend") + } + defer src.Close() + + dst, err := backend.New(ctx, config.Destination.Type, config.Destination.Params) + if err != nil { + return trace.Wrap(err, "failed to create destination backend") + } + defer dst.Close() + + if err := backend.Clone(ctx, src, dst, config.Parallel, config.Force); err != nil { + return trace.Wrap(err) + } + return nil +} diff --git a/tool/teleport/common/teleport.go b/tool/teleport/common/teleport.go index 59d90d41dbf24..4e48e86c40243 100644 --- a/tool/teleport/common/teleport.go +++ b/tool/teleport/common/teleport.go @@ -541,6 +541,40 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con collectProfilesCmd.Arg("PROFILES", fmt.Sprintf("Comma-separated profile names to be exported. Supported profiles: %s. Default: %s", strings.Join(maps.Keys(debugclient.SupportedProfiles), ","), strings.Join(defaultCollectProfiles, ","))).StringVar(&ccf.Profiles) collectProfilesCmd.Flag("seconds", "For CPU and trace profiles, profile for the given duration (if set to 0, it returns a profile snapshot). For other profiles, return a delta profile. Default: 0").Short('s').Default("0").IntVar(&ccf.ProfileSeconds) + backendCmd := app.Command("backend", "Commands for managing backend data.") + backendCmd.Hidden() + backendCloneCmd := backendCmd.Command("clone", "Clones data from a source to a destination backend.") + backendCloneCmd.Flag("config", "Path to the clone config file."). + Required(). + Short('c'). + StringVar(&ccf.ConfigFile) + backendCloneCmd.Alias(` +Examples: + + When cloning a backend you must specify a clone configuration file: + + > teleport backend clone --config clone.yaml + + The following example configuration will clone Teleport's backend + data from sqlite to dynamodb: + + # src is the configuration for the backend where data is cloned from. + src: + type: sqlite + path: /var/lib/teleport_data + # dst is the configuration for the backend where data is cloned to. + dst: + type: dynamodb + region: us-east-1 + table: teleport_backend + # parallel is the amount of backend data cloned in parallel. + # If a clone operation is taking too long consider increasing this value. + parallel: 100 + # force, if set to true, will continue cloning data to a destination + # regardless of whether data is already present. By default this is false + # to protect against overwriting the data of an existing Teleport cluster. + force: false`) + // parse CLI commands+flags: utils.UpdateAppUsageTemplate(app, options.Args) command, err := app.Parse(options.Args) @@ -663,6 +697,8 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con err = onGetLogLevel(ccf.ConfigFile) case collectProfilesCmd.FullCommand(): err = onCollectProfiles(ccf.ConfigFile, ccf.Profiles, ccf.ProfileSeconds) + case backendCloneCmd.FullCommand(): + err = onClone(context.Background(), ccf.ConfigFile) } if err != nil { utils.FatalError(err)