Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend: add migration tool to migrate between any two backends #41866

Merged
merged 29 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8bcc979
backend: add migration tool to migrate between any two backends
dboslee May 21, 2024
f8512be
cleanup bad rebase
dboslee May 21, 2024
272f5cd
missing return value
dboslee May 21, 2024
6999c36
close backends
dboslee May 22, 2024
af76ff7
Use workers and fixed size channel to limit in memory items
dboslee May 23, 2024
608f6c1
Update tool/teleport/common/migrate.go
dboslee May 23, 2024
27e326b
use a single errgroup for both get and put operations
dboslee May 24, 2024
0193ebc
refactor migration to clone
dboslee May 30, 2024
bffa1a1
Add destination check and force config option
dboslee May 30, 2024
d4f8580
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
a1d5014
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
47296d9
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
dda8fd3
move clone to lib/backend and remove struct
dboslee Jun 7, 2024
8422584
Merge branch 'master' into david/migrate
dboslee Jun 7, 2024
2a71757
Update tool/teleport/common/backend.go
dboslee Jun 10, 2024
c6d346f
Update lib/backend/clone.go
dboslee Jun 10, 2024
dd92860
Update lib/backend/clone.go
dboslee Jun 10, 2024
1ba5b27
Update lib/backend/clone_test.go
dboslee Jun 10, 2024
53d1e72
Update lib/backend/clone.go
dboslee Jun 10, 2024
65e2fb4
Update lib/backend/clone_test.go
dboslee Jun 10, 2024
e38191e
Merge branch 'master' into david/migrate
dboslee Jun 10, 2024
ecb8bdd
fix lint
dboslee Jun 11, 2024
e57e0c4
Merge branch 'master' into david/migrate
dboslee Jun 11, 2024
fc0871f
fix license
dboslee Jun 20, 2024
fe22880
add example config in alias
dboslee Jun 20, 2024
a84c31e
Merge branch 'master' into david/migrate
dboslee Jun 20, 2024
5beceba
Merge branch 'master' into david/migrate
dboslee Jun 20, 2024
773cd18
Merge branch 'master' into david/migrate
dboslee Jun 20, 2024
80d534c
Merge branch 'master' into david/migrate
dboslee Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions lib/backend/clone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package backend
dboslee marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"log/slog"
"sync/atomic"
"time"

"github.com/gravitational/trace"
"golang.org/x/sync/errgroup"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/utils/retryutils"
)

// bufferSize is the number of backend items that are queried at a time.
// Clone uses it both as the page limit passed to the source's GetRange and
// as the capacity of the channel that buffers items between the reading
// goroutine and the writing goroutines.
const bufferSize = 10000

// CloneConfig contains the configuration for cloning a [Backend].
// All items from the source are copied to the destination. All Teleport Auth
// Service instances should be stopped when running clone to avoid data
// inconsistencies.
//
// The struct is populated from a YAML document using the keys given in the
// field tags (src, dst, parallel, force).
type CloneConfig struct {
	// Source is the backend [Config] items are cloned from.
	Source Config `yaml:"src"`
	// Destination is the [Config] items are cloned to.
	Destination Config `yaml:"dst"`
	// Parallel is the number of items that will be cloned in parallel.
	// Clone treats values less than or equal to zero as 1.
	Parallel int `yaml:"parallel"`
	// Force indicates whether to clone data regardless of whether data already
	// exists in the destination [Backend]. When false, Clone returns an error
	// if the destination already holds at least one item.
	Force bool `yaml:"force"`
}

