Skip to content

Commit

Permalink
meta/sql: simplify backup v2 (#5352)
Browse files Browse the repository at this point in the history
Signed-off-by: jiefenghuang <[email protected]>
Co-authored-by: jiefenghuang <[email protected]>
  • Loading branch information
davies and jiefenghuang authored Dec 9, 2024
1 parent 36a7502 commit 08e52bb
Show file tree
Hide file tree
Showing 10 changed files with 970 additions and 1,817 deletions.
404 changes: 180 additions & 224 deletions pkg/meta/backup.go

Large diffs are not rendered by default.

58 changes: 21 additions & 37 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ type engine interface {

newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler

execETxn(ctx Context, txn *eTxn, fn func(ctx Context, txn *eTxn) error) error
buildDumpedSeg(typ int, opt *DumpOption, txn *eTxn) iDumpedSeg
buildLoadedSeg(typ int, opt *LoadOption) iLoadedSeg
dump(ctx Context, opt *DumpOption, ch chan<- *dumpedResult) error
load(ctx Context, typ int, opt *LoadOption, val proto.Message) error
prepareLoad(ctx Context, opt *LoadOption) error
}

Expand Down Expand Up @@ -3086,33 +3085,15 @@ func (h *dirHandler) Close() {
func (m *baseMeta) DumpMetaV2(ctx Context, w io.Writer, opt *DumpOption) error {
opt = opt.check()

bak := NewBakFormat()
bak := newBakFormat()
ch := make(chan *dumpedResult, 100)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

txn := &eTxn{
en: m.en,
opt: &bTxnOption{
coNum: opt.CoNum,
maxRetry: 1,
maxStmtRetry: 3,
},
}
err := m.en.execETxn(ctx, txn, func(ctx Context, txn *eTxn) error {
for typ := SegTypeFormat; typ < SegTypeMax; typ++ {
seg := m.en.buildDumpedSeg(typ, opt, txn)
if seg != nil {
if err := seg.dump(ctx, ch); err != nil {
return fmt.Errorf("dump %s err: %w", seg, err)
}
}
}
return nil
})
err := m.en.dump(ctx, opt, ch)
if err != nil {
logger.Errorf("dump meta err: %v", err)
ctx.Cancel()
} else {
close(ch)
Expand All @@ -3130,17 +3111,20 @@ func (m *baseMeta) DumpMetaV2(ctx Context, w io.Writer, opt *DumpOption) error {
if res == nil {
break
}
if err := bak.WriteSegment(w, &BakSegment{Val: res.msg}); err != nil {
logger.Errorf("write %s err: %v", res.seg, err)
seg := &bakSegment{val: res.msg}
if err := bak.writeSegment(w, seg); err != nil {
logger.Errorf("write %d err: %v", seg.typ, err)
ctx.Cancel()
wg.Wait()
return err
}
res.seg.release(res.msg)
if res.release != nil {
res.release(res.msg)
}
}

wg.Wait()
return bak.WriteFooter(w)
return bak.writeFooter(w)
}

func (m *baseMeta) LoadMetaV2(ctx Context, r io.Reader, opt *LoadOption) error {
Expand All @@ -3152,8 +3136,8 @@ func (m *baseMeta) LoadMetaV2(ctx Context, r io.Reader, opt *LoadOption) error {
}

type task struct {
typ int
msg proto.Message
seg iLoadedSeg
}

var wg sync.WaitGroup
Expand All @@ -3171,24 +3155,25 @@ func (m *baseMeta) LoadMetaV2(ctx Context, r io.Reader, opt *LoadOption) error {
if task == nil {
break
}
if err := task.seg.load(ctx, task.msg); err != nil {
logger.Errorf("failed to insert %s: %s", task.seg, err)
err := m.en.load(ctx, task.typ, opt, task.msg)
if err != nil {
logger.Errorf("failed to insert %d: %s", task.typ, err)
ctx.Cancel()
return
}
}
}

for i := 0; i < opt.CoNum; i++ {
for i := 0; i < opt.Threads; i++ {
wg.Add(1)
go workerFunc(ctx, taskCh)
}

bak := &BakFormat{}
bak := &bakFormat{}
for {
seg, err := bak.ReadSegment(r)
seg, err := bak.readSegment(r)
if err != nil {
if errors.Is(err, ErrBakEOF) {
if errors.Is(err, errBakEOF) {
close(taskCh)
break
}
Expand All @@ -3197,12 +3182,11 @@ func (m *baseMeta) LoadMetaV2(ctx Context, r io.Reader, opt *LoadOption) error {
return err
}

ls := m.en.buildLoadedSeg(int(seg.Typ), opt)
select {
case <-ctx.Done():
wg.Wait()
return ctx.Err()
case taskCh <- &task{seg.Val, ls}:
case taskCh <- &task{int(seg.typ), seg.val}:
}
}
wg.Wait()
Expand Down
16 changes: 9 additions & 7 deletions pkg/meta/load_dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package meta

import (
"bytes"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -337,7 +338,7 @@ func TestLoadDump(t *testing.T) { //skip mutate

func testDumpV2(t *testing.T, m Meta, result string, opt *DumpOption) {
if opt == nil {
opt = &DumpOption{CoNum: 10, KeepSecret: true}
opt = &DumpOption{Threads: 10, KeepSecret: true}
}
fp, err := os.OpenFile(result, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
Expand All @@ -347,7 +348,7 @@ func testDumpV2(t *testing.T, m Meta, result string, opt *DumpOption) {
if _, err = m.Load(true); err != nil {
t.Fatalf("load setting: %s", err)
}
if err = m.DumpMetaV2(Background, fp, opt); err != nil {
if err = m.DumpMetaV2(WrapContext(context.Background()), fp, opt); err != nil {
t.Fatalf("dump meta: %s", err)
}
fp.Sync()
Expand All @@ -363,7 +364,7 @@ func testLoadV2(t *testing.T, uri, fname string) Meta {
t.Fatalf("open file: %s", fname)
}
defer fp.Close()
if err = m.LoadMetaV2(Background, fp, &LoadOption{CoNum: 10}); err != nil {
if err = m.LoadMetaV2(WrapContext(context.Background()), fp, &LoadOption{Threads: 10}); err != nil {
t.Fatalf("load meta: %s", err)
}
if _, err := m.Load(true); err != nil {
Expand Down Expand Up @@ -400,7 +401,8 @@ func TestLoadDumpV2(t *testing.T) {
logger.SetLevel(logrus.DebugLevel)

engines := map[string][]string{
"mysql": {"mysql://root:@/dev", "mysql://root:@/dev2"},
"sqlite3": {"sqlite3://dev.db", "sqlite3://dev2.db"},
// "mysql": {"mysql://root:@/dev", "mysql://root:@/dev2"},
// "redis": {"redis://127.0.0.1:6379/2", "redis://127.0.0.1:6379/3"},
// "tikv": {"tikv://127.0.0.1:2379/jfs-load-dump-1", "tikv://127.0.0.1:2379/jfs-load-dump-2"},
}
Expand Down Expand Up @@ -454,13 +456,13 @@ func TestLoadDump_MemKV(t *testing.T) {

func testSecretAndTrash(t *testing.T, addr, addr2 string) {
m := testLoad(t, addr, sampleFile)
testDumpV2(t, m, "sqlite-secret.dump", &DumpOption{CoNum: 10, KeepSecret: true})
testDumpV2(t, m, "sqlite-secret.dump", &DumpOption{Threads: 10, KeepSecret: true})
m2 := testLoadV2(t, addr2, "sqlite-secret.dump")
if m2.GetFormat().EncryptKey != m.GetFormat().EncryptKey {
t.Fatalf("encrypt key not valid: %s", m2.GetFormat().EncryptKey)
}

testDumpV2(t, m, "sqlite-non-secret.dump", &DumpOption{CoNum: 10, KeepSecret: false})
testDumpV2(t, m, "sqlite-non-secret.dump", &DumpOption{Threads: 10, KeepSecret: false})
m2.Reset()
m2 = testLoadV2(t, addr2, "sqlite-non-secret.dump")
if m2.GetFormat().EncryptKey != "removed" {
Expand All @@ -481,7 +483,7 @@ func testSecretAndTrash(t *testing.T, addr, addr2 string) {
return false, nil
})
if cnt != len(trashs) {
t.Fatalf("trash count: %d", cnt)
t.Fatalf("trash count: %d != %d", cnt, len(trashs))
}
}

Expand Down
Loading

0 comments on commit 08e52bb

Please sign in to comment.