Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v16] backend: add migration tool to migrate between any two backends #43419

Merged
merged 24 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b5ca16a
backend: add migration tool to migrate between any two backends
dboslee May 21, 2024
7647979
cleanup bad rebase
dboslee May 21, 2024
5d0e833
missing return value
dboslee May 21, 2024
177b4d0
close backends
dboslee May 22, 2024
c8d3cb2
Use workers and fixed size channel to limit in memory items
dboslee May 23, 2024
01f1e1c
Update tool/teleport/common/migrate.go
dboslee May 23, 2024
aad6888
use a single errgroup for both get and put operations
dboslee May 24, 2024
3881067
refactor migration to clone
dboslee May 30, 2024
ca53ad2
Add destination check and force config option
dboslee May 30, 2024
73955d8
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
134e390
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
0923ebe
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
b853932
move clone to lib/backend and remove struct
dboslee Jun 7, 2024
b4dfd24
Update tool/teleport/common/backend.go
dboslee Jun 10, 2024
6a464a9
Update lib/backend/clone.go
dboslee Jun 10, 2024
fa77cda
Update lib/backend/clone.go
dboslee Jun 10, 2024
c58af4d
Update lib/backend/clone_test.go
dboslee Jun 10, 2024
9f15671
Update lib/backend/clone.go
dboslee Jun 10, 2024
eb4988c
Update lib/backend/clone_test.go
dboslee Jun 10, 2024
505a646
fix lint
dboslee Jun 11, 2024
8a74e2a
fix license
dboslee Jun 20, 2024
ad3ecaa
add example config in alias
dboslee Jun 20, 2024
ef0a3ee
Merge branch 'branch/v16' into bot/backport-41866-branch/v16
dboslee Jun 24, 2024
ac673a4
Merge branch 'branch/v16' into bot/backport-41866-branch/v16
dboslee Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions lib/backend/clone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package backend

import (
"context"
"log/slog"
"sync/atomic"
"time"

"github.com/gravitational/trace"
"golang.org/x/sync/errgroup"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/utils/retryutils"
)

// bufferSize is the maximum number of backend items requested from the source
// per GetRange call in Clone. It is also the capacity of the channel that
// buffers items between the goroutine reading the source and the goroutines
// writing to the destination.
const bufferSize = 10000

// CloneConfig describes a clone operation between two [Backend]
// implementations: every item found in the source is written to the
// destination.
//
// Stop all Teleport Auth Service instances before running a clone; writes
// that land in the source while the clone is running can leave the two
// backends inconsistent.
type CloneConfig struct {
	// Source is the [Config] of the backend that items are read from.
	Source Config `yaml:"src"`

	// Destination is the [Config] of the backend that items are written to.
	Destination Config `yaml:"dst"`

	// Parallel is how many items may be written concurrently.
	Parallel int `yaml:"parallel"`

	// Force, when true, clones even if the destination [Backend] already
	// holds data.
	Force bool `yaml:"force"`
}

