Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: memiavl background snapshot rewriting panic when shutdown #1292

Merged
merged 5 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## UNRELEASED

- [#1292](https://github.com/crypto-org-chain/cronos/pull/1292) memiavl: cancel background snapshot rewriting on graceful shutdown.

*January 5, 2024*

## v1.1.0-rc2
Expand Down
39 changes: 34 additions & 5 deletions memiavl/db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memiavl

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -50,6 +51,9 @@

// result channel of snapshot rewrite goroutine
snapshotRewriteChan chan snapshotResult
// context cancel function to cancel the snapshot rewrite goroutine
snapshotRewriteCancel context.CancelFunc

// the number of old snapshots to keep (excluding the latest one)
snapshotKeepRecent uint32
// block interval to take a new snapshot
Expand Down Expand Up @@ -414,6 +418,7 @@
select {
case result := <-db.snapshotRewriteChan:
db.snapshotRewriteChan = nil
db.snapshotRewriteCancel = nil

if result.mtree == nil {
// background snapshot rewrite failed
Expand Down Expand Up @@ -629,6 +634,11 @@

// RewriteSnapshot writes the current version of memiavl into a snapshot and updates the `current` symlink.
// It is a convenience wrapper that calls RewriteSnapshotWithContext with a background context.
func (db *DB) RewriteSnapshot() error {
	ctx := context.Background()
	return db.RewriteSnapshotWithContext(ctx)
}

// RewriteSnapshotWithContext writes the current version of memiavl into a snapshot, and updates the `current` symlink.
func (db *DB) RewriteSnapshotWithContext(ctx context.Context) error {
db.mtx.Lock()
defer db.mtx.Unlock()

Expand All @@ -639,7 +649,7 @@
snapshotDir := snapshotName(db.lastCommitInfo.Version)
tmpDir := snapshotDir + TmpSuffix
path := filepath.Join(db.dir, tmpDir)
if err := db.MultiTree.WriteSnapshot(path, db.snapshotWriterPool); err != nil {
if err := db.MultiTree.WriteSnapshotWithContext(ctx, path, db.snapshotWriterPool); err != nil {
return errors.Join(err, os.RemoveAll(path))
}
if err := os.Rename(path, filepath.Join(db.dir, snapshotDir)); err != nil {
Expand Down Expand Up @@ -707,16 +717,19 @@
return errors.New("there's another ongoing snapshot rewriting process")
}

ctx, cancel := context.WithCancel(context.Background())

ch := make(chan snapshotResult)
db.snapshotRewriteChan = ch
db.snapshotRewriteCancel = cancel

cloned := db.copy(0)
wal := db.wal
go func() {
defer close(ch)

cloned.logger.Info("start rewriting snapshot", "version", cloned.Version())
if err := cloned.RewriteSnapshot(); err != nil {
if err := cloned.RewriteSnapshotWithContext(ctx); err != nil {
ch <- snapshotResult{err: err}
return
}
Expand Down Expand Up @@ -745,9 +758,20 @@
db.mtx.Lock()
defer db.mtx.Unlock()

errs := []error{
db.waitAsyncCommit(), db.MultiTree.Close(), db.wal.Close(),
errs := []error{db.waitAsyncCommit()}

if db.snapshotRewriteChan != nil {
db.snapshotRewriteCancel()
<-db.snapshotRewriteChan
db.snapshotRewriteChan = nil
db.snapshotRewriteCancel = nil
}

errs = append(errs,
db.MultiTree.Close(),
db.wal.Close(),
)

db.wal = nil

if db.fileLock != nil {
Expand Down Expand Up @@ -814,10 +838,15 @@

// WriteSnapshot wraps MultiTree.WriteSnapshot to add a lock.
func (db *DB) WriteSnapshot(dir string) error {
return db.WriteSnapshotWithContext(context.Background(), dir)

Check warning on line 841 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L841

Added line #L841 was not covered by tests
}

// WriteSnapshotWithContext wraps MultiTree.WriteSnapshotWithContext to add a lock.
func (db *DB) WriteSnapshotWithContext(ctx context.Context, dir string) error {

Check warning on line 845 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L845

Added line #L845 was not covered by tests
db.mtx.Lock()
defer db.mtx.Unlock()

return db.MultiTree.WriteSnapshot(dir, db.snapshotWriterPool)
return db.MultiTree.WriteSnapshotWithContext(ctx, dir, db.snapshotWriterPool)

Check warning on line 849 in memiavl/db.go

View check run for this annotation

Codecov / codecov/patch

memiavl/db.go#L849

Added line #L849 was not covered by tests
}

func snapshotName(version int64) string {
Expand Down
3 changes: 2 additions & 1 deletion memiavl/import.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package memiavl

import (
"context"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -133,7 +134,7 @@
return errors.New("version overflows uint32")
}

return writeSnapshot(dir, uint32(version), func(w *snapshotWriter) (uint32, error) {
return writeSnapshot(context.Background(), dir, uint32(version), func(w *snapshotWriter) (uint32, error) {
mmsqe marked this conversation as resolved.
Show resolved Hide resolved

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion Error

Potential integer overflow by integer type conversion
i := &importer{
snapshotWriter: *w,
}
Expand Down
6 changes: 5 additions & 1 deletion memiavl/multitree.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ func (t *MultiTree) CatchupWAL(wal *wal.Log, endVersion int64) error {
}

// WriteSnapshot writes the snapshot of the multi-tree into dir.
// It is a convenience wrapper that calls WriteSnapshotWithContext with a
// background context, passing dir and wp through unchanged.
func (t *MultiTree) WriteSnapshot(dir string, wp *pond.WorkerPool) error {
	ctx := context.Background()
	return t.WriteSnapshotWithContext(ctx, dir, wp)
}

func (t *MultiTree) WriteSnapshotWithContext(ctx context.Context, dir string, wp *pond.WorkerPool) error {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return err
}
Expand All @@ -368,7 +372,7 @@ func (t *MultiTree) WriteSnapshot(dir string, wp *pond.WorkerPool) error {
for _, entry := range t.trees {
tree, name := entry.Tree, entry.Name
group.Submit(func() error {
return tree.WriteSnapshot(filepath.Join(dir, name))
return tree.WriteSnapshotWithContext(ctx, filepath.Join(dir, name))
})
}

Expand Down
29 changes: 25 additions & 4 deletions memiavl/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"bufio"
"context"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -24,6 +25,9 @@
FileNameLeaves = "leaves"
FileNameKVs = "kvs"
FileNameMetadata = "metadata"

// check for cancellation every 1000 leaves
CancelCheckInterval = 1000
yihuang marked this conversation as resolved.
Show resolved Hide resolved
)

// Snapshot manages the lifecycle of mmap-ed files for the snapshot,
Expand Down Expand Up @@ -347,9 +351,13 @@
}
}

// WriteSnapshot saves the IAVL tree to a new snapshot directory.
func (t *Tree) WriteSnapshot(snapshotDir string) error {
return writeSnapshot(snapshotDir, t.version, func(w *snapshotWriter) (uint32, error) {
return t.WriteSnapshotWithContext(context.Background(), snapshotDir)
}

// WriteSnapshotWithContext saves the IAVL tree to a new snapshot directory.
func (t *Tree) WriteSnapshotWithContext(ctx context.Context, snapshotDir string) error {
return writeSnapshot(ctx, snapshotDir, t.version, func(w *snapshotWriter) (uint32, error) {
if t.root == nil {
return 0, nil
} else {
Expand All @@ -362,6 +370,7 @@
}

func writeSnapshot(
ctx context.Context,
dir string, version uint32,
doWrite func(*snapshotWriter) (uint32, error),
) (returnErr error) {
Expand Down Expand Up @@ -407,7 +416,7 @@
leavesWriter := bufio.NewWriter(fpLeaves)
kvsWriter := bufio.NewWriter(fpKVs)

w := newSnapshotWriter(nodesWriter, leavesWriter, kvsWriter)
w := newSnapshotWriter(ctx, nodesWriter, leavesWriter, kvsWriter)
leaves, err := doWrite(w)
if err != nil {
return err
Expand Down Expand Up @@ -460,6 +469,9 @@
}

type snapshotWriter struct {
// context used to cancel the writing process
ctx context.Context

nodesWriter, leavesWriter, kvWriter io.Writer

// count how many nodes have been written
Expand All @@ -469,8 +481,9 @@
kvsOffset uint64
}

func newSnapshotWriter(nodesWriter, leavesWriter, kvsWriter io.Writer) *snapshotWriter {
func newSnapshotWriter(ctx context.Context, nodesWriter, leavesWriter, kvsWriter io.Writer) *snapshotWriter {
return &snapshotWriter{
ctx: ctx,
nodesWriter: nodesWriter,
leavesWriter: leavesWriter,
kvWriter: kvsWriter,
Expand Down Expand Up @@ -502,6 +515,14 @@
}

func (w *snapshotWriter) writeLeaf(version uint32, key, value, hash []byte) error {
if w.leafCounter%CancelCheckInterval == 0 {
select {
case <-w.ctx.Done():
return w.ctx.Err()

Check warning on line 521 in memiavl/snapshot.go

View check run for this annotation

Codecov / codecov/patch

memiavl/snapshot.go#L520-L521

Added lines #L520 - L521 were not covered by tests
default:
}
}

var buf [SizeLeafWithoutHash]byte
binary.LittleEndian.PutUint32(buf[OffsetLeafVersion:], version)
binary.LittleEndian.PutUint32(buf[OffsetLeafKeyLen:], uint32(len(key)))
Expand Down
Loading