diff --git a/.gitignore b/.gitignore index c7bd2b7a..c2a8cfa7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ *.test *.swp /bin/ +cmd/bolt/bolt diff --git a/Makefile b/Makefile index e035e63a..9dedcc36 100644 --- a/Makefile +++ b/Makefile @@ -7,12 +7,24 @@ default: build race: @go test -v -race -test.run="TestSimulate_(100op|1000op)" +fmt: + !(gofmt -l -s -d $(shell find . -name \*.go) | grep '[a-z]') + +# go get honnef.co/go/tools/simple +gosimple: + gosimple ./... + +# go get honnef.co/go/tools/unused +unused: + unused ./... + # go get github.com/kisielk/errcheck errcheck: - @errcheck -ignorepkg=bytes -ignore=os:Remove github.com/boltdb/bolt + @errcheck -ignorepkg=bytes -ignore=os:Remove github.com/NebulousLabs/bolt -test: - @go test -v -cover . - @go test -v ./cmd/bolt +test: + go test -timeout 20m -v -coverprofile cover.out -covermode atomic + # Note: gets "program not an importable package" in out of path builds + go test -v ./cmd/bolt -.PHONY: fmt test +.PHONY: race fmt errcheck test gosimple unused diff --git a/README.md b/README.md index 1c1ca1c9..4819fd99 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,16 @@ -Bolt [![Coverage Status](https://coveralls.io/repos/boltdb/bolt/badge.svg?branch=master)](https://coveralls.io/r/boltdb/bolt?branch=master) [![GoDoc](https://godoc.org/github.com/boltdb/bolt?status.svg)](https://godoc.org/github.com/boltdb/bolt) ![Version](https://img.shields.io/badge/version-1.2.1-green.svg) +bbolt ==== +[![Go Report Card](https://goreportcard.com/badge/github.com/coreos/bbolt?style=flat-square)](https://goreportcard.com/report/github.com/coreos/bbolt) +[![Coverage](https://codecov.io/gh/coreos/bbolt/branch/master/graph/badge.svg)](https://codecov.io/gh/coreos/bbolt) +[![Godoc](http://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://godoc.org/github.com/coreos/bbolt) + +bbolt is a fork of [Ben Johnson's][gh_ben] [Bolt][bolt] key/value +store. 
The purpose of this fork is to provide the Go community with an active +maintenance and development target for Bolt; the goal is improved reliability +and stability. bbolt includes bug fixes, performance enhancements, and features +not found in Bolt while preserving backwards compatibility with the Bolt API. + Bolt is a pure Go key/value store inspired by [Howard Chu's][hyc_symas] [LMDB project][lmdb]. The goal of the project is to provide a simple, fast, and reliable database for projects that don't require a full database @@ -10,6 +20,8 @@ Since Bolt is meant to be used as such a low-level piece of functionality, simplicity is key. The API will be small and only focus on getting values and setting values. That's it. +[gh_ben]: https://github.com/benbjohnson +[bolt]: https://github.com/boltdb/bolt [hyc_symas]: https://twitter.com/hyc_symas [lmdb]: http://symas.com/mdb/ @@ -59,7 +71,7 @@ Shopify and Heroku use Bolt-backed services every day. To start using Bolt, install Go and run `go get`: ```sh -$ go get github.com/boltdb/bolt/... +$ go get github.com/coreos/bbolt/... ``` This will retrieve the library and install the `bolt` command line utility into @@ -522,7 +534,7 @@ this from a read-only transaction, it will perform a hot backup and not block your other database reads and writes. By default, it will use a regular file handle which will utilize the operating -system's page cache. See the [`Tx`](https://godoc.org/github.com/boltdb/bolt#Tx) +system's page cache. See the [`Tx`](https://godoc.org/github.com/coreos/bbolt#Tx) documentation for information about optimizing for larger-than-RAM datasets. 
One common use case is to backup over HTTP so you can use tools like `cURL` to @@ -811,7 +823,7 @@ Here are a few things to note when evaluating and using Bolt: ## Reading the Source -Bolt is a relatively small code base (<3KLOC) for an embedded, serializable, +Bolt is a relatively small code base (<5KLOC) for an embedded, serializable, transactional key/value database so it can be a good starting point for people interested in how databases work. @@ -907,6 +919,7 @@ Below is a list of public, open source projects that use Bolt: * [torrent](https://github.com/anacrolix/torrent) - Full-featured BitTorrent client package and utilities in Go. BoltDB is a storage backend in development. * [gopherpit](https://github.com/gopherpit/gopherpit) - A web service to manage Go remote import paths with custom domains * [bolter](https://github.com/hasit/bolter) - Command-line app for viewing BoltDB file in your terminal. +* [boltcli](https://github.com/spacewander/boltcli) - the redis-cli for boltdb with Lua script support. * [btcwallet](https://github.com/btcsuite/btcwallet) - A bitcoin wallet. * [dcrwallet](https://github.com/decred/dcrwallet) - A wallet for the Decred cryptocurrency. 
* [Ironsmith](https://github.com/timshannon/ironsmith) - A simple, script-driven continuous integration (build - > test -> release) tool, with no external dependencies diff --git a/allocate_test.go b/allocate_test.go new file mode 100644 index 00000000..8566b4df --- /dev/null +++ b/allocate_test.go @@ -0,0 +1,30 @@ +package bolt + +import ( + "testing" +) + +func TestTx_allocatePageStats(t *testing.T) { + f := newFreelist() + f.ids = []pgid{2, 3} + + tx := &Tx{ + db: &DB{ + freelist: f, + pageSize: defaultPageSize, + }, + meta: &meta{}, + pages: make(map[pgid]*page), + } + + prePageCnt := tx.Stats().PageCount + allocateCnt := f.free_count() + + if _, err := tx.allocate(allocateCnt); err != nil { + t.Fatal(err) + } + + if tx.Stats().PageCount != prePageCnt+allocateCnt { + t.Errorf("Allocated %d but got %d page in stats", allocateCnt, tx.Stats().PageCount) + } +} diff --git a/bolt_mips64x.go b/bolt_mips64x.go new file mode 100644 index 00000000..134b578b --- /dev/null +++ b/bolt_mips64x.go @@ -0,0 +1,12 @@ +// +build mips64 mips64le + +package bolt + +// maxMapSize represents the largest mmap size supported by Bolt. +const maxMapSize = 0x8000000000 // 512GB + +// maxAllocSize is the size used when creating array pointers. +const maxAllocSize = 0x7FFFFFFF + +// Are unaligned load/stores broken on this arch? +var brokenUnaligned = false diff --git a/bolt_mipsx.go b/bolt_mipsx.go new file mode 100644 index 00000000..d5ecb059 --- /dev/null +++ b/bolt_mipsx.go @@ -0,0 +1,12 @@ +// +build mips mipsle + +package bolt + +// maxMapSize represents the largest mmap size supported by Bolt. +const maxMapSize = 0x40000000 // 1GB + +// maxAllocSize is the size used when creating array pointers. +const maxAllocSize = 0xFFFFFFF + +// Are unaligned load/stores broken on this arch? 
+var brokenUnaligned = false diff --git a/bolt_ppc.go b/bolt_ppc.go index 645ddc3e..55cb8a72 100644 --- a/bolt_ppc.go +++ b/bolt_ppc.go @@ -7,3 +7,6 @@ const maxMapSize = 0x7FFFFFFF // 2GB // maxAllocSize is the size used when creating array pointers. const maxAllocSize = 0xFFFFFFF + +// Are unaligned load/stores broken on this arch? +var brokenUnaligned = false diff --git a/bolt_unix.go b/bolt_unix.go index cad62dda..06592a08 100644 --- a/bolt_unix.go +++ b/bolt_unix.go @@ -13,29 +13,32 @@ import ( // flock acquires an advisory lock on a file descriptor. func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error { var t time.Time + if timeout != 0 { + t = time.Now() + } + fd := db.file.Fd() + flag := syscall.LOCK_NB + if exclusive { + flag |= syscall.LOCK_EX + } else { + flag |= syscall.LOCK_SH + } for { - // If we're beyond our timeout then return an error. - // This can only occur after we've attempted a flock once. - if t.IsZero() { - t = time.Now() - } else if timeout > 0 && time.Since(t) > timeout { - return ErrTimeout - } - flag := syscall.LOCK_SH - if exclusive { - flag = syscall.LOCK_EX - } - - // Otherwise attempt to obtain an exclusive lock. - err := syscall.Flock(int(db.file.Fd()), flag|syscall.LOCK_NB) + // Attempt to obtain an exclusive lock. + err := syscall.Flock(int(fd), flag) if err == nil { return nil } else if err != syscall.EWOULDBLOCK { return err } + // If we timed out then return an error. + if timeout != 0 && time.Since(t) > timeout-flockRetryTimeout { + return ErrTimeout + } + // Wait for a bit and try again. - time.Sleep(50 * time.Millisecond) + time.Sleep(flockRetryTimeout) } } diff --git a/bolt_unix_solaris.go b/bolt_unix_solaris.go index 307bf2b3..fd8335ec 100644 --- a/bolt_unix_solaris.go +++ b/bolt_unix_solaris.go @@ -13,34 +13,33 @@ import ( // flock acquires an advisory lock on a file descriptor. 
func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error { var t time.Time + if timeout != 0 { + t = time.Now() + } + fd := db.file.Fd() + var lockType int16 + if exclusive { + lockType = syscall.F_WRLCK + } else { + lockType = syscall.F_RDLCK + } for { - // If we're beyond our timeout then return an error. - // This can only occur after we've attempted a flock once. - if t.IsZero() { - t = time.Now() - } else if timeout > 0 && time.Since(t) > timeout { - return ErrTimeout - } - var lock syscall.Flock_t - lock.Start = 0 - lock.Len = 0 - lock.Pid = 0 - lock.Whence = 0 - lock.Pid = 0 - if exclusive { - lock.Type = syscall.F_WRLCK - } else { - lock.Type = syscall.F_RDLCK - } - err := syscall.FcntlFlock(db.file.Fd(), syscall.F_SETLK, &lock) + // Attempt to obtain an exclusive lock. + lock := syscall.Flock_t{Type: lockType} + err := syscall.FcntlFlock(fd, syscall.F_SETLK, &lock) if err == nil { return nil } else if err != syscall.EAGAIN { return err } + // If we timed out then return an error. + if timeout != 0 && time.Since(t) > timeout-flockRetryTimeout { + return ErrTimeout + } + // Wait for a bit and try again. - time.Sleep(50 * time.Millisecond) + time.Sleep(flockRetryTimeout) } } diff --git a/bolt_windows.go b/bolt_windows.go index b00fb072..ca6f9a11 100644 --- a/bolt_windows.go +++ b/bolt_windows.go @@ -59,29 +59,30 @@ func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) erro db.lockfile = f var t time.Time + if timeout != 0 { + t = time.Now() + } + fd := f.Fd() + var flag uint32 = flagLockFailImmediately + if exclusive { + flag |= flagLockExclusive + } for { - // If we're beyond our timeout then return an error. - // This can only occur after we've attempted a flock once. 
- if t.IsZero() { - t = time.Now() - } else if timeout > 0 && time.Since(t) > timeout { - return ErrTimeout - } - - var flag uint32 = flagLockFailImmediately - if exclusive { - flag |= flagLockExclusive - } - - err := lockFileEx(syscall.Handle(db.lockfile.Fd()), flag, 0, 1, 0, &syscall.Overlapped{}) + // Attempt to obtain an exclusive lock. + err := lockFileEx(syscall.Handle(fd), flag, 0, 1, 0, &syscall.Overlapped{}) if err == nil { return nil } else if err != errLockViolation { return err } + // If we timed out then return an error. + if timeout != 0 && time.Since(t) > timeout-flockRetryTimeout { + return ErrTimeout + } + // Wait for a bit and try again. - time.Sleep(50 * time.Millisecond) + time.Sleep(flockRetryTimeout) } } diff --git a/bucket.go b/bucket.go index 0c5bf274..44db88b8 100644 --- a/bucket.go +++ b/bucket.go @@ -14,13 +14,6 @@ const ( MaxValueSize = (1 << 31) - 2 ) -const ( - maxUint = ^uint(0) - minUint = 0 - maxInt = int(^uint(0) >> 1) - minInt = -maxInt - 1 -) - const bucketHeaderSize = int(unsafe.Sizeof(bucket{})) const ( @@ -323,7 +316,12 @@ func (b *Bucket) Delete(key []byte) error { // Move cursor to correct position. c := b.Cursor() - _, _, flags := c.seek(key) + k, _, flags := c.seek(key) + + // Return nil if the key doesn't exist. + if !bytes.Equal(key, k) { + return nil + } // Return an error if there is already existing bucket value. if (flags & bucketLeafFlag) != 0 { diff --git a/bucket_test.go b/bucket_test.go index 29044489..7ae670b7 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -113,6 +113,7 @@ func TestBucket_Get_Capacity(t *testing.T) { // Ensure slice can be appended to without a segfault. k = append(k, []byte("123")...) v = append(v, []byte("123")...) + _, _ = k, v // to pass ineffassign return nil }); err != nil { t.Fatal(err) } @@ -423,6 +424,55 @@ func TestBucket_Delete_FreelistOverflow(t *testing.T) { }); err != nil { t.Fatal(err) } + + // Check more than an overflow's worth of pages are freed. 
+ stats := db.Stats() + freePages := stats.FreePageN + stats.PendingPageN + if freePages <= 0xFFFF { + t.Fatalf("expected more than 0xFFFF free pages, got %v", freePages) + } + + // Free page count should be preserved on reopen. + if err := db.DB.Close(); err != nil { + t.Fatal(err) + } + db.MustReopen() + if reopenFreePages := db.Stats().FreePageN; freePages != reopenFreePages { + t.Fatalf("expected %d free pages, got %+v", freePages, db.Stats()) + } +} + +// Ensure that deleting of non-existing key is a no-op. +func TestBucket_Delete_NonExisting(t *testing.T) { + db := MustOpenDB() + defer db.MustClose() + + if err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("widgets")) + if err != nil { + t.Fatal(err) + } + + if _, err = b.CreateBucket([]byte("nested")); err != nil { + t.Fatal(err) + } + return nil + }); err != nil { + t.Fatal(err) + } + + if err := db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("widgets")) + if err := b.Delete([]byte("foo")); err != nil { + t.Fatal(err) + } + if b.Bucket([]byte("nested")) == nil { + t.Fatal("nested bucket has been deleted") + } + return nil + }); err != nil { + t.Fatal(err) + } } // Ensure that accessing and updating nested buckets is ok across transactions. @@ -1198,7 +1248,7 @@ func TestBucket_Stats(t *testing.T) { } // Only check allocations for 4KB pages. 
- if os.Getpagesize() == 4096 { + if db.Info().PageSize == 4096 { if stats.BranchAlloc != 4096 { t.Fatalf("unexpected BranchAlloc: %d", stats.BranchAlloc) } else if stats.LeafAlloc != 36864 { @@ -1331,7 +1381,7 @@ func TestBucket_Stats_Small(t *testing.T) { t.Fatalf("unexpected LeafInuse: %d", stats.LeafInuse) } - if os.Getpagesize() == 4096 { + if db.Info().PageSize == 4096 { if stats.BranchAlloc != 0 { t.Fatalf("unexpected BranchAlloc: %d", stats.BranchAlloc) } else if stats.LeafAlloc != 0 { @@ -1390,7 +1440,7 @@ func TestBucket_Stats_EmptyBucket(t *testing.T) { t.Fatalf("unexpected LeafInuse: %d", stats.LeafInuse) } - if os.Getpagesize() == 4096 { + if db.Info().PageSize == 4096 { if stats.BranchAlloc != 0 { t.Fatalf("unexpected BranchAlloc: %d", stats.BranchAlloc) } else if stats.LeafAlloc != 0 { @@ -1492,7 +1542,7 @@ func TestBucket_Stats_Nested(t *testing.T) { t.Fatalf("unexpected LeafInuse: %d", stats.LeafInuse) } - if os.Getpagesize() == 4096 { + if db.Info().PageSize == 4096 { if stats.BranchAlloc != 0 { t.Fatalf("unexpected BranchAlloc: %d", stats.BranchAlloc) } else if stats.LeafAlloc != 8192 { @@ -1565,7 +1615,7 @@ func TestBucket_Stats_Large(t *testing.T) { t.Fatalf("unexpected LeafInuse: %d", stats.LeafInuse) } - if os.Getpagesize() == 4096 { + if db.Info().PageSize == 4096 { if stats.BranchAlloc != 53248 { t.Fatalf("unexpected BranchAlloc: %d", stats.BranchAlloc) } else if stats.LeafAlloc != 4898816 { @@ -1640,7 +1690,7 @@ func TestBucket_Put_Single(t *testing.T) { index++ return true - }, nil); err != nil { + }, qconfig()); err != nil { t.Error(err) } } diff --git a/cmd/bolt/main.go b/cmd/bolt/main.go index 5c085e2f..ce5a5024 100644 --- a/cmd/bolt/main.go +++ b/cmd/bolt/main.go @@ -49,11 +49,17 @@ var ( // ErrPageIDRequired is returned when a required page id is not specified. ErrPageIDRequired = errors.New("page id required") - // ErrPageNotFound is returned when specifying a page above the high water mark. 
- ErrPageNotFound = errors.New("page not found") + // ErrBucketRequired is returned when a bucket is not specified. + ErrBucketRequired = errors.New("bucket required") - // ErrPageFreed is returned when reading a page that has already been freed. - ErrPageFreed = errors.New("page freed") + // ErrBucketNotFound is returned when a bucket is not found. + ErrBucketNotFound = errors.New("bucket not found") + + // ErrKeyRequired is returned when a key is not specified. + ErrKeyRequired = errors.New("key required") + + // ErrKeyNotFound is returned when a key is not found. + ErrKeyNotFound = errors.New("key not found") ) // PageHeaderSize represents the size of the bolt.page header. @@ -100,14 +106,22 @@ func (m *Main) Run(args ...string) error { return ErrUsage case "bench": return newBenchCommand(m).Run(args[1:]...) + case "buckets": + return newBucketsCommand(m).Run(args[1:]...) case "check": return newCheckCommand(m).Run(args[1:]...) case "compact": return newCompactCommand(m).Run(args[1:]...) case "dump": return newDumpCommand(m).Run(args[1:]...) + case "page-item": + return newPageItemCommand(m).Run(args[1:]...) + case "get": + return newGetCommand(m).Run(args[1:]...) case "info": return newInfoCommand(m).Run(args[1:]...) + case "keys": + return newKeysCommand(m).Run(args[1:]...) case "page": return newPageCommand(m).Run(args[1:]...) case "pages": @@ -131,11 +145,17 @@ Usage: The commands are: bench run synthetic benchmark against bolt + buckets print a list of buckets check verifies integrity of bolt database compact copies a bolt database, compacting it in the process + dump print a hexadecimal dump of a single page + get print the value of a key in a bucket info print basic info + keys print a list of keys in a bucket help print this screen + page print one or more pages in human readable format pages print list of pages with their types + page-item print the key and value of a page item. 
stats iterate over all pages and generate usage stats Use "bolt [command] -h" for more information about a command. @@ -188,17 +208,9 @@ func (cmd *CheckCommand) Run(args ...string) error { // Perform consistency check. return db.View(func(tx *bolt.Tx) error { var count int - ch := tx.Check() - loop: - for { - select { - case err, ok := <-ch: - if !ok { - break loop - } - fmt.Fprintln(cmd.Stdout, err) - count++ - } + for err := range tx.Check() { + fmt.Fprintln(cmd.Stdout, err) + count++ } // Print summary of errors. @@ -407,9 +419,173 @@ func (cmd *DumpCommand) PrintPage(w io.Writer, r io.ReaderAt, pageID int, pageSi // Usage returns the help message. func (cmd *DumpCommand) Usage() string { return strings.TrimLeft(` -usage: bolt dump -page PAGEID PATH +usage: bolt dump PATH pageid [pageid...] + +Dump prints a hexadecimal dump of one or more pages. +`, "\n") +} + +// PageItemCommand represents the "page-item" command execution. +type PageItemCommand struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} + +// newPageItemCommand returns a PageItemCommand. +func newPageItemCommand(m *Main) *PageItemCommand { + return &PageItemCommand{ + Stdin: m.Stdin, + Stdout: m.Stdout, + Stderr: m.Stderr, + } +} + +type pageItemOptions struct { + help bool + keyOnly bool + valueOnly bool + format string +} + +// Run executes the command. +func (cmd *PageItemCommand) Run(args ...string) error { + // Parse flags. + options := &pageItemOptions{} + fs := flag.NewFlagSet("", flag.ContinueOnError) + fs.BoolVar(&options.keyOnly, "key-only", false, "Print only the key") + fs.BoolVar(&options.valueOnly, "value-only", false, "Print only the value") + fs.StringVar(&options.format, "format", "ascii-encoded", "Output format. 
One of: ascii-encoded|hex|bytes") + fs.BoolVar(&options.help, "h", false, "") + if err := fs.Parse(args); err != nil { + return err + } else if options.help { + fmt.Fprintln(cmd.Stderr, cmd.Usage()) + return ErrUsage + } + + if options.keyOnly && options.valueOnly { + return fmt.Errorf("The --key-only or --value-only flag may be set, but not both.") + } + + // Require database path and page id. + path := fs.Arg(0) + if path == "" { + return ErrPathRequired + } else if _, err := os.Stat(path); os.IsNotExist(err) { + return ErrFileNotFound + } + + // Read page id. + pageID, err := strconv.Atoi(fs.Arg(1)) + if err != nil { + return err + } + + // Read item id. + itemID, err := strconv.Atoi(fs.Arg(2)) + if err != nil { + return err + } + + // Open database file handler. + f, err := os.Open(path) + if err != nil { + return err + } + defer func() { _ = f.Close() }() + + // Retrieve page info and page size. + _, buf, err := ReadPage(path, pageID) + if err != nil { + return err + } + + if !options.valueOnly { + err := cmd.PrintLeafItemKey(cmd.Stdout, buf, uint16(itemID), options.format) + if err != nil { + return err + } + } + if !options.keyOnly { + err := cmd.PrintLeafItemValue(cmd.Stdout, buf, uint16(itemID), options.format) + if err != nil { + return err + } + } + return nil +} + +// leafPageElement retrieves a leaf page element. +func (cmd *PageItemCommand) leafPageElement(pageBytes []byte, index uint16) (*leafPageElement, error) { + p := (*page)(unsafe.Pointer(&pageBytes[0])) + if index >= p.count { + return nil, fmt.Errorf("leafPageElement: expected item index less than %d, but got %d.", p.count, index) + } + if p.Type() != "leaf" { + return nil, fmt.Errorf("leafPageElement: expected page type of 'leaf', but got '%s'", p.Type()) + } + return p.leafPageElement(index), nil +} + +// writeBytes writes the byte to the writer. Supported formats: ascii-encoded, hex, bytes. 
+func (cmd *PageItemCommand) writeBytes(w io.Writer, b []byte, format string) error { + switch format { + case "ascii-encoded": + _, err := fmt.Fprintf(w, "%q", b) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "\n") + return err + case "hex": + _, err := fmt.Fprintf(w, "%x", b) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "\n") + return err + case "bytes": + _, err := w.Write(b) + return err + default: + return fmt.Errorf("writeBytes: unsupported format: %s", format) + } +} + +// PrintLeafItemKey writes the bytes of a leaf element's key. +func (cmd *PageItemCommand) PrintLeafItemKey(w io.Writer, pageBytes []byte, index uint16, format string) error { + e, err := cmd.leafPageElement(pageBytes, index) + if err != nil { + return err + } + return cmd.writeBytes(w, e.key(), format) +} + +// PrintLeafItemValue writes the bytes of a leaf element's value. +func (cmd *PageItemCommand) PrintLeafItemValue(w io.Writer, pageBytes []byte, index uint16, format string) error { + e, err := cmd.leafPageElement(pageBytes, index) + if err != nil { + return err + } + return cmd.writeBytes(w, e.value(), format) +} + +// Usage returns the help message. +func (cmd *PageItemCommand) Usage() string { + return strings.TrimLeft(` +usage: bolt page-item [options] PATH pageid itemid + +Additional options include: + + --key-only + Print only the key + --value-only + Print only the value + --format + Output format. One of: ascii-encoded|hex|bytes (default=ascii-encoded) -Dump prints a hexadecimal dump of a single page. +page-item prints a page item key and value. `, "\n") } @@ -583,13 +759,22 @@ func (cmd *PageCommand) PrintBranch(w io.Writer, buf []byte) error { func (cmd *PageCommand) PrintFreelist(w io.Writer, buf []byte) error { p := (*page)(unsafe.Pointer(&buf[0])) + // Check for overflow and, if present, adjust starting index and actual element count. 
+ idx, count := 0, int(p.count) + if p.count == 0xFFFF { + idx = 1 + count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0]) + } + // Print number of items. - fmt.Fprintf(w, "Item Count: %d\n", p.count) + fmt.Fprintf(w, "Item Count: %d\n", count) + fmt.Fprintf(w, "Overflow: %d\n", p.overflow) + fmt.Fprintf(w, "\n") // Print each page in the freelist. ids := (*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)) - for i := uint16(0); i < p.count; i++ { + for i := int(idx); i < count; i++ { fmt.Fprintf(w, "%d\n", ids[i]) } fmt.Fprintf(w, "\n") @@ -644,7 +829,7 @@ func (cmd *PageCommand) PrintPage(w io.Writer, r io.ReaderAt, pageID int, pageSi // Usage returns the help message. func (cmd *PageCommand) Usage() string { return strings.TrimLeft(` -usage: bolt page -page PATH pageid [pageid...] +usage: bolt page PATH pageid [pageid...] Page prints one or more pages in human readable format. `, "\n") @@ -879,6 +1064,212 @@ experience corruption, please submit a ticket to the Bolt project page: `, "\n") } +// BucketsCommand represents the "buckets" command execution. +type BucketsCommand struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} + +// newBucketsCommand returns a BucketsCommand. +func newBucketsCommand(m *Main) *BucketsCommand { + return &BucketsCommand{ + Stdin: m.Stdin, + Stdout: m.Stdout, + Stderr: m.Stderr, + } +} + +// Run executes the command. +func (cmd *BucketsCommand) Run(args ...string) error { + // Parse flags. + fs := flag.NewFlagSet("", flag.ContinueOnError) + help := fs.Bool("h", false, "") + if err := fs.Parse(args); err != nil { + return err + } else if *help { + fmt.Fprintln(cmd.Stderr, cmd.Usage()) + return ErrUsage + } + + // Require database path. + path := fs.Arg(0) + if path == "" { + return ErrPathRequired + } else if _, err := os.Stat(path); os.IsNotExist(err) { + return ErrFileNotFound + } + + // Open database. + db, err := bolt.Open(path, 0666, nil) + if err != nil { + return err + } + defer db.Close() + + // Print buckets. 
+ return db.View(func(tx *bolt.Tx) error { + return tx.ForEach(func(name []byte, _ *bolt.Bucket) error { + fmt.Fprintln(cmd.Stdout, string(name)) + return nil + }) + }) +} + +// Usage returns the help message. +func (cmd *BucketsCommand) Usage() string { + return strings.TrimLeft(` +usage: bolt buckets PATH + +Print a list of buckets. +`, "\n") +} + +// KeysCommand represents the "keys" command execution. +type KeysCommand struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} + +// newKeysCommand returns a KeysCommand. +func newKeysCommand(m *Main) *KeysCommand { + return &KeysCommand{ + Stdin: m.Stdin, + Stdout: m.Stdout, + Stderr: m.Stderr, + } +} + +// Run executes the command. +func (cmd *KeysCommand) Run(args ...string) error { + // Parse flags. + fs := flag.NewFlagSet("", flag.ContinueOnError) + help := fs.Bool("h", false, "") + if err := fs.Parse(args); err != nil { + return err + } else if *help { + fmt.Fprintln(cmd.Stderr, cmd.Usage()) + return ErrUsage + } + + // Require database path and bucket. + path, bucket := fs.Arg(0), fs.Arg(1) + if path == "" { + return ErrPathRequired + } else if _, err := os.Stat(path); os.IsNotExist(err) { + return ErrFileNotFound + } else if bucket == "" { + return ErrBucketRequired + } + + // Open database. + db, err := bolt.Open(path, 0666, nil) + if err != nil { + return err + } + defer db.Close() + + // Print keys. + return db.View(func(tx *bolt.Tx) error { + // Find bucket. + b := tx.Bucket([]byte(bucket)) + if b == nil { + return ErrBucketNotFound + } + + // Iterate over each key. + return b.ForEach(func(key, _ []byte) error { + fmt.Fprintln(cmd.Stdout, string(key)) + return nil + }) + }) +} + +// Usage returns the help message. +func (cmd *KeysCommand) Usage() string { + return strings.TrimLeft(` +usage: bolt keys PATH BUCKET + +Print a list of keys in the given bucket. +`, "\n") +} + +// GetCommand represents the "get" command execution. 
+type GetCommand struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer +} + +// newGetCommand returns a GetCommand. +func newGetCommand(m *Main) *GetCommand { + return &GetCommand{ + Stdin: m.Stdin, + Stdout: m.Stdout, + Stderr: m.Stderr, + } +} + +// Run executes the command. +func (cmd *GetCommand) Run(args ...string) error { + // Parse flags. + fs := flag.NewFlagSet("", flag.ContinueOnError) + help := fs.Bool("h", false, "") + if err := fs.Parse(args); err != nil { + return err + } else if *help { + fmt.Fprintln(cmd.Stderr, cmd.Usage()) + return ErrUsage + } + + // Require database path, bucket and key. + path, bucket, key := fs.Arg(0), fs.Arg(1), fs.Arg(2) + if path == "" { + return ErrPathRequired + } else if _, err := os.Stat(path); os.IsNotExist(err) { + return ErrFileNotFound + } else if bucket == "" { + return ErrBucketRequired + } else if key == "" { + return ErrKeyRequired + } + + // Open database. + db, err := bolt.Open(path, 0666, nil) + if err != nil { + return err + } + defer db.Close() + + // Print value. + return db.View(func(tx *bolt.Tx) error { + // Find bucket. + b := tx.Bucket([]byte(bucket)) + if b == nil { + return ErrBucketNotFound + } + + // Find value for given key. + val := b.Get([]byte(key)) + if val == nil { + return ErrKeyNotFound + } + + fmt.Fprintln(cmd.Stdout, string(val)) + return nil + }) +} + +// Usage returns the help message. +func (cmd *GetCommand) Usage() string { + return strings.TrimLeft(` +usage: bolt get PATH BUCKET KEY + +Print the value of the given key in the given bucket. +`, "\n") +} + var benchBucketName = []byte("bench") // BenchCommand represents the "bench" command execution. 
@@ -1031,12 +1422,12 @@ func (cmd *BenchCommand) runWritesRandom(db *bolt.DB, options *BenchOptions, res func (cmd *BenchCommand) runWritesSequentialNested(db *bolt.DB, options *BenchOptions, results *BenchResults) error { var i = uint32(0) - return cmd.runWritesWithSource(db, options, results, func() uint32 { i++; return i }) + return cmd.runWritesNestedWithSource(db, options, results, func() uint32 { i++; return i }) } func (cmd *BenchCommand) runWritesRandomNested(db *bolt.DB, options *BenchOptions, results *BenchResults) error { r := rand.New(rand.NewSource(time.Now().UnixNano())) - return cmd.runWritesWithSource(db, options, results, func() uint32 { return r.Uint32() }) + return cmd.runWritesNestedWithSource(db, options, results, func() uint32 { return r.Uint32() }) } func (cmd *BenchCommand) runWritesWithSource(db *bolt.DB, options *BenchOptions, results *BenchResults, keySource func() uint32) error { @@ -1183,12 +1574,14 @@ func (cmd *BenchCommand) runReadsSequentialNested(db *bolt.DB, options *BenchOpt var count int var top = tx.Bucket(benchBucketName) if err := top.ForEach(func(name, _ []byte) error { - c := top.Bucket(name).Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - if v == nil { - return ErrInvalidValue + if b := top.Bucket(name); b != nil { + c := b.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + if v == nil { + return ErrInvalidValue + } + count++ } - count++ } return nil }); err != nil { @@ -1664,6 +2057,9 @@ func (cmd *CompactCommand) compact(dst, src *bolt.DB) error { } } + // Fill the entire page for best compaction. + b.FillPercent = 1.0 + // If there is no value then this is a bucket call. if v == nil { bkt, err := b.CreateBucket(k) diff --git a/cmd/bolt/main_test.go b/cmd/bolt/main_test.go index 0cc2bcd1..9173ff09 100644 --- a/cmd/bolt/main_test.go +++ b/cmd/bolt/main_test.go @@ -146,6 +146,106 @@ func TestStatsCommand_Run(t *testing.T) { } } +// Ensure the "buckets" command can print a list of buckets. 
+func TestBucketsCommand_Run(t *testing.T) { + db := MustOpen(0666, nil) + defer db.Close() + + if err := db.Update(func(tx *bolt.Tx) error { + for _, name := range []string{"foo", "bar", "baz"} { + _, err := tx.CreateBucket([]byte(name)) + if err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatal(err) + } + db.DB.Close() + + expected := "bar\nbaz\nfoo\n" + + // Run the command. + m := NewMain() + if err := m.Run("buckets", db.Path); err != nil { + t.Fatal(err) + } else if actual := m.Stdout.String(); actual != expected { + t.Fatalf("unexpected stdout:\n\n%s", actual) + } +} + +// Ensure the "keys" command can print a list of keys for a bucket. +func TestKeysCommand_Run(t *testing.T) { + db := MustOpen(0666, nil) + defer db.Close() + + if err := db.Update(func(tx *bolt.Tx) error { + for _, name := range []string{"foo", "bar"} { + b, err := tx.CreateBucket([]byte(name)) + if err != nil { + return err + } + for i := 0; i < 3; i++ { + key := fmt.Sprintf("%s-%d", name, i) + if err := b.Put([]byte(key), []byte{0}); err != nil { + return err + } + } + } + return nil + }); err != nil { + t.Fatal(err) + } + db.DB.Close() + + expected := "foo-0\nfoo-1\nfoo-2\n" + + // Run the command. + m := NewMain() + if err := m.Run("keys", db.Path, "foo"); err != nil { + t.Fatal(err) + } else if actual := m.Stdout.String(); actual != expected { + t.Fatalf("unexpected stdout:\n\n%s", actual) + } +} + +// Ensure the "get" command can print the value of a key in a bucket. 
+func TestGetCommand_Run(t *testing.T) { + db := MustOpen(0666, nil) + defer db.Close() + + if err := db.Update(func(tx *bolt.Tx) error { + for _, name := range []string{"foo", "bar"} { + b, err := tx.CreateBucket([]byte(name)) + if err != nil { + return err + } + for i := 0; i < 3; i++ { + key := fmt.Sprintf("%s-%d", name, i) + val := fmt.Sprintf("val-%s-%d", name, i) + if err := b.Put([]byte(key), []byte(val)); err != nil { + return err + } + } + } + return nil + }); err != nil { + t.Fatal(err) + } + db.DB.Close() + + expected := "val-foo-1\n" + + // Run the command. + m := NewMain() + if err := m.Run("get", db.Path, "foo", "foo-1"); err != nil { + t.Fatal(err) + } else if actual := m.Stdout.String(); actual != expected { + t.Fatalf("unexpected stdout:\n\n%s", actual) + } +} + // Main represents a test wrapper for main.Main that records output. type Main struct { *main.Main diff --git a/db.go b/db.go index dea8d171..54240c52 100644 --- a/db.go +++ b/db.go @@ -7,8 +7,7 @@ import ( "log" "os" "runtime" - "runtime/debug" - "strings" + "sort" "sync" "time" "unsafe" @@ -23,6 +22,8 @@ const version = 2 // Represents a marker value to indicate that a file is a Bolt DB. const magic uint32 = 0xED0CDAED +const pgidNoFreelist pgid = 0xffffffffffffffff + // IgnoreNoSync specifies whether the NoSync field of a DB is ignored when // syncing changes to a file. This is required as some operating systems, // such as OpenBSD, do not have a unified buffer cache (UBC) and writes @@ -39,6 +40,9 @@ const ( // default page size for db is set to the OS page size. var defaultPageSize = os.Getpagesize() +// The time elapsed between consecutive file locking attempts. +const flockRetryTimeout = 50 * time.Millisecond + // DB represents a collection of buckets persisted to a file on disk. // All data access is performed through transactions which can be obtained through the DB. // All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called. 
@@ -61,6 +65,11 @@ type DB struct { // THIS IS UNSAFE. PLEASE USE WITH CAUTION. NoSync bool + // When true, skips syncing freelist to disk. This improves the database + // write performance under normal operation, but requires a full database + // re-sync during recovery. + NoFreelistSync bool + // When true, skips the truncate call when growing the database. // Setting this to true is only safe on non-ext3/ext4 systems. // Skipping truncation avoids preallocation of hard drive space and @@ -107,9 +116,11 @@ type DB struct { opened bool rwtx *Tx txs []*Tx - freelist *freelist stats Stats + freelist *freelist + freelistLoad sync.Once + pagePool sync.Pool batchMu sync.Mutex @@ -148,14 +159,17 @@ func (db *DB) String() string { // If the file does not exist then it will be created automatically. // Passing in nil options will cause Bolt to open the database with the default options. func Open(path string, mode os.FileMode, options *Options) (*DB, error) { - var db = &DB{opened: true} - + db := &DB{ + opened: true, + } // Set default options if no options are provided. if options == nil { options = DefaultOptions } + db.NoSync = options.NoSync db.NoGrowSync = options.NoGrowSync db.MmapFlags = options.MmapFlags + db.NoFreelistSync = options.NoFreelistSync // Set default values for later DB operations. db.MaxBatchSize = DefaultMaxBatchSize @@ -184,6 +198,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { // The database file is locked using the shared lock (more than one process may // hold a lock at the same time) otherwise (options.ReadOnly is set). if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil { + db.lockfile = nil // make 'unused' happy. 
TODO: rework locks _ = db.close() return nil, err } @@ -191,6 +206,11 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { // Default values for test hooks db.ops.writeAt = db.file.WriteAt + if db.pageSize = options.PageSize; db.pageSize == 0 { + // Set the default page size to the OS page size. + db.pageSize = defaultPageSize + } + // Initialize the database if it doesn't exist. if info, err := db.file.Stat(); err != nil { return nil, err @@ -202,20 +222,21 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { } else { // Read the first meta page to determine the page size. var buf [0x1000]byte - if _, err := db.file.ReadAt(buf[:], 0); err == nil { - m := db.pageInBuffer(buf[:], 0).meta() - if err := m.validate(); err != nil { - // If we can't read the page size, we can assume it's the same - // as the OS -- since that's how the page size was chosen in the - // first place. - // - // If the first page is invalid and this OS uses a different - // page size than what the database was created with then we - // are out of luck and cannot access the database. - db.pageSize = os.Getpagesize() - } else { + // If we can't read the page size, but can read a page, assume + // it's the same as the OS or one given -- since that's how the + // page size was chosen in the first place. + // + // If the first page is invalid and this OS uses a different + // page size than what the database was created with then we + // are out of luck and cannot access the database. + // + // TODO: scan for next page + if bw, err := db.file.ReadAt(buf[:], 0); err == nil && bw == len(buf) { + if m := db.pageInBuffer(buf[:], 0).meta(); m.validate() == nil { db.pageSize = int(m.pageSize) } + } else { + return nil, ErrInvalid } } @@ -232,14 +253,50 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { return nil, err } - // Read in the freelist. 
- db.freelist = newFreelist() - db.freelist.read(db.page(db.meta().freelist)) + if db.readOnly { + return db, nil + } + + db.loadFreelist() + + // Flush freelist when transitioning from no sync to sync so + // NoFreelistSync unaware boltdb can open the db later. + if !db.NoFreelistSync && !db.hasSyncedFreelist() { + tx, err := db.Begin(true) + if tx != nil { + err = tx.Commit() + } + if err != nil { + _ = db.close() + return nil, err + } + } // Mark the database as opened and return. return db, nil } +// loadFreelist reads the freelist if it is synced, or reconstructs it +// by scanning the DB if it is not synced. It assumes there are no +// concurrent accesses being made to the freelist. +func (db *DB) loadFreelist() { + db.freelistLoad.Do(func() { + db.freelist = newFreelist() + if !db.hasSyncedFreelist() { + // Reconstruct free list by scanning the DB. + db.freelist.readIDs(db.freepages()) + } else { + // Read free list from freelist page. + db.freelist.read(db.page(db.meta().freelist)) + } + db.stats.FreePageN = len(db.freelist.ids) + }) +} + +func (db *DB) hasSyncedFreelist() bool { + return db.meta().freelist != pgidNoFreelist +} + // mmap opens the underlying memory-mapped file and initializes the meta references. // minsz is the minimum size that the new mmap can be. func (db *DB) mmap(minsz int) error { @@ -346,9 +403,6 @@ func (db *DB) mmapSize(size int) (int, error) { // init creates a new database file and initializes its meta pages. func (db *DB) init() error { - // Set the page size to the OS page size. - db.pageSize = os.Getpagesize() - // Create two meta pages on a buffer. buf := make([]byte, db.pageSize*4) for i := 0; i < 2; i++ { @@ -392,7 +446,8 @@ func (db *DB) init() error { } // Close releases all database resources. -// All transactions must be closed before closing the database. +// It will block waiting for any open transactions to finish +// before closing the database and returning. 
func (db *DB) Close() error { db.rwlock.Lock() defer db.rwlock.Unlock() @@ -531,21 +586,36 @@ func (db *DB) beginRWTx() (*Tx, error) { t := &Tx{writable: true} t.init(db) db.rwtx = t + db.freePages() + return t, nil +} - // Free any pages associated with closed read-only transactions. - var minid txid = 0xFFFFFFFFFFFFFFFF - for _, t := range db.txs { - if t.meta.txid < minid { - minid = t.meta.txid - } +// freePages releases any pages associated with closed read-only transactions. +func (db *DB) freePages() { + // Free all pending pages prior to earliest open transaction. + sort.Sort(txsById(db.txs)) + minid := txid(0xFFFFFFFFFFFFFFFF) + if len(db.txs) > 0 { + minid = db.txs[0].meta.txid } if minid > 0 { db.freelist.release(minid - 1) } - - return t, nil + // Release unused txid extents. + for _, t := range db.txs { + db.freelist.releaseRange(minid, t.meta.txid-1) + minid = t.meta.txid + 1 + } + db.freelist.releaseRange(minid, txid(0xFFFFFFFFFFFFFFFF)) + // Any page both allocated and freed in an extent is safe to release. } +type txsById []*Tx + +func (t txsById) Len() int { return len(t) } +func (t txsById) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t txsById) Less(i, j int) bool { return t[i].meta.txid < t[j].meta.txid } + // removeTx removes a transaction from the database. func (db *DB) removeTx(tx *Tx) { // Release the read lock on the mmap. @@ -638,11 +708,7 @@ func (db *DB) View(fn func(*Tx) error) error { return err } - if err := t.Rollback(); err != nil { - return err - } - - return nil + return t.Rollback() } // Batch calls fn as part of a batch. It behaves similar to Update, @@ -742,9 +808,7 @@ retry: // pass success, or bolt internal errors, to all callers for _, c := range b.calls { - if c.err != nil { - c.err <- err - } + c.err <- err } break retry } @@ -831,7 +895,7 @@ func (db *DB) meta() *meta { } // allocate returns a contiguous block of memory starting at a given page. 
-func (db *DB) allocate(count int) (*page, error) { +func (db *DB) allocate(txid txid, count int) (*page, error) { // Allocate a temporary buffer for the page. var buf []byte if count == 1 { @@ -843,7 +907,7 @@ func (db *DB) allocate(count int) (*page, error) { p.overflow = uint32(count - 1) // Use pages from the freelist if they are available. - if p.id = db.freelist.allocate(count); p.id != 0 { + if p.id = db.freelist.allocate(txid, count); p.id != 0 { return p, nil } @@ -898,6 +962,38 @@ func (db *DB) IsReadOnly() bool { return db.readOnly } +func (db *DB) freepages() []pgid { + tx, err := db.beginTx() + defer func() { + err = tx.Rollback() + if err != nil { + panic("freepages: failed to rollback tx") + } + }() + if err != nil { + panic("freepages: failed to open read only tx") + } + + reachable := make(map[pgid]*page) + nofreed := make(map[pgid]bool) + ech := make(chan error) + go func() { + for e := range ech { + panic(fmt.Sprintf("freepages: failed to get all reachable pages (%v)", e)) + } + }() + tx.checkBucket(&tx.root, reachable, nofreed, ech) + close(ech) + + var fids []pgid + for i := pgid(2); i < db.meta().pgid; i++ { + if _, ok := reachable[i]; !ok { + fids = append(fids, i) + } + } + return fids +} + // Options represents the options that can be set when opening a database. type Options struct { // Timeout is the amount of time to wait to obtain a file lock. @@ -908,6 +1004,10 @@ type Options struct { // Sets the DB.NoGrowSync flag before memory mapping the file. NoGrowSync bool + // Do not sync freelist to disk. This improves the database write performance + // under normal operation, but requires a full database re-sync during recovery. + NoFreelistSync bool + // Open database in read-only mode. Uses flock(..., LOCK_SH |LOCK_NB) to // grab a shared lock (UNIX). ReadOnly bool @@ -924,6 +1024,14 @@ type Options struct { // If initialMmapSize is smaller than the previous database size, // it takes no effect. 
InitialMmapSize int + + // PageSize overrides the default OS page size. + PageSize int + + // NoSync sets the initial value of DB.NoSync. Normally this can just be + // set directly on the DB itself when returned from Open(), but this option + // is useful in APIs which expose Options but not the underlying DB. + NoSync bool } // DefaultOptions represent the options used if nil options are passed into Open(). @@ -965,10 +1073,6 @@ func (s *Stats) Sub(other *Stats) Stats { return diff } -func (s *Stats) add(other *Stats) { - s.TxStats.add(&other.TxStats) -} - type Info struct { Data uintptr PageSize int @@ -1007,7 +1111,8 @@ func (m *meta) copy(dest *meta) { func (m *meta) write(p *page) { if m.root.root >= m.pgid { panic(fmt.Sprintf("root bucket pgid (%d) above high water mark (%d)", m.root.root, m.pgid)) - } else if m.freelist >= m.pgid { + } else if m.freelist >= m.pgid && m.freelist != pgidNoFreelist { + // TODO: reject pgidNoFreeList if !NoFreelistSync panic(fmt.Sprintf("freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid)) } @@ -1034,11 +1139,3 @@ func _assert(condition bool, msg string, v ...interface{}) { panic(fmt.Sprintf("assertion failed: "+msg, v...)) } } - -func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } -func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } - -func printstack() { - stack := strings.Join(strings.Split(string(debug.Stack()), "\n")[2:], "\n") - fmt.Fprintln(os.Stderr, stack) -} diff --git a/db_test.go b/db_test.go index abded80a..6035ab1f 100644 --- a/db_test.go +++ b/db_test.go @@ -9,11 +9,10 @@ import ( "hash/fnv" "io/ioutil" "log" + "math/rand" "os" "path/filepath" "regexp" - "sort" - "strings" "sync" "testing" "time" @@ -24,12 +23,6 @@ import ( var statsFlag = flag.Bool("stats", false, "show performance stats") -// version is the data file format version. -const version = 2 - -// magic is the marker value to indicate that a file is a Bolt DB. 
-const magic uint32 = 0xED0CDAED - // pageSize is the size of one page in the data file. const pageSize = 4096 @@ -52,6 +45,8 @@ type meta struct { // Ensure that a database can be opened without error. func TestOpen(t *testing.T) { path := tempfile() + defer os.RemoveAll(path) + db, err := bolt.Open(path, 0666, nil) if err != nil { t.Fatal(err) @@ -87,6 +82,7 @@ func TestOpen_ErrNotExists(t *testing.T) { // Ensure that opening a file that is not a Bolt database returns ErrInvalid. func TestOpen_ErrInvalid(t *testing.T) { path := tempfile() + defer os.RemoveAll(path) f, err := os.Create(path) if err != nil { @@ -98,7 +94,6 @@ func TestOpen_ErrInvalid(t *testing.T) { if err := f.Close(); err != nil { t.Fatal(err) } - defer os.Remove(path) if _, err := bolt.Open(path, 0666, nil); err != bolt.ErrInvalid { t.Fatalf("unexpected error: %s", err) @@ -310,15 +305,16 @@ func TestOpen_Size_Large(t *testing.T) { // Ensure that a re-opened database is consistent. func TestOpen_Check(t *testing.T) { path := tempfile() + defer os.RemoveAll(path) db, err := bolt.Open(path, 0666, nil) if err != nil { t.Fatal(err) } - if err := db.View(func(tx *bolt.Tx) error { return <-tx.Check() }); err != nil { + if err = db.View(func(tx *bolt.Tx) error { return <-tx.Check() }); err != nil { t.Fatal(err) } - if err := db.Close(); err != nil { + if err = db.Close(); err != nil { t.Fatal(err) } @@ -342,17 +338,19 @@ func TestOpen_MetaInitWriteError(t *testing.T) { // Ensure that a database that is too small returns an error. 
func TestOpen_FileTooSmall(t *testing.T) { path := tempfile() + defer os.RemoveAll(path) db, err := bolt.Open(path, 0666, nil) if err != nil { t.Fatal(err) } - if err := db.Close(); err != nil { + pageSize := int64(db.Info().PageSize) + if err = db.Close(); err != nil { t.Fatal(err) } // corrupt the database - if err := os.Truncate(path, int64(os.Getpagesize())); err != nil { + if err = os.Truncate(path, pageSize); err != nil { t.Fatal(err) } @@ -370,7 +368,7 @@ func TestDB_Open_InitialMmapSize(t *testing.T) { path := tempfile() defer os.Remove(path) - initMmapSize := 1 << 31 // 2GB + initMmapSize := 1 << 30 // 1GB testWriteSize := 1 << 27 // 134MB db, err := bolt.Open(path, 0666, &bolt.Options{InitialMmapSize: initMmapSize}) @@ -422,6 +420,144 @@ func TestDB_Open_InitialMmapSize(t *testing.T) { } } +// TestDB_Open_ReadOnly checks a database in read only mode can read but not write. +func TestDB_Open_ReadOnly(t *testing.T) { + // Create a writable db, write k-v and close it. + db := MustOpenDB() + defer db.MustClose() + + if err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("widgets")) + if err != nil { + t.Fatal(err) + } + if err := b.Put([]byte("foo"), []byte("bar")); err != nil { + t.Fatal(err) + } + return nil + }); err != nil { + t.Fatal(err) + } + if err := db.DB.Close(); err != nil { + t.Fatal(err) + } + + f := db.f + o := &bolt.Options{ReadOnly: true} + readOnlyDB, err := bolt.Open(f, 0666, o) + if err != nil { + panic(err) + } + + if !readOnlyDB.IsReadOnly() { + t.Fatal("expect db in read only mode") + } + + // Read from a read-only transaction. + if err := readOnlyDB.View(func(tx *bolt.Tx) error { + value := tx.Bucket([]byte("widgets")).Get([]byte("foo")) + if !bytes.Equal(value, []byte("bar")) { + t.Fatal("expect value 'bar', got", value) + } + return nil + }); err != nil { + t.Fatal(err) + } + + // Can't launch read-write transaction. 
+ if _, err := readOnlyDB.Begin(true); err != bolt.ErrDatabaseReadOnly { + t.Fatalf("unexpected error: %s", err) + } + + if err := readOnlyDB.Close(); err != nil { + t.Fatal(err) + } +} + +// TestOpen_BigPage checks the database uses bigger pages when +// changing PageSize. +func TestOpen_BigPage(t *testing.T) { + pageSize := os.Getpagesize() + + db1 := MustOpenWithOption(&bolt.Options{PageSize: pageSize * 2}) + defer db1.MustClose() + + db2 := MustOpenWithOption(&bolt.Options{PageSize: pageSize * 4}) + defer db2.MustClose() + + if db1sz, db2sz := fileSize(db1.f), fileSize(db2.f); db1sz >= db2sz { + t.Errorf("expected %d < %d", db1sz, db2sz) + } +} + +// TestOpen_RecoverFreeList tests opening the DB with free-list +// write-out after no free list sync will recover the free list +// and write it out. +func TestOpen_RecoverFreeList(t *testing.T) { + db := MustOpenWithOption(&bolt.Options{NoFreelistSync: true}) + defer db.MustClose() + + // Write some pages. + tx, err := db.Begin(true) + if err != nil { + t.Fatal(err) + } + wbuf := make([]byte, 8192) + for i := 0; i < 100; i++ { + s := fmt.Sprintf("%d", i) + b, err := tx.CreateBucket([]byte(s)) + if err != nil { + t.Fatal(err) + } + if err = b.Put([]byte(s), wbuf); err != nil { + t.Fatal(err) + } + } + if err = tx.Commit(); err != nil { + t.Fatal(err) + } + + // Generate free pages. + if tx, err = db.Begin(true); err != nil { + t.Fatal(err) + } + for i := 0; i < 50; i++ { + s := fmt.Sprintf("%d", i) + b := tx.Bucket([]byte(s)) + if b == nil { + t.Fatal(err) + } + if err := b.Delete([]byte(s)); err != nil { + t.Fatal(err) + } + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + if err := db.DB.Close(); err != nil { + t.Fatal(err) + } + + // Record freelist count from opening with NoFreelistSync. 
+ db.MustReopen() + freepages := db.Stats().FreePageN + if freepages == 0 { + t.Fatalf("no free pages on NoFreelistSync reopen") + } + if err := db.DB.Close(); err != nil { + t.Fatal(err) + } + + // Check free page count is reconstructed when opened with freelist sync. + db.o = &bolt.Options{} + db.MustReopen() + // One less free page for syncing the free list on open. + freepages-- + if fp := db.Stats().FreePageN; fp < freepages { + t.Fatalf("closed with %d free pages, opened with %d", freepages, fp) + } +} + // Ensure that a database cannot open a transaction when it's not open. func TestDB_Begin_ErrDatabaseNotOpen(t *testing.T) { var db bolt.DB @@ -453,6 +589,65 @@ func TestDB_BeginRW(t *testing.T) { } } +// TestDB_Concurrent_WriteTo checks that issuing WriteTo operations concurrently +// with commits does not produce corrupted db files. +func TestDB_Concurrent_WriteTo(t *testing.T) { + o := &bolt.Options{NoFreelistSync: false} + db := MustOpenWithOption(o) + defer db.MustClose() + + var wg sync.WaitGroup + wtxs, rtxs := 5, 5 + wg.Add(wtxs * rtxs) + f := func(tx *bolt.Tx) { + defer wg.Done() + f, err := ioutil.TempFile("", "bolt-") + if err != nil { + panic(err) + } + time.Sleep(time.Duration(rand.Intn(20)+1) * time.Millisecond) + tx.WriteTo(f) + tx.Rollback() + f.Close() + snap := &DB{nil, f.Name(), o} + snap.MustReopen() + defer snap.MustClose() + snap.MustCheck() + } + + tx1, err := db.Begin(true) + if err != nil { + t.Fatal(err) + } + if _, err := tx1.CreateBucket([]byte("abc")); err != nil { + t.Fatal(err) + } + if err := tx1.Commit(); err != nil { + t.Fatal(err) + } + + for i := 0; i < wtxs; i++ { + tx, err := db.Begin(true) + if err != nil { + t.Fatal(err) + } + if err := tx.Bucket([]byte("abc")).Put([]byte{0}, []byte{0}); err != nil { + t.Fatal(err) + } + for j := 0; j < rtxs; j++ { + rtx, rerr := db.Begin(false) + if rerr != nil { + t.Fatal(rerr) + } + go f(rtx) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + } + wg.Wait() +} + // Ensure 
that opening a transaction while the DB is closed returns an error. func TestDB_BeginRW_Closed(t *testing.T) { var db bolt.DB @@ -1140,7 +1335,7 @@ func ExampleDB_Begin_ReadOnly() { defer os.Remove(db.Path()) // Create a bucket using a read-write transaction. - if err := db.Update(func(tx *bolt.Tx) error { + if err = db.Update(func(tx *bolt.Tx) error { _, err := tx.CreateBucket([]byte("widgets")) return err }); err != nil { @@ -1153,16 +1348,16 @@ func ExampleDB_Begin_ReadOnly() { log.Fatal(err) } b := tx.Bucket([]byte("widgets")) - if err := b.Put([]byte("john"), []byte("blue")); err != nil { + if err = b.Put([]byte("john"), []byte("blue")); err != nil { log.Fatal(err) } - if err := b.Put([]byte("abby"), []byte("red")); err != nil { + if err = b.Put([]byte("abby"), []byte("red")); err != nil { log.Fatal(err) } - if err := b.Put([]byte("zephyr"), []byte("purple")); err != nil { + if err = b.Put([]byte("zephyr"), []byte("purple")); err != nil { log.Fatal(err) } - if err := tx.Commit(); err != nil { + if err = tx.Commit(); err != nil { log.Fatal(err) } @@ -1176,11 +1371,11 @@ func ExampleDB_Begin_ReadOnly() { fmt.Printf("%s likes %s\n", k, v) } - if err := tx.Rollback(); err != nil { + if err = tx.Rollback(); err != nil { log.Fatal(err) } - if err := db.Close(); err != nil { + if err = db.Close(); err != nil { log.Fatal(err) } @@ -1366,15 +1561,27 @@ func validateBatchBench(b *testing.B, db *DB) { // DB is a test wrapper for bolt.DB. type DB struct { *bolt.DB + f string + o *bolt.Options } // MustOpenDB returns a new, open DB at a temporary location. func MustOpenDB() *DB { - db, err := bolt.Open(tempfile(), 0666, nil) + return MustOpenWithOption(nil) +} + +// MustOpenDBWithOption returns a new, open DB at a temporary location with given options. 
+func MustOpenWithOption(o *bolt.Options) *DB { + f := tempfile() + db, err := bolt.Open(f, 0666, o) if err != nil { panic(err) } - return &DB{db} + return &DB{ + DB: db, + f: f, + o: o, + } } // Close closes the database and deletes the underlying file. @@ -1399,6 +1606,15 @@ func (db *DB) MustClose() { } } +// MustReopen reopen the database. Panic on error. +func (db *DB) MustReopen() { + indb, err := bolt.Open(db.f, 0666, db.o) + if err != nil { + panic(err) + } + db.DB = indb +} + // PrintStats prints the database stats func (db *DB) PrintStats() { var stats = db.Stats() @@ -1478,40 +1694,6 @@ func tempfile() string { return f.Name() } -// mustContainKeys checks that a bucket contains a given set of keys. -func mustContainKeys(b *bolt.Bucket, m map[string]string) { - found := make(map[string]string) - if err := b.ForEach(func(k, _ []byte) error { - found[string(k)] = "" - return nil - }); err != nil { - panic(err) - } - - // Check for keys found in bucket that shouldn't be there. - var keys []string - for k, _ := range found { - if _, ok := m[string(k)]; !ok { - keys = append(keys, k) - } - } - if len(keys) > 0 { - sort.Strings(keys) - panic(fmt.Sprintf("keys found(%d): %s", len(keys), strings.Join(keys, ","))) - } - - // Check for keys not found in bucket that should be there. - for k, _ := range m { - if _, ok := found[string(k)]; !ok { - keys = append(keys, k) - } - } - if len(keys) > 0 { - sort.Strings(keys) - panic(fmt.Sprintf("keys not found(%d): %s", len(keys), strings.Join(keys, ","))) - } -} - func trunc(b []byte, length int) []byte { if length < len(b) { return b[:length] @@ -1531,15 +1713,9 @@ func fileSize(path string) int64 { return fi.Size() } -func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } -func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } - // u64tob converts a uint64 into an 8-byte slice. 
func u64tob(v uint64) []byte { b := make([]byte, 8) binary.BigEndian.PutUint64(b, v) return b } - -// btou64 converts an 8-byte slice into an uint64. -func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } diff --git a/freelist.go b/freelist.go index aba48f58..266f1542 100644 --- a/freelist.go +++ b/freelist.go @@ -6,18 +6,28 @@ import ( "unsafe" ) +// txPending holds a list of pgids and corresponding allocation txns +// that are pending to be freed. +type txPending struct { + ids []pgid + alloctx []txid // txids allocating the ids + lastReleaseBegin txid // beginning txid of last matching releaseRange +} + // freelist represents a list of all pages that are available for allocation. // It also tracks pages that have been freed but are still in use by open transactions. type freelist struct { - ids []pgid // all free and available free page ids. - pending map[txid][]pgid // mapping of soon-to-be free page ids by tx. - cache map[pgid]bool // fast lookup of all free and pending page ids. + ids []pgid // all free and available free page ids. + allocs map[pgid]txid // mapping of txid that allocated a pgid. + pending map[txid]*txPending // mapping of soon-to-be free page ids by tx. + cache map[pgid]bool // fast lookup of all free and pending page ids. } // newFreelist returns an empty, initialized freelist. func newFreelist() *freelist { return &freelist{ - pending: make(map[txid][]pgid), + allocs: make(map[pgid]txid), + pending: make(map[txid]*txPending), cache: make(map[pgid]bool), } } @@ -45,8 +55,8 @@ func (f *freelist) free_count() int { // pending_count returns count of pending pages func (f *freelist) pending_count() int { var count int - for _, list := range f.pending { - count += len(list) + for _, txp := range f.pending { + count += len(txp.ids) } return count } @@ -55,8 +65,8 @@ func (f *freelist) pending_count() int { // f.count returns the minimum length required for dst. 
func (f *freelist) copyall(dst []pgid) { m := make(pgids, 0, f.pending_count()) - for _, list := range f.pending { - m = append(m, list...) + for _, txp := range f.pending { + m = append(m, txp.ids...) } sort.Sort(m) mergepgids(dst, f.ids, m) @@ -64,7 +74,7 @@ func (f *freelist) copyall(dst []pgid) { // allocate returns the starting page id of a contiguous list of pages of a given size. // If a contiguous block cannot be found then 0 is returned. -func (f *freelist) allocate(n int) pgid { +func (f *freelist) allocate(txid txid, n int) pgid { if len(f.ids) == 0 { return 0 } @@ -97,7 +107,7 @@ func (f *freelist) allocate(n int) pgid { for i := pgid(0); i < pgid(n); i++ { delete(f.cache, initial+i) } - + f.allocs[initial] = txid return initial } @@ -114,28 +124,73 @@ func (f *freelist) free(txid txid, p *page) { } // Free page and all its overflow pages. - var ids = f.pending[txid] + txp := f.pending[txid] + if txp == nil { + txp = &txPending{} + f.pending[txid] = txp + } + allocTxid, ok := f.allocs[p.id] + if ok { + delete(f.allocs, p.id) + } else if (p.flags & freelistPageFlag) != 0 { + // Freelist is always allocated by prior tx. + allocTxid = txid - 1 + } + for id := p.id; id <= p.id+pgid(p.overflow); id++ { // Verify that page is not already free. if f.cache[id] { panic(fmt.Sprintf("page %d already freed", id)) } - // Add to the freelist and cache. - ids = append(ids, id) + txp.ids = append(txp.ids, id) + txp.alloctx = append(txp.alloctx, allocTxid) f.cache[id] = true } - f.pending[txid] = ids } // release moves all page ids for a transaction id (or older) to the freelist. func (f *freelist) release(txid txid) { m := make(pgids, 0) - for tid, ids := range f.pending { + for tid, txp := range f.pending { if tid <= txid { // Move transaction's pending pages to the available freelist. // Don't remove from the cache since the page is still free. - m = append(m, ids...) + m = append(m, txp.ids...) 
+ delete(f.pending, tid) + } + } + sort.Sort(m) + f.ids = pgids(f.ids).merge(m) +} + +// releaseRange moves pending pages allocated within an extent [begin,end] to the free list. +func (f *freelist) releaseRange(begin, end txid) { + if begin > end { + return + } + var m pgids + for tid, txp := range f.pending { + if tid < begin || tid > end { + continue + } + // Don't recompute freed pages if ranges haven't updated. + if txp.lastReleaseBegin == begin { + continue + } + for i := 0; i < len(txp.ids); i++ { + if atx := txp.alloctx[i]; atx < begin || atx > end { + continue + } + m = append(m, txp.ids[i]) + txp.ids[i] = txp.ids[len(txp.ids)-1] + txp.ids = txp.ids[:len(txp.ids)-1] + txp.alloctx[i] = txp.alloctx[len(txp.alloctx)-1] + txp.alloctx = txp.alloctx[:len(txp.alloctx)-1] + i-- + } + txp.lastReleaseBegin = begin + if len(txp.ids) == 0 { delete(f.pending, tid) } } @@ -146,12 +201,29 @@ func (f *freelist) release(txid txid) { // rollback removes the pages from a given pending tx. func (f *freelist) rollback(txid txid) { // Remove page ids from cache. - for _, id := range f.pending[txid] { - delete(f.cache, id) + txp := f.pending[txid] + if txp == nil { + return } - - // Remove pages from pending list. + var m pgids + for i, pgid := range txp.ids { + delete(f.cache, pgid) + tx := txp.alloctx[i] + if tx == 0 { + continue + } + if tx != txid { + // Pending free aborted; restore page back to alloc list. + f.allocs[pgid] = tx + } else { + // Freed page was allocated by this txn; OK to throw away. + m = append(m, pgid) + } + } + // Remove pages from pending list and mark as free if allocated by txid. delete(f.pending, txid) + sort.Sort(m) + f.ids = pgids(f.ids).merge(m) } // freed returns whether a given page is in the free list. @@ -161,6 +233,9 @@ func (f *freelist) freed(pgid pgid) bool { // read initializes the freelist from a freelist page. 
func (f *freelist) read(p *page) { + if (p.flags & freelistPageFlag) == 0 { + panic(fmt.Sprintf("invalid freelist page: %d, page type is %s", p.id, p.typ())) + } // If the page.count is at the max uint16 value (64k) then it's considered // an overflow and the size of the freelist is stored as the first element. idx, count := 0, int(p.count) @@ -173,7 +248,7 @@ func (f *freelist) read(p *page) { if count == 0 { f.ids = nil } else { - ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count] + ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx : idx+count] f.ids = make([]pgid, len(ids)) copy(f.ids, ids) @@ -185,6 +260,12 @@ func (f *freelist) read(p *page) { f.reindex() } +// read initializes the freelist from a given list of ids. +func (f *freelist) readIDs(ids []pgid) { + f.ids = ids + f.reindex() +} + // write writes the page ids onto a freelist page. All free and pending ids are // saved to disk since in the event of a program crash, all pending ids will // become free. @@ -217,8 +298,8 @@ func (f *freelist) reload(p *page) { // Build a cache of only pending pages. 
pcache := make(map[pgid]bool) - for _, pendingIDs := range f.pending { - for _, pendingID := range pendingIDs { + for _, txp := range f.pending { + for _, pendingID := range txp.ids { pcache[pendingID] = true } } @@ -244,8 +325,8 @@ func (f *freelist) reindex() { for _, id := range f.ids { f.cache[id] = true } - for _, pendingIDs := range f.pending { - for _, pendingID := range pendingIDs { + for _, txp := range f.pending { + for _, pendingID := range txp.ids { f.cache[pendingID] = true } } diff --git a/freelist_test.go b/freelist_test.go index 4e9b3a8d..24ed4cf1 100644 --- a/freelist_test.go +++ b/freelist_test.go @@ -12,7 +12,7 @@ import ( func TestFreelist_free(t *testing.T) { f := newFreelist() f.free(100, &page{id: 12}) - if !reflect.DeepEqual([]pgid{12}, f.pending[100]) { + if !reflect.DeepEqual([]pgid{12}, f.pending[100].ids) { t.Fatalf("exp=%v; got=%v", []pgid{12}, f.pending[100]) } } @@ -21,7 +21,7 @@ func TestFreelist_free(t *testing.T) { func TestFreelist_free_overflow(t *testing.T) { f := newFreelist() f.free(100, &page{id: 12, overflow: 3}) - if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100]) { + if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100].ids) { t.Fatalf("exp=%v; got=%v", exp, f.pending[100]) } } @@ -44,41 +44,170 @@ func TestFreelist_release(t *testing.T) { } } +// Ensure that releaseRange handles boundary conditions correctly +func TestFreelist_releaseRange(t *testing.T) { + type testRange struct { + begin, end txid + } + + type testPage struct { + id pgid + n int + allocTxn txid + freeTxn txid + } + + var releaseRangeTests = []struct { + title string + pagesIn []testPage + releaseRanges []testRange + wantFree []pgid + }{ + { + title: "Single pending in range", + pagesIn: []testPage{{id: 3, n: 1, allocTxn: 100, freeTxn: 200}}, + releaseRanges: []testRange{{1, 300}}, + wantFree: []pgid{3}, + }, + { + title: "Single pending with minimum end range", + pagesIn: []testPage{{id: 3, n: 1, allocTxn: 100, 
freeTxn: 200}}, + releaseRanges: []testRange{{1, 200}}, + wantFree: []pgid{3}, + }, + { + title: "Single pending outsize minimum end range", + pagesIn: []testPage{{id: 3, n: 1, allocTxn: 100, freeTxn: 200}}, + releaseRanges: []testRange{{1, 199}}, + wantFree: nil, + }, + { + title: "Single pending with minimum begin range", + pagesIn: []testPage{{id: 3, n: 1, allocTxn: 100, freeTxn: 200}}, + releaseRanges: []testRange{{100, 300}}, + wantFree: []pgid{3}, + }, + { + title: "Single pending outside minimum begin range", + pagesIn: []testPage{{id: 3, n: 1, allocTxn: 100, freeTxn: 200}}, + releaseRanges: []testRange{{101, 300}}, + wantFree: nil, + }, + { + title: "Single pending in minimum range", + pagesIn: []testPage{{id: 3, n: 1, allocTxn: 199, freeTxn: 200}}, + releaseRanges: []testRange{{199, 200}}, + wantFree: []pgid{3}, + }, + { + title: "Single pending and read transaction at 199", + pagesIn: []testPage{{id: 3, n: 1, allocTxn: 199, freeTxn: 200}}, + releaseRanges: []testRange{{100, 198}, {200, 300}}, + wantFree: nil, + }, + { + title: "Adjacent pending and read transactions at 199, 200", + pagesIn: []testPage{ + {id: 3, n: 1, allocTxn: 199, freeTxn: 200}, + {id: 4, n: 1, allocTxn: 200, freeTxn: 201}, + }, + releaseRanges: []testRange{ + {100, 198}, + {200, 199}, // Simulate the ranges db.freePages might produce. 
+ {201, 300}, + }, + wantFree: nil, + }, + { + title: "Out of order ranges", + pagesIn: []testPage{ + {id: 3, n: 1, allocTxn: 199, freeTxn: 200}, + {id: 4, n: 1, allocTxn: 200, freeTxn: 201}, + }, + releaseRanges: []testRange{ + {201, 199}, + {201, 200}, + {200, 200}, + }, + wantFree: nil, + }, + { + title: "Multiple pending, read transaction at 150", + pagesIn: []testPage{ + {id: 3, n: 1, allocTxn: 100, freeTxn: 200}, + {id: 4, n: 1, allocTxn: 100, freeTxn: 125}, + {id: 5, n: 1, allocTxn: 125, freeTxn: 150}, + {id: 6, n: 1, allocTxn: 125, freeTxn: 175}, + {id: 7, n: 2, allocTxn: 150, freeTxn: 175}, + {id: 9, n: 2, allocTxn: 175, freeTxn: 200}, + }, + releaseRanges: []testRange{{50, 149}, {151, 300}}, + wantFree: []pgid{4, 9}, + }, + } + + for _, c := range releaseRangeTests { + f := newFreelist() + + for _, p := range c.pagesIn { + for i := uint64(0); i < uint64(p.n); i++ { + f.ids = append(f.ids, pgid(uint64(p.id)+i)) + } + } + for _, p := range c.pagesIn { + f.allocate(p.allocTxn, p.n) + } + + for _, p := range c.pagesIn { + f.free(p.freeTxn, &page{id: p.id}) + } + + for _, r := range c.releaseRanges { + f.releaseRange(r.begin, r.end) + } + + if exp := c.wantFree; !reflect.DeepEqual(exp, f.ids) { + t.Errorf("exp=%v; got=%v for %s", exp, f.ids, c.title) + } + } +} + // Ensure that a freelist can find contiguous blocks of pages. 
func TestFreelist_allocate(t *testing.T) { - f := &freelist{ids: []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18}} - if id := int(f.allocate(3)); id != 3 { + f := newFreelist() + f.ids = []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18} + if id := int(f.allocate(1, 3)); id != 3 { t.Fatalf("exp=3; got=%v", id) } - if id := int(f.allocate(1)); id != 6 { + if id := int(f.allocate(1, 1)); id != 6 { t.Fatalf("exp=6; got=%v", id) } - if id := int(f.allocate(3)); id != 0 { + if id := int(f.allocate(1, 3)); id != 0 { t.Fatalf("exp=0; got=%v", id) } - if id := int(f.allocate(2)); id != 12 { + if id := int(f.allocate(1, 2)); id != 12 { t.Fatalf("exp=12; got=%v", id) } - if id := int(f.allocate(1)); id != 7 { + if id := int(f.allocate(1, 1)); id != 7 { t.Fatalf("exp=7; got=%v", id) } - if id := int(f.allocate(0)); id != 0 { + if id := int(f.allocate(1, 0)); id != 0 { t.Fatalf("exp=0; got=%v", id) } - if id := int(f.allocate(0)); id != 0 { + if id := int(f.allocate(1, 0)); id != 0 { t.Fatalf("exp=0; got=%v", id) } if exp := []pgid{9, 18}; !reflect.DeepEqual(exp, f.ids) { t.Fatalf("exp=%v; got=%v", exp, f.ids) } - if id := int(f.allocate(1)); id != 9 { + if id := int(f.allocate(1, 1)); id != 9 { t.Fatalf("exp=9; got=%v", id) } - if id := int(f.allocate(1)); id != 18 { + if id := int(f.allocate(1, 1)); id != 18 { t.Fatalf("exp=18; got=%v", id) } - if id := int(f.allocate(1)); id != 0 { + if id := int(f.allocate(1, 1)); id != 0 { t.Fatalf("exp=0; got=%v", id) } if exp := []pgid{}; !reflect.DeepEqual(exp, f.ids) { @@ -113,9 +242,9 @@ func TestFreelist_read(t *testing.T) { func TestFreelist_write(t *testing.T) { // Create a freelist and write it to a page. 
var buf [4096]byte - f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid][]pgid)} - f.pending[100] = []pgid{28, 11} - f.pending[101] = []pgid{3} + f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid]*txPending)} + f.pending[100] = &txPending{ids: []pgid{28, 11}} + f.pending[101] = &txPending{ids: []pgid{3}} p := (*page)(unsafe.Pointer(&buf[0])) if err := f.write(p); err != nil { t.Fatal(err) @@ -142,7 +271,8 @@ func benchmark_FreelistRelease(b *testing.B, size int) { pending := randomPgids(len(ids) / 400) b.ResetTimer() for i := 0; i < b.N; i++ { - f := &freelist{ids: ids, pending: map[txid][]pgid{1: pending}} + txp := &txPending{ids: pending} + f := &freelist{ids: ids, pending: map[txid]*txPending{1: txp}} f.release(1) } } diff --git a/node.go b/node.go index 159318b2..f4ce240e 100644 --- a/node.go +++ b/node.go @@ -365,7 +365,7 @@ func (n *node) spill() error { } // Allocate contiguous space for the node. - p, err := tx.allocate((node.size() / tx.db.pageSize) + 1) + p, err := tx.allocate((node.size() + tx.db.pageSize - 1) / tx.db.pageSize) if err != nil { return err } diff --git a/simulation_no_freelist_sync_test.go b/simulation_no_freelist_sync_test.go new file mode 100644 index 00000000..21ec22d2 --- /dev/null +++ b/simulation_no_freelist_sync_test.go @@ -0,0 +1,47 @@ +package bolt_test + +import ( + "testing" + + "github.com/NebulousLabs/bolt" +) + +func TestSimulateNoFreeListSync_1op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 1, 1) +} +func TestSimulateNoFreeListSync_10op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10, 1) +} +func TestSimulateNoFreeListSync_100op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 100, 1) +} +func TestSimulateNoFreeListSync_1000op_1p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 1000, 1) +} +func TestSimulateNoFreeListSync_10000op_1p(t *testing.T) { + testSimulate(t, 
&bolt.Options{NoFreelistSync: true}, 8, 10000, 1) +} +func TestSimulateNoFreeListSync_10op_10p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10, 10) +} +func TestSimulateNoFreeListSync_100op_10p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 100, 10) +} +func TestSimulateNoFreeListSync_1000op_10p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 1000, 10) +} +func TestSimulateNoFreeListSync_10000op_10p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10000, 10) +} +func TestSimulateNoFreeListSync_100op_100p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 100, 100) +} +func TestSimulateNoFreeListSync_1000op_100p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 1000, 100) +} +func TestSimulateNoFreeListSync_10000op_100p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10000, 100) +} +func TestSimulateNoFreeListSync_10000op_1000p(t *testing.T) { + testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10000, 1000) +} diff --git a/simulation_test.go b/simulation_test.go index 119b19d5..633fc9f7 100644 --- a/simulation_test.go +++ b/simulation_test.go @@ -10,25 +10,25 @@ import ( "github.com/NebulousLabs/bolt" ) -func TestSimulate_1op_1p(t *testing.T) { testSimulate(t, 1, 1) } -func TestSimulate_10op_1p(t *testing.T) { testSimulate(t, 10, 1) } -func TestSimulate_100op_1p(t *testing.T) { testSimulate(t, 100, 1) } -func TestSimulate_1000op_1p(t *testing.T) { testSimulate(t, 1000, 1) } -func TestSimulate_10000op_1p(t *testing.T) { testSimulate(t, 10000, 1) } +func TestSimulate_1op_1p(t *testing.T) { testSimulate(t, nil, 1, 1, 1) } +func TestSimulate_10op_1p(t *testing.T) { testSimulate(t, nil, 1, 10, 1) } +func TestSimulate_100op_1p(t *testing.T) { testSimulate(t, nil, 1, 100, 1) } +func TestSimulate_1000op_1p(t *testing.T) { testSimulate(t, nil, 1, 1000, 1) } +func TestSimulate_10000op_1p(t 
*testing.T) { testSimulate(t, nil, 1, 10000, 1) } -func TestSimulate_10op_10p(t *testing.T) { testSimulate(t, 10, 10) } -func TestSimulate_100op_10p(t *testing.T) { testSimulate(t, 100, 10) } -func TestSimulate_1000op_10p(t *testing.T) { testSimulate(t, 1000, 10) } -func TestSimulate_10000op_10p(t *testing.T) { testSimulate(t, 10000, 10) } +func TestSimulate_10op_10p(t *testing.T) { testSimulate(t, nil, 1, 10, 10) } +func TestSimulate_100op_10p(t *testing.T) { testSimulate(t, nil, 1, 100, 10) } +func TestSimulate_1000op_10p(t *testing.T) { testSimulate(t, nil, 1, 1000, 10) } +func TestSimulate_10000op_10p(t *testing.T) { testSimulate(t, nil, 1, 10000, 10) } -func TestSimulate_100op_100p(t *testing.T) { testSimulate(t, 100, 100) } -func TestSimulate_1000op_100p(t *testing.T) { testSimulate(t, 1000, 100) } -func TestSimulate_10000op_100p(t *testing.T) { testSimulate(t, 10000, 100) } +func TestSimulate_100op_100p(t *testing.T) { testSimulate(t, nil, 1, 100, 100) } +func TestSimulate_1000op_100p(t *testing.T) { testSimulate(t, nil, 1, 1000, 100) } +func TestSimulate_10000op_100p(t *testing.T) { testSimulate(t, nil, 1, 10000, 100) } -func TestSimulate_10000op_1000p(t *testing.T) { testSimulate(t, 10000, 1000) } +func TestSimulate_10000op_1000p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1000) } // Randomly generate operations on a given database with multiple clients to ensure consistency and thread safety. -func testSimulate(t *testing.T, threadCount, parallelism int) { +func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, parallelism int) { if testing.Short() { t.Skip("skipping test in short mode.") } @@ -42,81 +42,88 @@ func testSimulate(t *testing.T, threadCount, parallelism int) { var versions = make(map[int]*QuickDB) versions[1] = NewQuickDB() - db := MustOpenDB() + db := MustOpenWithOption(openOption) defer db.MustClose() var mutex sync.Mutex // Run n threads in parallel, each with their own operation. 
var wg sync.WaitGroup - var threads = make(chan bool, parallelism) - var i int - for { - threads <- true - wg.Add(1) - writable := ((rand.Int() % 100) < 20) // 20% writers - - // Choose an operation to execute. - var handler simulateHandler - if writable { - handler = writerHandlers[rand.Intn(len(writerHandlers))] - } else { - handler = readerHandlers[rand.Intn(len(readerHandlers))] - } - - // Execute a thread for the given operation. - go func(writable bool, handler simulateHandler) { - defer wg.Done() - // Start transaction. - tx, err := db.Begin(writable) - if err != nil { - t.Fatal("tx begin: ", err) - } + for n := 0; n < round; n++ { - // Obtain current state of the dataset. - mutex.Lock() - var qdb = versions[tx.ID()] - if writable { - qdb = versions[tx.ID()-1].Copy() - } - mutex.Unlock() + var threads = make(chan bool, parallelism) + var i int + for { + threads <- true + wg.Add(1) + writable := ((rand.Int() % 100) < 20) // 20% writers - // Make sure we commit/rollback the tx at the end and update the state. + // Choose an operation to execute. + var handler simulateHandler if writable { - defer func() { - mutex.Lock() - versions[tx.ID()] = qdb - mutex.Unlock() - - if err := tx.Commit(); err != nil { - t.Fatal(err) - } - }() + handler = writerHandlers[rand.Intn(len(writerHandlers))] } else { - defer func() { _ = tx.Rollback() }() + handler = readerHandlers[rand.Intn(len(readerHandlers))] } - // Ignore operation if we don't have data yet. - if qdb == nil { - return + // Execute a thread for the given operation. + go func(writable bool, handler simulateHandler) { + defer wg.Done() + + // Start transaction. + tx, err := db.Begin(writable) + if err != nil { + t.Fatal("tx begin: ", err) + } + + // Obtain current state of the dataset. + mutex.Lock() + var qdb = versions[tx.ID()] + if writable { + qdb = versions[tx.ID()-1].Copy() + } + mutex.Unlock() + + // Make sure we commit/rollback the tx at the end and update the state. 
+ if writable { + defer func() { + mutex.Lock() + versions[tx.ID()] = qdb + mutex.Unlock() + + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + }() + } else { + defer func() { _ = tx.Rollback() }() + } + + // Ignore operation if we don't have data yet. + if qdb == nil { + return + } + + // Execute handler. + handler(tx, qdb) + + // Release a thread back to the scheduling loop. + <-threads + }(writable, handler) + + i++ + if i > threadCount { + break } + } - // Execute handler. - handler(tx, qdb) - - // Release a thread back to the scheduling loop. - <-threads - }(writable, handler) + // Wait until all threads are done. + wg.Wait() - i++ - if i > threadCount { - break - } + db.MustClose() + db.MustReopen() } - - // Wait until all threads are done. - wg.Wait() } type simulateHandler func(tx *bolt.Tx, qdb *QuickDB) diff --git a/tx.go b/tx.go index b9ac6a84..94f69706 100644 --- a/tx.go +++ b/tx.go @@ -126,10 +126,7 @@ func (tx *Tx) DeleteBucket(name []byte) error { // the error is returned to the caller. func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error { return tx.root.ForEach(func(k, v []byte) error { - if err := fn(k, tx.root.Bucket(k)); err != nil { - return err - } - return nil + return fn(k, tx.root.Bucket(k)) }) } @@ -169,28 +166,18 @@ func (tx *Tx) Commit() error { // Free the old root bucket. tx.meta.root.root = tx.root.root - opgid := tx.meta.pgid - - // Free the freelist and allocate new pages for it. This will overestimate - // the size of the freelist but not underestimate the size (which would be bad). - tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist)) - p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) - if err != nil { - tx.rollback() - return err + // Free the old freelist because commit writes out a fresh freelist. 
+ if tx.meta.freelist != pgidNoFreelist { + tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist)) } - if err := tx.db.freelist.write(p); err != nil { - tx.rollback() - return err - } - tx.meta.freelist = p.id - // If the high water mark has moved up then attempt to grow the database. - if tx.meta.pgid > opgid { - if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil { - tx.rollback() + if !tx.db.NoFreelistSync { + err := tx.commitFreelist() + if err != nil { return err } + } else { + tx.meta.freelist = pgidNoFreelist } // Write dirty pages to disk. @@ -235,6 +222,31 @@ func (tx *Tx) Commit() error { return nil } +func (tx *Tx) commitFreelist() error { + // Allocate new pages for the new free list. This will overestimate + // the size of the freelist but not underestimate the size (which would be bad). + opgid := tx.meta.pgid + p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) + if err != nil { + tx.rollback() + return err + } + if err := tx.db.freelist.write(p); err != nil { + tx.rollback() + return err + } + tx.meta.freelist = p.id + // If the high water mark has moved up then attempt to grow the database. + if tx.meta.pgid > opgid { + if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil { + tx.rollback() + return err + } + } + + return nil +} + // Rollback closes the transaction and ignores all previous updates. Read-only // transactions must be rolled back and not committed. func (tx *Tx) Rollback() error { @@ -305,7 +317,11 @@ func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) { if err != nil { return 0, err } - defer func() { _ = f.Close() }() + defer func() { + if cerr := f.Close(); err == nil { + err = cerr + } + }() // Generate a meta page. We use the same page data for both meta pages. buf := make([]byte, tx.db.pageSize) @@ -333,7 +349,7 @@ func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) { } // Move past the meta pages in the file. 
- if _, err := f.Seek(int64(tx.db.pageSize*2), os.SEEK_SET); err != nil { + if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil { return n, fmt.Errorf("seek: %s", err) } @@ -344,7 +360,7 @@ func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) { return n, err } - return n, f.Close() + return n, nil } // CopyFile copies the entire database to file at the given path. @@ -391,6 +407,9 @@ func (tx *Tx) Check() <-chan error { } func (tx *Tx) check(ch chan error) { + // Force loading free list if opened in ReadOnly mode. + tx.db.loadFreelist() + // Check if any pages are double freed. freed := make(map[pgid]bool) all := make([]pgid, tx.db.freelist.count()) @@ -406,8 +425,10 @@ func (tx *Tx) check(ch chan error) { reachable := make(map[pgid]*page) reachable[0] = tx.page(0) // meta0 reachable[1] = tx.page(1) // meta1 - for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ { - reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist) + if tx.meta.freelist != pgidNoFreelist { + for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ { + reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist) + } } // Recursively check buckets. @@ -465,7 +486,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo // allocate returns a contiguous block of memory starting at a given page. func (tx *Tx) allocate(count int) (*page, error) { - p, err := tx.db.allocate(count) + p, err := tx.db.allocate(tx.meta.txid, count) if err != nil { return nil, err } @@ -474,7 +495,7 @@ func (tx *Tx) allocate(count int) (*page, error) { tx.pages[p.id] = p // Update statistics. - tx.stats.PageCount++ + tx.stats.PageCount += count tx.stats.PageAlloc += count * tx.db.pageSize return p, nil diff --git a/tx_test.go b/tx_test.go index f4f7784a..abac84b5 100644 --- a/tx_test.go +++ b/tx_test.go @@ -11,6 +11,54 @@ import ( "github.com/NebulousLabs/bolt" ) +// TestTx_Check_ReadOnly tests consistency checking on a ReadOnly database. 
+func TestTx_Check_ReadOnly(t *testing.T) { + db := MustOpenDB() + defer db.Close() + if err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("widgets")) + if err != nil { + t.Fatal(err) + } + if err := b.Put([]byte("foo"), []byte("bar")); err != nil { + t.Fatal(err) + } + return nil + }); err != nil { + t.Fatal(err) + } + if err := db.DB.Close(); err != nil { + t.Fatal(err) + } + + readOnlyDB, err := bolt.Open(db.f, 0666, &bolt.Options{ReadOnly: true}) + if err != nil { + t.Fatal(err) + } + defer readOnlyDB.Close() + + tx, err := readOnlyDB.Begin(false) + if err != nil { + t.Fatal(err) + } + // ReadOnly DB will load freelist on Check call. + numChecks := 2 + errc := make(chan error, numChecks) + check := func() { + err, _ := <-tx.Check() + errc <- err + } + // Ensure the freelist is not reloaded and does not race. + for i := 0; i < numChecks; i++ { + go check() + } + for i := 0; i < numChecks; i++ { + if err := <-errc; err != nil { + t.Fatal(err) + } + } +} + // Ensure that committing a closed transaction returns an error. func TestTx_Commit_ErrTxClosed(t *testing.T) { db := MustOpenDB() @@ -659,6 +707,111 @@ func TestTx_CopyFile_Error_Normal(t *testing.T) { } } +// TestTx_releaseRange ensures db.freePages handles page releases +// correctly when there are transaction that are no longer reachable +// via any read/write transactions and are "between" ongoing read +// transactions, which requires they must be freed by +// freelist.releaseRange. +func TestTx_releaseRange(t *testing.T) { + // Set initial mmap size well beyond the limit we will hit in this + // test, since we are testing with long running read transactions + // and will deadlock if db.grow is triggered. 
+ db := MustOpenWithOption(&bolt.Options{InitialMmapSize: os.Getpagesize() * 100}) + defer db.MustClose() + + bucket := "bucket" + + put := func(key, value string) { + if err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(bucket)) + if err != nil { + t.Fatal(err) + } + return b.Put([]byte(key), []byte(value)) + }); err != nil { + t.Fatal(err) + } + } + + del := func(key string) { + if err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(bucket)) + if err != nil { + t.Fatal(err) + } + return b.Delete([]byte(key)) + }); err != nil { + t.Fatal(err) + } + } + + getWithTxn := func(txn *bolt.Tx, key string) []byte { + return txn.Bucket([]byte(bucket)).Get([]byte(key)) + } + + openReadTxn := func() *bolt.Tx { + readTx, err := db.Begin(false) + if err != nil { + t.Fatal(err) + } + return readTx + } + + checkWithReadTxn := func(txn *bolt.Tx, key string, wantValue []byte) { + value := getWithTxn(txn, key) + if !bytes.Equal(value, wantValue) { + t.Errorf("Wanted value to be %s for key %s, but got %s", wantValue, key, string(value)) + } + } + + rollback := func(txn *bolt.Tx) { + if err := txn.Rollback(); err != nil { + t.Fatal(err) + } + } + + put("k1", "v1") + rtx1 := openReadTxn() + put("k2", "v2") + hold1 := openReadTxn() + put("k3", "v3") + hold2 := openReadTxn() + del("k3") + rtx2 := openReadTxn() + del("k1") + hold3 := openReadTxn() + del("k2") + hold4 := openReadTxn() + put("k4", "v4") + hold5 := openReadTxn() + + // Close the read transactions we established to hold a portion of the pages in pending state. + rollback(hold1) + rollback(hold2) + rollback(hold3) + rollback(hold4) + rollback(hold5) + + // Execute a write transaction to trigger a releaseRange operation in the db + // that will free multiple ranges between the remaining open read transactions, now that the + // holds have been rolled back. + put("k4", "v4") + + // Check that all long running reads still read correct values. 
+ checkWithReadTxn(rtx1, "k1", []byte("v1")) + checkWithReadTxn(rtx2, "k2", []byte("v2")) + rollback(rtx1) + rollback(rtx2) + + // Check that the final state is correct. + rtx7 := openReadTxn() + checkWithReadTxn(rtx7, "k1", nil) + checkWithReadTxn(rtx7, "k2", nil) + checkWithReadTxn(rtx7, "k3", nil) + checkWithReadTxn(rtx7, "k4", []byte("v4")) + rollback(rtx7) +} + func ExampleTx_Rollback() { // Open the database. db, err := bolt.Open(tempfile(), 0666, nil)