// Clone copies all items from a source to a destination [Backend].
//
// One producer goroutine pages through src with GetRange (bufferSize items
// per page) and feeds the items into a bounded channel. Up to parallel
// goroutines at a time take items from that channel and Put them into dst.
// Each read and each write is attempted up to 3 times before the clone is
// aborted.
//
// Unless force is true, Clone first queries dst and returns an error if it
// already holds at least one item. A parallel value <= 0 is treated as 1.
// While the clone is running the number of copied items is logged once per
// second, and once more when Clone returns.
func Clone(ctx context.Context, src, dst Backend, parallel int, force bool) error {
	log := slog.With(teleport.ComponentKey, "clone")
	itemC := make(chan Item, bufferSize)
	start := Key("")
	migrated := &atomic.Int32{}

	if parallel <= 0 {
		parallel = 1
	}

	// The deferred cancel guarantees that the progress goroutine started
	// below stops on every return path.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if !force {
		// A single item is enough to prove the destination is not empty.
		result, err := dst.GetRange(ctx, start, RangeEnd(start), 1)
		if err != nil {
			return trace.Wrap(err, "failed to check destination for existing data")
		}
		if len(result.Items) > 0 {
			return trace.Errorf("unable to clone data to destination with existing data; this may be overridden by configuring 'force: true'")
		}
	} else {
		log.WarnContext(ctx, "Skipping check for existing data in destination.")
	}

	group, ctx := errgroup.WithContext(ctx)
	// Add 1 to ensure a goroutine exists for getting items.
	group.SetLimit(parallel + 1)

	// Producer: reads the source page by page and closes itemC when done so
	// that the consumer loop below terminates.
	group.Go(func() error {
		var result *GetResult
		pageKey := start
		defer close(itemC)
		for {
			err := retry(ctx, 3, func() error {
				var err error
				result, err = src.GetRange(ctx, pageKey, RangeEnd(start), bufferSize)
				if err != nil {
					return trace.Wrap(err)
				}
				return nil
			})
			if err != nil {
				return trace.Wrap(err)
			}
			for _, item := range result.Items {
				select {
				case itemC <- item:
				case <-ctx.Done():
					return trace.Wrap(ctx.Err())
				}
			}
			// A short page means the source has been exhausted.
			if len(result.Items) < bufferSize {
				return nil
			}
			// NOTE(review): the next page starts at RangeEnd(last key). If
			// RangeEnd returns the end of the prefix range of its argument,
			// keys that extend the last key of a page would be skipped.
			// Confirm against the RangeEnd/GetRange contract.
			pageKey = RangeEnd(result.Items[len(result.Items)-1].Key)
		}
	})

	logProgress := func() {
		log.InfoContext(ctx, "Backend clone still in progress", "items_copied", migrated.Load())
	}
	defer logProgress()
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		// Keep reporting on every tick until the clone finishes or fails;
		// without the loop only a single progress line would ever be logged.
		for {
			select {
			case <-ticker.C:
				logProgress()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Consumer: group.Go blocks once the limit is reached, which bounds the
	// number of in-flight writes to parallel.
	for item := range itemC {
		item := item
		group.Go(func() error {
			if err := retry(ctx, 3, func() error {
				if _, err := dst.Put(ctx, item); err != nil {
					return trace.Wrap(err)
				}
				return nil
			}); err != nil {
				return trace.Wrap(err)
			}
			migrated.Add(1)
			return nil
		})
		// Stop scheduling writes as soon as any goroutine has failed or the
		// caller cancelled; the actual error is collected by Wait below.
		if err := ctx.Err(); err != nil {
			break
		}
	}
	if err := group.Wait(); err != nil {
		return trace.Wrap(err)
	}
	return nil
}

func retry(ctx context.Context, attempts int, fn func() error) error {
retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{
Driver: retryutils.NewExponentialDriver(time.Millisecond * 100),
Max: time.Second * 2,
})
if err != nil {
return trace.Wrap(err)
}
if attempts <= 0 {
return trace.Errorf("retry attempts must be > 0")
}

for i := 0; i < attempts; i++ {
err = fn()
if err == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-retry.After():
retry.Inc()
}
}
return trace.Wrap(err)
}
107 changes: 107 additions & 0 deletions lib/backend/clone_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package backend_test

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/memory"
)

// TestClone verifies that Clone copies every item from a populated source
// backend into an empty destination backend.
func TestClone(t *testing.T) {
	ctx := context.Background()
	src, err := memory.New(memory.Config{})
	require.NoError(t, err)
	defer src.Close()

	dst, err := memory.New(memory.Config{})
	require.NoError(t, err)
	defer dst.Close()

	// Presumably chosen to be larger than the page size Clone reads with, so
	// that more than one page has to be fetched from the source.
	itemCount := 11111
	items := make([]backend.Item, itemCount)

	for i := 0; i < itemCount; i++ {
		item := backend.Item{
			Key:   backend.Key(fmt.Sprintf("key-%05d", i)),
			Value: []byte(fmt.Sprintf("value-%d", i)),
		}
		_, err := src.Put(ctx, item)
		require.NoError(t, err)
		items[i] = item
	}

	err = backend.Clone(ctx, src, dst, 10, false)
	require.NoError(t, err)

	start := backend.Key("")
	result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0)
	require.NoError(t, err)

	// Check the count first: a missing or extra item then fails with a short
	// message instead of a diff spanning thousands of entries.
	require.Len(t, result.Items, itemCount)

	// Revisions are ignored because the expected items never had one set.
	diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision"))
	require.Empty(t, diff)
}

