
Commit

wip
Signed-off-by: Florent Poinsard <[email protected]>
frouioui committed Nov 21, 2024
1 parent d2815bf commit 8cffe83
Showing 3 changed files with 186 additions and 20 deletions.
113 changes: 112 additions & 1 deletion go/vt/mysqlctl/backup_blackbox_test.go
@@ -28,7 +28,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/ioutil"
"vitess.io/vitess/go/test/utils"

"vitess.io/vitess/go/mysql/capabilities"
@@ -587,6 +587,117 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) {
}
}

func TestExecuteBackupFailToWriteEachFileOnlyOnce(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// Set up local backup directory
id := fmt.Sprintf("%d", time.Now().UnixNano())
backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id)
filebackupstorage.FileBackupStorageRoot = backupRoot
require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir"))
dataDir := path.Join(backupRoot, "datadir")
// Add some files under data directory to force backup to execute semaphore acquire inside
// backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483).
require.NoError(t, createBackupDir(dataDir, "test1"))
require.NoError(t, createBackupDir(dataDir, "test2"))
require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd"))
require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd"))
defer os.RemoveAll(backupRoot)

needIt, err := needInnoDBRedoLogSubdir()
require.NoError(t, err)
if needIt {
fpath := path.Join("log", mysql.DynamicRedoLogSubdir)
if err := createBackupDir(backupRoot, fpath); err != nil {
require.Failf(t, err.Error(), "failed to create directory: %s", fpath)
}
}

// Set up topo
keyspace, shard := "mykeyspace", "-"
ts := memorytopo.NewServer(ctx, "cell1")
defer ts.Close()

require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{}))
require.NoError(t, ts.CreateShard(ctx, keyspace, shard))

tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100")
tablet.Keyspace = keyspace
tablet.Shard = shard

require.NoError(t, ts.CreateTablet(ctx, tablet))

_, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"}

now := time.Now()
si.PrimaryTermStartTime = &vttime.Time{Seconds: now.Unix(), Nanoseconds: int32(now.Nanosecond())}

return nil
})
require.NoError(t, err)

bufferPerFiles := make(map[string]ioutil.BytesBufferWriter)
be := &mysqlctl.BuiltinBackupEngine{}
bh := &mysqlctl.FakeBackupHandle{}
bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn {
// This mimics what happens with the other BackupHandles, where calling AddFile
// either truncates or overwrites any existing data if the same filename already exists.
writerCloserBuffer := ioutil.NewBytesBufferWriter()
bufferPerFiles[filename] = writerCloserBuffer

// The MANIFEST file is only written once, at the very end of the backup, and
// must always succeed.
if filename == "MANIFEST" {
return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: writerCloserBuffer}
}

// Count how many times AddFile has been called for this file. By the time this
// function runs, the current call has already been recorded in AddFileCalls.
count := 0
for _, call := range bh.AddFileCalls {
if call.Filename == filename {
count++
}
}
// If it is the first time we call AddFile for this file, fail the call so the
// engine is forced to retry the file.
if count == 1 {
return mysqlctl.FakeBackupHandleAddFileReturn{Err: fmt.Errorf("failed to AddFile on %s", filename)}
}
// If it is the second time we call AddFile for this file (the retry), let it pass.
return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: writerCloserBuffer}
}

// Spin up a fake daemon to be used in backups. It needs to be allowed to receive:
// "STOP REPLICA", "START REPLICA", in that order.
fakedb := fakesqldb.New(t)
defer fakedb.Close()
mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb)
defer mysqld.Close()
mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"}

logger := logutil.NewMemoryLogger()
backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{
Logger: logger,
Mysqld: mysqld,
Cnf: &mysqlctl.Mycnf{
InnodbDataHomeDir: path.Join(backupRoot, "innodb"),
InnodbLogGroupHomeDir: path.Join(backupRoot, "log"),
DataDir: path.Join(backupRoot, "datadir"),
},
Stats: backupstats.NewFakeStats(),
Concurrency: 1,
HookExtraEnv: map[string]string{},
TopoServer: ts,
Keyspace: keyspace,
Shard: shard,
MysqlShutdownTimeout: mysqlShutdownTimeout,
}, bh)

require.NoError(t, err)
require.Equal(t, mysqlctl.BackupUsable, backupResult)
}

// needInnoDBRedoLogSubdir indicates whether we need to create a redo log subdirectory.
// Starting with MySQL 8.0.30, the InnoDB redo logs are stored in a subdirectory of the
// <innodb_log_group_home_dir> (<datadir>/. by default) called "#innodb_redo". See:
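The body of needInnoDBRedoLogSubdir is collapsed in this view. Purely to illustrate the version check its comment describes — the function below is a hypothetical sketch, not the actual helper — the decision reduces to comparing the server version against 8.0.30:

func serverNeedsRedoLogSubdir(major, minor, patch int) bool {
// MySQL 8.0.30 and later place the InnoDB redo logs under "#innodb_redo".
if major != 8 {
return major > 8
}
if minor > 0 {
return true
}
return patch >= 30
}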
87 changes: 68 additions & 19 deletions go/vt/mysqlctl/builtinbackupengine.go
@@ -60,6 +60,8 @@ const (
builtinBackupEngineName = "builtin"
AutoIncrementalFromPos = "auto"
dataDictionaryFile = "mysql.ibd"

maxRetriesPerFile = 1
)

var (
@@ -149,6 +151,11 @@ type FileEntry struct {
// ParentPath is an optional prefix to the Base path. If empty, it is ignored. Useful
// for writing files in a temporary directory
ParentPath string

// AttemptNb specifies how many times we attempted to restore/backup this FileEntry.
// If we fail to restore/backup this FileEntry, we will retry up to maxRetriesPerFile (= 1) times.
// Every time the builtin backup engine retries this file, we increment this field by 1.
AttemptNb int
}

func init() {
@@ -643,8 +650,22 @@ func (be *BuiltinBackupEngine) backupFiles(

// Backup the individual file.
name := fmt.Sprintf("%v", i)
if err := be.backupFile(ctxCancel, params, bh, fe, name); err != nil {
bh.RecordError(err)
var errBackupFile error
for fe.AttemptNb <= maxRetriesPerFile {
if errBackupFile = be.backupFile(ctxCancel, params, bh, fe, name); errBackupFile == nil || ctxCancel.Err() != nil {
// Either no failure or the context was canceled. In both situations, we don't want to retry.
// If the context was canceled, it does not make sense to retry as the backup process is
// bound to fail whether we retry this file or not.
break
}

// A failure was observed, let's retry.
fe.AttemptNb++
}

// If we ran out of retries and still have an error, record it and cancel the whole backup.
// With maxRetriesPerFile = 1, each file is attempted at most twice.
if errBackupFile != nil {
bh.RecordError(vterrors.Wrapf(errBackupFile, "failed to backup file '%s' after %d attempts", name, fe.AttemptNb))
cancel()
}
}(i)
@@ -725,6 +746,7 @@ type backupPipe struct {
crc32 hash.Hash32
nn int64
done chan struct{}
failed chan struct{}
closed int32
}

@@ -735,6 +757,7 @@ func newBackupWriter(filename string, writerBufferSize int, maxSize int64, w io.
filename: filename,
maxSize: maxSize,
done: make(chan struct{}),
failed: make(chan struct{}),
}
}

@@ -744,10 +767,15 @@ func newBackupReader(filename string, maxSize int64, r io.Reader) *backupPipe {
r: r,
filename: filename,
done: make(chan struct{}),
failed: make(chan struct{}),
maxSize: maxSize,
}
}

func attemptToString(attempt int) string {
return fmt.Sprintf("(attempt %d/%d)", attempt+1, maxRetriesPerFile+1)
}

func (bp *backupPipe) Read(p []byte) (int, error) {
nn, err := bp.r.Read(p)
_, _ = bp.crc32.Write(p[:nn])
@@ -762,9 +790,17 @@ func (bp *backupPipe) Write(p []byte) (int, error) {
return nn, err
}

func (bp *backupPipe) Close() error {
func (bp *backupPipe) Close(isDone bool) (err error) {
if atomic.CompareAndSwapInt32(&bp.closed, 0, 1) {
close(bp.done)
// If we fail to Flush the writer we must report this backup as a failure.
defer func() {
if isDone && err == nil {
close(bp.done)
return
}
close(bp.failed)
}()

if bp.w != nil {
if err := bp.w.Flush(); err != nil {
return err
@@ -778,33 +814,38 @@ func (bp *backupPipe) HashString() string {
return hex.EncodeToString(bp.crc32.Sum(nil))
}

func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool) {
messageStr := "restoring "
func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool, attemptStr string) {
messageStr := "restoring"
if !restore {
messageStr = "backing up "
messageStr = "backing up"
}
tick := time.NewTicker(period)
defer tick.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("Canceled %s of %q file", messageStr, bp.filename)
logger.Infof("Canceled %s of %q file %s", messageStr, bp.filename, attemptStr)
return
case <-bp.done:
logger.Infof("Completed %s %q", messageStr, bp.filename)
logger.Infof("Completed %s %q %s", messageStr, bp.filename, attemptStr)
return
case <-bp.failed:
logger.Infof("Failed %s %q %s", messageStr, bp.filename, attemptStr)
return
case <-tick.C:
written := float64(atomic.LoadInt64(&bp.nn))
if bp.maxSize == 0 {
logger.Infof("%s %q: %.02fkb", messageStr, bp.filename, written/1024.0)
logger.Infof("%s %q %s: %.02fkb", messageStr, bp.filename, attemptStr, written/1024.0)
} else {
maxSize := float64(bp.maxSize)
logger.Infof("%s %q: %.02f%% (%.02f/%.02fkb)", messageStr, bp.filename, 100.0*written/maxSize, written/1024.0, maxSize/1024.0)
logger.Infof("%s %q %s: %.02f%% (%.02f/%.02fkb)", messageStr, bp.filename, attemptStr, 100.0*written/maxSize, written/1024.0, maxSize/1024.0)
}
}
}
}

// backupFile backs up an individual file.
func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) {
ctx, cancel := context.WithCancel(ctx)
@@ -835,11 +876,12 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
return err
}

attemptStr := attemptToString(fe.AttemptNb)
br := newBackupReader(fe.Name, fi.Size(), timedSource)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, false /*restore*/)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, false /*restore*/, attemptStr)

// Open the destination file for writing, and a buffer.
params.Logger.Infof("Backing up file: %v", fe.Name)
params.Logger.Infof("Backing up file: %v %s", fe.Name, attemptStr)
openDestAt := time.Now()
dest, err := bh.AddFile(ctx, name, fi.Size())
if err != nil {
@@ -870,14 +912,20 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
var writer io.Writer = bw

defer func() {
if name == "3" && done == false {
// done = true
createAndCopyErr = errors.New("failed to copy file 3")
}

// Close the backupPipe to finish writing on destination.
if err := bw.Close(); err != nil {
if err := bw.Close(createAndCopyErr == nil); err != nil {
createAndCopyErr = errors.Join(createAndCopyErr, vterrors.Wrapf(err, "cannot flush destination: %v", name))
}

if err := br.Close(); err != nil {
if err := br.Close(createAndCopyErr == nil); err != nil {
createAndCopyErr = errors.Join(createAndCopyErr, vterrors.Wrap(err, "failed to close the source reader"))
}

}()
// Create the gzip compression pipe, if necessary.
if backupStorageCompress {
@@ -898,9 +946,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
defer func() {
// Close gzip to flush it, after that all data is sent to writer.
closeCompressorAt := time.Now()
params.Logger.Infof("closing compressor")
params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, attemptStr)
if cerr := closer.Close(); cerr != nil {
cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name)
cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", fe.Name)
params.Logger.Error(cerr)
createAndCopyErr = errors.Join(createAndCopyErr, cerr)
}
@@ -1110,8 +1158,9 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt))
}()

attemptStr := attemptToString(fe.AttemptNb)
br := newBackupReader(name, 0, timedSource)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true, attemptStr)
var reader io.Reader = br

// Open the destination file for writing.
@@ -1197,7 +1246,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
return vterrors.Wrap(err, "failed to flush destination buffer")
}

if err := br.Close(); err != nil {
if err := br.Close(true); err != nil {
return vterrors.Wrap(err, "failed to close the source reader")
}

6 changes: 6 additions & 0 deletions go/vt/mysqlctl/fakebackupstorage.go
@@ -35,6 +35,7 @@ type FakeBackupHandle struct {
AbortBackupReturn error
AddFileCalls []FakeBackupHandleAddFileCall
AddFileReturn FakeBackupHandleAddFileReturn
// AddFileReturnF, when set, takes precedence over AddFileReturn and lets the
// test choose what AddFile returns on a per-filename, per-call basis.
AddFileReturnF func(filename string) FakeBackupHandleAddFileReturn
EndBackupCalls []context.Context
EndBackupReturn error
ReadFileCalls []FakeBackupHandleReadFileCall
@@ -79,6 +80,11 @@ func (fbh *FakeBackupHandle) Name() string {

func (fbh *FakeBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
fbh.AddFileCalls = append(fbh.AddFileCalls, FakeBackupHandleAddFileCall{ctx, filename, filesize})

if fbh.AddFileReturnF != nil {
r := fbh.AddFileReturnF(filename)
return r.WriteCloser, r.Err
}
return fbh.AddFileReturn.WriteCloser, fbh.AddFileReturn.Err
}
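For illustration, a hypothetical test could use this hook to simulate a permanently failing storage backend and assert that the backup engine gives up once its per-file retries are exhausted (the error message below is made up):

bh := &mysqlctl.FakeBackupHandle{}
bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn {
// Fail every AddFile call, regardless of file or attempt number.
return mysqlctl.FakeBackupHandleAddFileReturn{Err: fmt.Errorf("permanent failure on %s", filename)}
}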