// Clone copies all items from a source to a destination [Backend].
//
// A single goroutine pages through src (bufferSize items per GetRange call)
// and feeds a bounded channel; up to parallel goroutines drain the channel
// and Put each item into dst. A parallel value <= 0 is treated as 1.
//
// Unless force is true, Clone first checks dst and returns an error if it
// already contains any item. Each read and each write is attempted up to
// three times before the whole clone is aborted. The first error encountered
// cancels all outstanding work and is returned.
func Clone(ctx context.Context, src, dst Backend, parallel int, force bool) error {
	log := slog.With(teleport.ComponentKey, "clone")
	itemC := make(chan Item, bufferSize)
	start := Key("")
	// 64-bit so the progress counter cannot wrap on very large backends.
	migrated := &atomic.Int64{}

	if parallel <= 0 {
		parallel = 1
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if !force {
		// A single item is enough to prove the destination is not empty.
		result, err := dst.GetRange(ctx, start, RangeEnd(start), 1)
		if err != nil {
			return trace.Wrap(err, "failed to check destination for existing data")
		}
		if len(result.Items) > 0 {
			return trace.Errorf("unable to clone data to destination with existing data; this may be overridden by configuring 'force: true'")
		}
	} else {
		log.WarnContext(ctx, "Skipping check for existing data in destination.")
	}

	group, ctx := errgroup.WithContext(ctx)
	// Add 1 to ensure a goroutine exists for getting items.
	group.SetLimit(parallel + 1)

	// Producer: page through the source and hand items to the writers.
	group.Go(func() error {
		var result *GetResult
		pageKey := start
		// Closing the channel is what terminates the consumer loop below.
		defer close(itemC)
		for {
			err := retry(ctx, 3, func() error {
				var err error
				result, err = src.GetRange(ctx, pageKey, RangeEnd(start), bufferSize)
				if err != nil {
					return trace.Wrap(err)
				}
				return nil
			})
			if err != nil {
				return trace.Wrap(err)
			}
			for _, item := range result.Items {
				select {
				case itemC <- item:
				case <-ctx.Done():
					return trace.Wrap(ctx.Err())
				}
			}
			// A short page means the source has been exhausted.
			if len(result.Items) < bufferSize {
				return nil
			}
			// NOTE(review): assumes RangeEnd(k) yields the first key past every
			// key prefixed by k; if so, keys that extend the last key of a page
			// would be skipped by the next page — TODO confirm.
			pageKey = RangeEnd(result.Items[len(result.Items)-1].Key)
		}
	})

	logProgress := func() {
		log.InfoContext(ctx, "Backend clone still in progress", "items_copied", migrated.Load())
	}
	// Emit one last count when Clone returns.
	defer logProgress()
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		// Report progress on every tick until the clone finishes or fails;
		// ctx is the errgroup context, which is cancelled once Wait returns.
		for {
			select {
			case <-ticker.C:
				logProgress()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Consumer: group.Go blocks once the concurrency limit is reached, which
	// bounds the number of in-flight writes.
	for item := range itemC {
		item := item
		group.Go(func() error {
			if err := retry(ctx, 3, func() error {
				if _, err := dst.Put(ctx, item); err != nil {
					return trace.Wrap(err)
				}
				return nil
			}); err != nil {
				return trace.Wrap(err)
			}
			migrated.Add(1)
			return nil
		})
		// Stop scheduling new writes as soon as any worker has failed.
		if err := ctx.Err(); err != nil {
			break
		}
	}
	if err := group.Wait(); err != nil {
		return trace.Wrap(err)
	}
	return nil
}

func retry(ctx context.Context, attempts int, fn func() error) error {
retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{
Driver: retryutils.NewExponentialDriver(time.Millisecond * 100),
Max: time.Second * 2,
})
if err != nil {
return trace.Wrap(err)
}
if attempts <= 0 {
return trace.Errorf("retry attempts must be > 0")
}

for i := 0; i < attempts; i++ {
err = fn()
if err == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-retry.After():
retry.Inc()
}
}
return trace.Wrap(err)
}
107 changes: 107 additions & 0 deletions lib/backend/clone_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package backend_test

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/memory"
)

// TestClone verifies that Clone copies every item from a populated source
// backend into an empty destination backend.
func TestClone(t *testing.T) {
	ctx := context.Background()
	src, err := memory.New(memory.Config{})
	require.NoError(t, err)
	defer src.Close()

	dst, err := memory.New(memory.Config{})
	require.NoError(t, err)
	defer dst.Close()

	// More items than the 10000-item page size used by Clone, so that the
	// source is read in more than one page.
	itemCount := 11111
	items := make([]backend.Item, itemCount)

	for i := 0; i < itemCount; i++ {
		item := backend.Item{
			// Zero-padded so that key order matches insertion order.
			Key:   backend.Key(fmt.Sprintf("key-%05d", i)),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		}
		_, err := src.Put(ctx, item)
		require.NoError(t, err)
		items[i] = item
	}

	err = backend.Clone(ctx, src, dst, 10, false)
	require.NoError(t, err)

	start := backend.Key("")
	result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0)
	require.NoError(t, err)

	// Check the count first: a mismatch here is far easier to read than a
	// diff over thousands of items.
	require.Len(t, result.Items, itemCount)

	// Revisions are ignored when comparing source and destination items.
	diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision"))
	require.Empty(t, diff)
}

