Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend: add migration tool to migrate between any two backends #41866

Merged
merged 29 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8bcc979
backend: add migration tool to migrate between any two backends
dboslee May 21, 2024
f8512be
cleanup bad rebase
dboslee May 21, 2024
272f5cd
missing return value
dboslee May 21, 2024
6999c36
close backends
dboslee May 22, 2024
af76ff7
Use workers and fixed size channel to limit in memory items
dboslee May 23, 2024
608f6c1
Update tool/teleport/common/migrate.go
dboslee May 23, 2024
27e326b
use a single errgroup for both get and put operations
dboslee May 24, 2024
0193ebc
refactor migration to clone
dboslee May 30, 2024
bffa1a1
Add destination check and force config option
dboslee May 30, 2024
d4f8580
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
a1d5014
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
47296d9
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
dda8fd3
move clone to lib/backend and remove struct
dboslee Jun 7, 2024
8422584
Merge branch 'master' into david/migrate
dboslee Jun 7, 2024
2a71757
Update tool/teleport/common/backend.go
dboslee Jun 10, 2024
c6d346f
Update lib/backend/clone.go
dboslee Jun 10, 2024
dd92860
Update lib/backend/clone.go
dboslee Jun 10, 2024
1ba5b27
Update lib/backend/clone_test.go
dboslee Jun 10, 2024
53d1e72
Update lib/backend/clone.go
dboslee Jun 10, 2024
65e2fb4
Update lib/backend/clone_test.go
dboslee Jun 10, 2024
e38191e
Merge branch 'master' into david/migrate
dboslee Jun 10, 2024
ecb8bdd
fix lint
dboslee Jun 11, 2024
e57e0c4
Merge branch 'master' into david/migrate
dboslee Jun 11, 2024
fc0871f
fix license
dboslee Jun 20, 2024
fe22880
add example config in alias
dboslee Jun 20, 2024
a84c31e
Merge branch 'master' into david/migrate
dboslee Jun 20, 2024
5beceba
Merge branch 'master' into david/migrate
dboslee Jun 20, 2024
773cd18
Merge branch 'master' into david/migrate
dboslee Jun 20, 2024
80d534c
Merge branch 'master' into david/migrate
dboslee Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions lib/backend/migration/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package migration

import (
"context"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/trace"
)

// Migration manages a migration between two [backend.Backend] interfaces.
type Migration struct {
dboslee marked this conversation as resolved.
Show resolved Hide resolved
src backend.Backend
dst backend.Backend
parallel int
total int
migrated atomic.Int64
log logrus.FieldLogger
}
dboslee marked this conversation as resolved.
Show resolved Hide resolved

// MigrationConfig configures a [Migration] with a source and destination backend.
// All items from the source are copied to the destination. All Teleport Auth
// Service instances should be stopped when running a migration to avoid data
// inconsistencies.
type MigrationConfig struct {
// Source is the backend [backend.Config] items are migrated from.
Source backend.Config `yaml:"src"`
// Destination is the [backend.Config] items are migrated to.
Destination backend.Config `yaml:"dst"`
// Parallel is the number of items that will be migraated in parallel.
Parallel int `yaml:"parallel"`
// Log logs the progress of a [Migration]
Log logrus.FieldLogger
dboslee marked this conversation as resolved.
Show resolved Hide resolved
}

// New returns a [Migration] based on the provided [MigrationConfig].
func New(ctx context.Context, config MigrationConfig) (*Migration, error) {
src, err := backend.New(ctx, config.Source.Type, config.Source.Params)
if err != nil {
return nil, trace.Wrap(err, "failed to create source backend")
}
dst, err := backend.New(ctx, config.Destination.Type, config.Destination.Params)
if err != nil {
return nil, trace.Wrap(err, "failed to create destination backend")
}
dboslee marked this conversation as resolved.
Show resolved Hide resolved
migration := &Migration{
src: src,
dst: dst,
parallel: config.Parallel,
log: config.Log,
}
if migration.parallel == 0 {
migration.parallel = 1
}
if migration.log == nil {
migration.log = logrus.WithField(teleport.ComponentKey, "migration")
}
return migration, nil
}

// Run runs a [Migration] until complete.
func (m *Migration) Run(ctx context.Context) error {
var all []backend.Item
start := backend.Key("")
dboslee marked this conversation as resolved.
Show resolved Hide resolved
err := retry(ctx, 3, func() error {
result, err := m.src.GetRange(ctx, start, backend.RangeEnd(start), 0)
dboslee marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return trace.Wrap(err)
}
all = result.Items
return nil
})
if err != nil {
return trace.Wrap(err)
}
m.total = len(all)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