// TestCloneForce checks that Clone refuses to write into a destination that
// already holds data unless force is set, and that a forced clone leaves the
// destination holding exactly the source's items.
func TestCloneForce(t *testing.T) {
	ctx := context.Background()

	source, err := memory.New(memory.Config{})
	require.NoError(t, err)
	defer source.Close()

	target, err := memory.New(memory.Config{})
	require.NoError(t, err)
	defer target.Close()

	const total = 100
	want := make([]backend.Item, 0, total)
	for n := 0; n < total; n++ {
		entry := backend.Item{
			Key:   backend.Key(fmt.Sprintf("key-%05d", n)),
			Value: []byte(fmt.Sprintf("value-%d", n)),
		}
		_, putErr := source.Put(ctx, entry)
		require.NoError(t, putErr)
		want = append(want, entry)
	}

	// Seed the destination with one item so that it is no longer empty.
	_, err = target.Put(ctx, want[0])
	require.NoError(t, err)

	// Without force the existing item must make the clone fail.
	require.Error(t, backend.Clone(ctx, source, target, 10, false))

	// With force the clone must go through despite the existing item.
	require.NoError(t, backend.Clone(ctx, source, target, 10, true))

	prefix := backend.Key("")
	got, err := target.GetRange(ctx, prefix, backend.RangeEnd(prefix), 0)
	require.NoError(t, err)

	require.Empty(t, cmp.Diff(want, got.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision")))
	require.Len(t, got.Items, total)
}
55 changes: 55 additions & 0 deletions tool/teleport/common/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package common

import (
"context"
"os"

"github.com/gravitational/trace"
"gopkg.in/yaml.v3"

"github.com/gravitational/teleport/lib/backend"
)

// onClone runs a backend clone described by the YAML file at configPath.
//
// The file is decoded into a backend.CloneConfig, the source and destination
// backends are created from it, and backend.Clone copies the data across.
// Both backends are closed before onClone returns.
func onClone(ctx context.Context, configPath string) error {
	raw, err := os.ReadFile(configPath)
	if err != nil {
		return trace.Wrap(err)
	}

	var cfg backend.CloneConfig
	err = yaml.Unmarshal(raw, &cfg)
	if err != nil {
		return trace.Wrap(err)
	}

	source, err := backend.New(ctx, cfg.Source.Type, cfg.Source.Params)
	if err != nil {
		return trace.Wrap(err, "failed to create source backend")
	}
	defer source.Close()

	target, err := backend.New(ctx, cfg.Destination.Type, cfg.Destination.Params)
	if err != nil {
		return trace.Wrap(err, "failed to create destination backend")
	}
	defer target.Close()

	// trace.Wrap passes a nil error through unchanged, so a successful clone
	// still yields nil here.
	return trace.Wrap(backend.Clone(ctx, source, target, cfg.Parallel, cfg.Force))
}
36 changes: 36 additions & 0 deletions tool/teleport/common/teleport.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,40 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
collectProfilesCmd.Arg("PROFILES", fmt.Sprintf("Comma-separated profile names to be exported. Supported profiles: %s. Default: %s", strings.Join(maps.Keys(debugclient.SupportedProfiles), ","), strings.Join(defaultCollectProfiles, ","))).StringVar(&ccf.Profiles)
collectProfilesCmd.Flag("seconds", "For CPU and trace profiles, profile for the given duration (if set to 0, it returns a profile snapshot). For other profiles, return a delta profile. Default: 0").Short('s').Default("0").IntVar(&ccf.ProfileSeconds)

backendCmd := app.Command("backend", "Commands for managing backend data.")
backendCmd.Hidden()
backendCloneCmd := backendCmd.Command("clone", "Clones data from a source to a destination backend.")
backendCloneCmd.Flag("config", "Path to the clone config file.").
Required().
Short('c').
StringVar(&ccf.ConfigFile)
backendCloneCmd.Alias(`
Examples:

When cloning a backend you must specify a clone configuration file:

> teleport backend clone --config clone.yaml

The following example configuration will clone Teleport's backend
data from sqlite to dynamodb:

# src is the configuration for the backend where data is cloned from.
src:
type: sqlite
path: /var/lib/teleport_data
# dst is the configuration for the backend where data is cloned to.
dst:
type: dynamodb
region: us-east-1
table: teleport_backend
# parallel is the amount of backend data cloned in parallel.
# If a clone operation is taking too long consider increasing this value.
parallel: 100
# force, if set to true, will continue cloning data to a destination
# regardless of whether data is already present. By default this is false
# to protect against overwriting the data of an existing Teleport cluster.
force: false`)

// parse CLI commands+flags:
utils.UpdateAppUsageTemplate(app, options.Args)
command, err := app.Parse(options.Args)
Expand Down Expand Up @@ -663,6 +697,8 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
err = onGetLogLevel(ccf.ConfigFile)
case collectProfilesCmd.FullCommand():
err = onCollectProfiles(ccf.ConfigFile, ccf.Profiles, ccf.ProfileSeconds)
case backendCloneCmd.FullCommand():
err = onClone(context.Background(), ccf.ConfigFile)
}
if err != nil {
utils.FatalError(err)
Expand Down
Loading