// TestCloneForce verifies that Clone refuses to write into a destination
// that already holds data unless force is set, and that a forced clone
// leaves the destination with exactly the source's items.
func TestCloneForce(t *testing.T) {
	ctx := context.Background()

	src, err := memory.New(memory.Config{})
	require.NoError(t, err)
	defer src.Close()

	dst, err := memory.New(memory.Config{})
	require.NoError(t, err)
	defer dst.Close()

	itemCount := 100
	items := make([]backend.Item, itemCount)

	// Seed the source backend and remember what was written.
	for i := range items {
		items[i] = backend.Item{
			Key:   backend.Key(fmt.Sprintf("key-%05d", i)),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		}
		_, putErr := src.Put(ctx, items[i])
		require.NoError(t, putErr)
	}

	// Pre-populate the destination so the existing-data check trips.
	_, err = dst.Put(ctx, items[0])
	require.NoError(t, err)

	// Without force the clone must be rejected.
	require.Error(t, backend.Clone(ctx, src, dst, 10, false))

	// With force the clone must go through.
	require.NoError(t, backend.Clone(ctx, src, dst, 10, true))

	first := backend.Key("")
	got, err := dst.GetRange(ctx, first, backend.RangeEnd(first), 0)
	require.NoError(t, err)

	// Revisions are ignored when comparing source and destination items.
	ignoreRevision := cmpopts.IgnoreFields(backend.Item{}, "Revision")
	require.Empty(t, cmp.Diff(items, got.Items, ignoreRevision))
	require.Len(t, got.Items, itemCount)
}
55 changes: 55 additions & 0 deletions tool/teleport/common/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package common

import (
"context"
"os"

"github.com/gravitational/trace"
"gopkg.in/yaml.v3"

"github.com/gravitational/teleport/lib/backend"
)

// onClone implements the "backend clone" command. It reads the YAML clone
// configuration at configPath, opens the source and destination backends it
// describes, and copies all items from the source to the destination. Both
// backends are closed before returning.
func onClone(ctx context.Context, configPath string) error {
	raw, err := os.ReadFile(configPath)
	if err != nil {
		return trace.Wrap(err)
	}

	var cfg backend.CloneConfig
	err = yaml.Unmarshal(raw, &cfg)
	if err != nil {
		return trace.Wrap(err)
	}

	source, err := backend.New(ctx, cfg.Source.Type, cfg.Source.Params)
	if err != nil {
		return trace.Wrap(err, "failed to create source backend")
	}
	defer source.Close()

	destination, err := backend.New(ctx, cfg.Destination.Type, cfg.Destination.Params)
	if err != nil {
		return trace.Wrap(err, "failed to create destination backend")
	}
	defer destination.Close()

	// trace.Wrap passes a nil error through unchanged.
	return trace.Wrap(backend.Clone(ctx, source, destination, cfg.Parallel, cfg.Force))
}
36 changes: 36 additions & 0 deletions tool/teleport/common/teleport.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,40 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
collectProfilesCmd.Arg("PROFILES", fmt.Sprintf("Comma-separated profile names to be exported. Supported profiles: %s. Default: %s", strings.Join(maps.Keys(debugclient.SupportedProfiles), ","), strings.Join(defaultCollectProfiles, ","))).StringVar(&ccf.Profiles)
collectProfilesCmd.Flag("seconds", "For CPU and trace profiles, profile for the given duration (if set to 0, it returns a profile snapshot). For other profiles, return a delta profile. Default: 0").Short('s').Default("0").IntVar(&ccf.ProfileSeconds)

backendCmd := app.Command("backend", "Commands for managing backend data.")
backendCmd.Hidden()
backendCloneCmd := backendCmd.Command("clone", "Clones data from a source to a destination backend.")
backendCloneCmd.Flag("config", "Path to the clone config file.").
Required().
Short('c').
StringVar(&ccf.ConfigFile)
rosstimothy marked this conversation as resolved.
Show resolved Hide resolved
backendCloneCmd.Alias(`
Examples:

When cloning a backend you must specify a clone configuration file:

> teleport backend clone --config clone.yaml

The following example configuration will clone Teleport's backend
data from sqlite to dynamodb:

# src is the configuration for the backend where data is cloned from.
src:
type: sqlite
path: /var/lib/teleport_data
# dst is the configuration for the backend where data is cloned to.
dst:
type: dynamodb
region: us-east-1
table: teleport_backend
# parallel is the amount of backend data cloned in parallel.
# If a clone operation is taking too long consider increasing this value.
parallel: 100
# force, if set to true, will continue cloning data to a destination
# regardless of whether data is already present. By default this is false
# to protect against overwriting the data of an existing Teleport cluster.
force: false`)

// parse CLI commands+flags:
utils.UpdateAppUsageTemplate(app, options.Args)
command, err := app.Parse(options.Args)
Expand Down Expand Up @@ -663,6 +697,8 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
err = onGetLogLevel(ccf.ConfigFile)
case collectProfilesCmd.FullCommand():
err = onCollectProfiles(ccf.ConfigFile, ccf.Profiles, ccf.ProfileSeconds)
case backendCloneCmd.FullCommand():
err = onClone(context.Background(), ccf.ConfigFile)
}
if err != nil {
utils.FatalError(err)
Expand Down
Loading