group, ctx := errgroup.WithContext(ctx)
group.SetLimit(m.parallel)

logProgress := func() {
m.log.Info("Migrated %d/%d", m.migrated.Load(), m.total)
}
defer logProgress()
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
select {
case <-ticker.C:
logProgress()
case <-ctx.Done():
return
}
}()

for _, item := range all {
item := item
group.Go(func() error {
if err := retry(ctx, 3, func() error {
if _, err := m.dst.Put(ctx, item); err != nil {
return trace.Wrap(err)
}
return nil
}); err != nil {
return trace.Wrap(err)
}
m.migrated.Add(1)
return nil
})
if err := ctx.Err(); err != nil {
break
}
}

if err := group.Wait(); err != nil {
return trace.Wrap(err)
}
return nil
}

func retry(ctx context.Context, attempts int, fn func() error) error {
dboslee marked this conversation as resolved.
Show resolved Hide resolved
retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{
Driver: retryutils.NewExponentialDriver(time.Millisecond * 100),
Max: time.Second * 2,
})
if err != nil {
return trace.Wrap(err)
}
if attempts <= 0 {
return trace.Errorf("retry attempts must be > 0")
}

for i := 0; i < attempts; i++ {
err = fn()
if err == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-retry.After():
retry.Inc()
}
}
return trace.Wrap(err)
}
56 changes: 56 additions & 0 deletions lib/backend/migration/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package migration

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/memory"
)

func TestMigration(t *testing.T) {
ctx := context.Background()
src, err := memory.New(memory.Config{})
require.NoError(t, err)

dst, err := memory.New(memory.Config{})
require.NoError(t, err)

itemCount := 1111
items := make([]backend.Item, itemCount)

for i := 0; i < itemCount; i++ {
item := backend.Item{
Key: backend.Key(fmt.Sprintf("key-%05d", i)),
Value: []byte(fmt.Sprintf("value-%d", i)),
}
_, err := src.Put(ctx, item)
require.NoError(t, err)
items[i] = item
}

migration := Migration{
src: src,
dst: dst,
parallel: 10,
log: logrus.New(),
}

err = migration.Run(ctx)
require.NoError(t, err)

start := backend.Key("")
result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0)
require.NoError(t, err)

diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision", "ID"))
dboslee marked this conversation as resolved.
Show resolved Hide resolved
require.Empty(t, diff)
require.Equal(t, itemCount, migration.total)
require.Equal(t, itemCount, int(migration.migrated.Load()))
}
31 changes: 31 additions & 0 deletions tool/teleport/common/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package common

import (
"context"
"os"

"gopkg.in/yaml.v3"

"github.com/gravitational/teleport/lib/backend/migration"
"github.com/gravitational/trace"
)

func runMigration(ctx context.Context, path string) error {
dboslee marked this conversation as resolved.
Show resolved Hide resolved
data, err := os.ReadFile(path)
if err != nil {
return trace.Wrap(err)
}
config := migration.MigrationConfig{}
dboslee marked this conversation as resolved.
Show resolved Hide resolved
if err := yaml.Unmarshal(data, &config); err != nil {
return trace.Wrap(err)
}

migration, err := migration.New(ctx, config)
if err != nil {
return trace.Wrap(err)
}
if err := migration.Run(ctx); err != nil {
return trace.Wrap(err)
}
return nil
}
7 changes: 7 additions & 0 deletions tool/teleport/common/teleport.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,13 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
remoteForward := app.Command(teleport.RemoteForwardSubCommand, "Used internally by Teleport to re-exec itself to remote port forward.").Hidden()
checkHomeDir := app.Command(teleport.CheckHomeDirSubCommand, "Used internally by Teleport to re-exec itself to check access to a directory.").Hidden()
park := app.Command(teleport.ParkSubCommand, "Used internally by Teleport to re-exec itself to do nothing.").Hidden()
migrate := app.Command("migrate", "Migrate data between two backends.").Hidden()
dboslee marked this conversation as resolved.
Show resolved Hide resolved
app.HelpFlag.Short('h')

migrate.Flag("config", "Path to migration config file.").
Short('c').
StringVar(&ccf.ConfigFile)

// define start flags:
start.Flag("debug", "Enable verbose logging to stderr").
Short('d').
Expand Down Expand Up @@ -655,6 +660,8 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
err = onGetLogLevel(ccf.ConfigFile)
case collectProfilesCmd.FullCommand():
err = onCollectProfiles(ccf.ConfigFile, ccf.Profiles, ccf.ProfileSeconds)
case migrate.FullCommand():
err = runMigration(context.Background(), ccf.ConfigFile)
}
if err != nil {
utils.FatalError(err)
Expand Down
Loading