Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] backend: add migration tool to migrate between any two backends #43418

Merged
merged 27 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
fe056b1
backend: add migration tool to migrate between any two backends
dboslee May 21, 2024
2d79e9a
cleanup bad rebase
dboslee May 21, 2024
f474412
missing return value
dboslee May 21, 2024
22fc8bb
close backends
dboslee May 22, 2024
8208f71
Use workers and fixed size channel to limit in memory items
dboslee May 23, 2024
aa71635
Update tool/teleport/common/migrate.go
dboslee May 23, 2024
14fc18c
use a single errgroup for both get and put operations
dboslee May 24, 2024
9bc9475
refactor migration to clone
dboslee May 30, 2024
e2a7c5d
Add destination check and force config option
dboslee May 30, 2024
2184280
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
9b36866
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
7f67d53
Update lib/backend/clone/clone.go
dboslee Jun 7, 2024
f20e6cf
move clone to lib/backend and remove struct
dboslee Jun 7, 2024
46e2916
Update tool/teleport/common/backend.go
dboslee Jun 10, 2024
af46c96
Update lib/backend/clone.go
dboslee Jun 10, 2024
fabe3c4
Update lib/backend/clone.go
dboslee Jun 10, 2024
655e3c6
Update lib/backend/clone_test.go
dboslee Jun 10, 2024
ca7744e
Update lib/backend/clone.go
dboslee Jun 10, 2024
2b83772
Update lib/backend/clone_test.go
dboslee Jun 10, 2024
4293818
fix lint
dboslee Jun 11, 2024
2138664
fix license
dboslee Jun 20, 2024
5bfaa51
add example config in alias
dboslee Jun 20, 2024
223a934
Merge branch 'branch/v15' into bot/backport-41866-branch/v15
dboslee Jun 24, 2024
328e5d6
Merge branch 'branch/v15' into bot/backport-41866-branch/v15
dboslee Jun 24, 2024
3d8795f
Merge branch 'branch/v15' into bot/backport-41866-branch/v15
dboslee Jun 25, 2024
af1b425
Merge branch 'branch/v15' into bot/backport-41866-branch/v15
dboslee Jun 25, 2024
c7c7939
ignore backend.Item ID field
dboslee Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions lib/backend/clone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package backend

import (
"context"
"log/slog"
"sync/atomic"
"time"

"github.com/gravitational/trace"
"golang.org/x/sync/errgroup"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/utils/retryutils"
)

// bufferSize is the number of backend items that are queried at a time.
const bufferSize = 10000

// CloneConfig contains the configuration for cloning a [Backend].
// All items from the source are copied to the destination. All Teleport Auth
// Service instances should be stopped when running clone to avoid data
// inconsistencies.
type CloneConfig struct {
// Source is the backend [Config] items are cloned from.
Source Config `yaml:"src"`
// Destination is the [Config] items are cloned to.
Destination Config `yaml:"dst"`
// Parallel is the number of items that will be cloned in parallel.
Parallel int `yaml:"parallel"`
// Force indicates whether to clone data regardless of whether data already
// exists in the destination [Backend].
Force bool `yaml:"force"`
}

// Clone copies all items from a source to a destination [Backend].
func Clone(ctx context.Context, src, dst Backend, parallel int, force bool) error {
log := slog.With(teleport.ComponentKey, "clone")
itemC := make(chan Item, bufferSize)
start := Key("")
migrated := &atomic.Int32{}

if parallel <= 0 {
parallel = 1
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

if !force {
result, err := dst.GetRange(ctx, start, RangeEnd(start), 1)
if err != nil {
return trace.Wrap(err, "failed to check destination for existing data")
}
if len(result.Items) > 0 {
return trace.Errorf("unable to clone data to destination with existing data; this may be overridden by configuring 'force: true'")
}
} else {
log.WarnContext(ctx, "Skipping check for existing data in destination.")
}

group, ctx := errgroup.WithContext(ctx)
// Add 1 to ensure a goroutine exists for getting items.
group.SetLimit(parallel + 1)

group.Go(func() error {
var result *GetResult
pageKey := start
defer close(itemC)
for {
err := retry(ctx, 3, func() error {
var err error
result, err = src.GetRange(ctx, pageKey, RangeEnd(start), bufferSize)
if err != nil {
return trace.Wrap(err)
}
return nil
})
if err != nil {
return trace.Wrap(err)
}
for _, item := range result.Items {
select {
case itemC <- item:
case <-ctx.Done():
return trace.Wrap(ctx.Err())
}
}
if len(result.Items) < bufferSize {
return nil
}
pageKey = RangeEnd(result.Items[len(result.Items)-1].Key)
}
})

logProgress := func() {
log.InfoContext(ctx, "Backend clone still in progress", "items_copied", migrated.Load())
}
defer logProgress()
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
select {
case <-ticker.C:
logProgress()
case <-ctx.Done():
return
}
}()

for item := range itemC {
item := item
group.Go(func() error {
if err := retry(ctx, 3, func() error {
if _, err := dst.Put(ctx, item); err != nil {
return trace.Wrap(err)
}
return nil
}); err != nil {
return trace.Wrap(err)
}
migrated.Add(1)
return nil
})
if err := ctx.Err(); err != nil {
break
}
}
if err := group.Wait(); err != nil {
return trace.Wrap(err)
}
return nil
}

func retry(ctx context.Context, attempts int, fn func() error) error {
retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{
Driver: retryutils.NewExponentialDriver(time.Millisecond * 100),
Max: time.Second * 2,
})
if err != nil {
return trace.Wrap(err)
}
if attempts <= 0 {
return trace.Errorf("retry attempts must be > 0")
}

for i := 0; i < attempts; i++ {
err = fn()
if err == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-retry.After():
retry.Inc()
}
}
return trace.Wrap(err)
}
107 changes: 107 additions & 0 deletions lib/backend/clone_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package backend_test

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/memory"
)

func TestClone(t *testing.T) {
ctx := context.Background()
src, err := memory.New(memory.Config{})
require.NoError(t, err)
defer src.Close()

dst, err := memory.New(memory.Config{})
require.NoError(t, err)
defer dst.Close()

itemCount := 11111
items := make([]backend.Item, itemCount)

for i := 0; i < itemCount; i++ {
item := backend.Item{
Key: backend.Key(fmt.Sprintf("key-%05d", i)),
Value: []byte(fmt.Sprintf("value-%d", i)),
}
_, err := src.Put(ctx, item)
require.NoError(t, err)
items[i] = item
}

err = backend.Clone(ctx, src, dst, 10, false)
require.NoError(t, err)

start := backend.Key("")
result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0)
require.NoError(t, err)

diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision"))
require.Empty(t, diff)
require.NoError(t, err)
require.Len(t, result.Items, itemCount)
}

func TestCloneForce(t *testing.T) {
ctx := context.Background()
src, err := memory.New(memory.Config{})
require.NoError(t, err)
defer src.Close()

dst, err := memory.New(memory.Config{})
require.NoError(t, err)
defer dst.Close()

itemCount := 100
items := make([]backend.Item, itemCount)

for i := 0; i < itemCount; i++ {
item := backend.Item{
Key: backend.Key(fmt.Sprintf("key-%05d", i)),
Value: []byte(fmt.Sprintf("value-%d", i)),
}
_, err := src.Put(ctx, item)
require.NoError(t, err)
items[i] = item
}

_, err = dst.Put(ctx, items[0])
require.NoError(t, err)

err = backend.Clone(ctx, src, dst, 10, false)
require.Error(t, err)

err = backend.Clone(ctx, src, dst, 10, true)
require.NoError(t, err)

start := backend.Key("")
result, err := dst.GetRange(ctx, start, backend.RangeEnd(start), 0)
require.NoError(t, err)

diff := cmp.Diff(items, result.Items, cmpopts.IgnoreFields(backend.Item{}, "Revision"))
require.Empty(t, diff)
require.Len(t, result.Items, itemCount)
}
55 changes: 55 additions & 0 deletions tool/teleport/common/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package common

import (
"context"
"os"

"github.com/gravitational/trace"
"gopkg.in/yaml.v3"

"github.com/gravitational/teleport/lib/backend"
)

func onClone(ctx context.Context, configPath string) error {
data, err := os.ReadFile(configPath)
if err != nil {
return trace.Wrap(err)
}
var config backend.CloneConfig
if err := yaml.Unmarshal(data, &config); err != nil {
return trace.Wrap(err)
}

src, err := backend.New(ctx, config.Source.Type, config.Source.Params)
if err != nil {
return trace.Wrap(err, "failed to create source backend")
}
defer src.Close()

dst, err := backend.New(ctx, config.Destination.Type, config.Destination.Params)
if err != nil {
return trace.Wrap(err, "failed to create destination backend")
}
defer dst.Close()

if err := backend.Clone(ctx, src, dst, config.Parallel, config.Force); err != nil {
return trace.Wrap(err)
}
return nil
}
36 changes: 36 additions & 0 deletions tool/teleport/common/teleport.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,40 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
collectProfilesCmd.Arg("PROFILES", fmt.Sprintf("Comma-separated profile names to be exported. Supported profiles: %s. Default: %s", strings.Join(maps.Keys(debugclient.SupportedProfiles), ","), strings.Join(defaultCollectProfiles, ","))).StringVar(&ccf.Profiles)
collectProfilesCmd.Flag("seconds", "For CPU and trace profiles, profile for the given duration (if set to 0, it returns a profile snapshot). For other profiles, return a delta profile. Default: 0").Short('s').Default("0").IntVar(&ccf.ProfileSeconds)

backendCmd := app.Command("backend", "Commands for managing backend data.")
backendCmd.Hidden()
backendCloneCmd := backendCmd.Command("clone", "Clones data from a source to a destination backend.")
backendCloneCmd.Flag("config", "Path to the clone config file.").
Required().
Short('c').
StringVar(&ccf.ConfigFile)
backendCloneCmd.Alias(`
Examples:

When cloning a backend you must specify a clone configuration file:

> teleport backend clone --config clone.yaml

The following example configuration will clone Teleport's backend
data from sqlite to dynamodb:

# src is the configuration for the backend where data is cloned from.
src:
type: sqlite
path: /var/lib/teleport_data
# dst is the configuration for the backend where data is cloned to.
dst:
type: dynamodb
region: us-east-1
table: teleport_backend
# parallel is the amount of backend data cloned in parallel.
# If a clone operation is taking too long consider increasing this value.
parallel: 100
# force, if set to true, will continue cloning data to a destination
# regardless of whether data is already present. By default this is false
# to protect against overwriting the data of an existing Teleport cluster.
force: false`)

// parse CLI commands+flags:
utils.UpdateAppUsageTemplate(app, options.Args)
command, err := app.Parse(options.Args)
Expand Down Expand Up @@ -663,6 +697,8 @@ func Run(options Options) (app *kingpin.Application, executedCommand string, con
err = onGetLogLevel(ccf.ConfigFile)
case collectProfilesCmd.FullCommand():
err = onCollectProfiles(ccf.ConfigFile, ccf.Profiles, ccf.ProfileSeconds)
case backendCloneCmd.FullCommand():
err = onClone(context.Background(), ccf.ConfigFile)
}
if err != nil {
utils.FatalError(err)
Expand Down
Loading