Skip to content

Commit

Permalink
Merge pull request #12 from zuoyebang/dev
Browse files Browse the repository at this point in the history
v1.1.4
  • Loading branch information
xingfu89 authored Jul 2, 2024
2 parents adb3ce0 + 7e5424a commit c2459f1
Show file tree
Hide file tree
Showing 8 changed files with 596 additions and 114 deletions.
388 changes: 388 additions & 0 deletions batch_bitower.go

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,20 @@ func TestBatchBitowerMemTableSizeOverflow(t *testing.T) {
require.NoError(t, db.Close())
}

func TestFlushableBatchBitowerReset(t *testing.T) {
var b BatchBitower
b.flushable = newFlushableBatchBitower(&b, DefaultComparer)

b.Reset()
require.Nil(t, b.flushable)
}

func TestEmptyFlushableBatchBitower(t *testing.T) {
fb := newFlushableBatchBitower(newBatchBitower(nil), DefaultComparer)
it := newInternalIterAdapter(fb.newIter(nil))
require.False(t, it.First())
}

func BenchmarkBatchBitowerSet(b *testing.B) {
value := make([]byte, 10)
for i := range value {
Expand Down
37 changes: 37 additions & 0 deletions bitalos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,43 @@ func TestBatchSetMulti(t *testing.T) {
}
}

func TestLargeBatch(t *testing.T) {
defer os.RemoveAll(testDirname)
os.RemoveAll(testDirname)

db := openTestDB(testDirname, nil)
defer func() {
require.NoError(t, db.Close())
}()

num := 100
kvList := make(map[string][]byte, num)
for i := 0; i < 20; i++ {
b := db.NewBatchBitower()
key := makeTestSlotIntKey(i)
val := testRandBytes(100)
_ = b.Set(key, val, NoSync)
kvList[unsafe2.String(key)] = val
require.NoError(t, b.Commit(NoSync))
require.NoError(t, b.Close())
}

b := db.NewBatchBitower()
for i := 20; i < num; i++ {
key := makeTestSlotIntKey(i)
val := testRandBytes(1 << 20)
_ = b.Set(key, val, NoSync)
kvList[unsafe2.String(key)] = val
}
require.NoError(t, b.Commit(NoSync))
require.NoError(t, b.Close())

for i := 0; i < 100; i++ {
key := makeTestSlotIntKey(i)
require.NoError(t, verifyGet(db, key, kvList[string(key)]))
}
}

func TestBithashWriteRead(t *testing.T) {
defer os.RemoveAll(testDirname)
os.RemoveAll(testDirname)
Expand Down
159 changes: 98 additions & 61 deletions bitower.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,36 +169,38 @@ func openBitower(d *DB, index int) (*Bitower, error) {
}
}

newLogNum := s.mu.metaEdit.GetNextFileNum()
sme := &bitowerMetaEditor{MinUnflushedLogNum: newLogNum}
if err = s.metaApply(sme); err != nil {
return nil, err
}
if !d.opts.DisableWAL {
newLogNum := s.mu.metaEdit.GetNextFileNum()
sme := &bitowerMetaEditor{MinUnflushedLogNum: newLogNum}
if err = s.metaApply(sme); err != nil {
return nil, err
}

newLogName := s.makeWalFilename(newLogNum)
s.mu.log.queue = append(s.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: 0})
logFile, err := d.opts.FS.Create(newLogName)
if err != nil {
return nil, err
}
if err = s.walDir.Sync(); err != nil {
return nil, err
}
newLogName := s.makeWalFilename(newLogNum)
s.mu.log.queue = append(s.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: 0})
logFile, err := d.opts.FS.Create(newLogName)
if err != nil {
return nil, err
}
if err = s.walDir.Sync(); err != nil {
return nil, err
}

d.opts.EventListener.WALCreated(WALCreateInfo{
Index: index,
Path: newLogName,
FileNum: newLogNum,
})
d.opts.EventListener.WALCreated(WALCreateInfo{
Index: index,
Path: newLogName,
FileNum: newLogNum,
})

s.mu.mem.queue[len(s.mu.mem.queue)-1].logNum = newLogNum
s.mu.mem.queue[len(s.mu.mem.queue)-1].logNum = newLogNum

logFile = vfs.NewSyncingFile(logFile, vfs.SyncingFileOptions{
BytesPerSync: d.opts.WALBytesPerSync,
PreallocateSize: s.walPreallocateSize(),
})
s.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum)
s.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
logFile = vfs.NewSyncingFile(logFile, vfs.SyncingFileOptions{
BytesPerSync: d.opts.WALBytesPerSync,
PreallocateSize: s.walPreallocateSize(),
})
s.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum)
s.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
}

s.updateReadState()

Expand Down Expand Up @@ -275,22 +277,31 @@ func (s *Bitower) replayWAL(filename string, logNum FileNum) (maxSeqNum uint64,
seqNum := b.SeqNum()
maxSeqNum = seqNum + uint64(b.Count())

ensureMem(seqNum)
if err = mem.prepare(&b, false); err != nil && err != arenaskl.ErrArenaFull {
return 0, err
}
for err == arenaskl.ErrArenaFull {
if b.memTableSize >= uint64(s.db.largeBatchThreshold) {
flushMem()
b.data = append([]byte(nil), b.data...)
b.flushable = newFlushableBatchBitower(&b, s.db.opts.Comparer)
entry := newFlushableEntry(b.flushable, logNum, b.SeqNum())
entry.readerRefs.Add(1)
toFlush = append(toFlush, entry)
} else {
ensureMem(seqNum)
err = mem.prepare(&b, false)
if err != nil && err != arenaskl.ErrArenaFull {
if err = mem.prepare(&b, false); err != nil && err != arenaskl.ErrArenaFull {
return 0, err
}
for err == arenaskl.ErrArenaFull {
flushMem()
ensureMem(seqNum)
err = mem.prepare(&b, false)
if err != nil && err != arenaskl.ErrArenaFull {
return 0, err
}
}
if err = mem.apply(&b, seqNum); err != nil {
return 0, err
}
mem.writerUnref()
}
if err = mem.apply(&b, seqNum); err != nil {
return 0, err
}
mem.writerUnref()
buf.Reset()
}
flushMem()
Expand Down Expand Up @@ -479,25 +490,27 @@ func (s *Bitower) newBitreeIter(o *options.IterOptions) (iters []base.InternalIt
}

func (s *Bitower) checkpoint(fs vfs.FS, destDir string, isSync bool) error {
s.mu.Lock()
memQueue := s.mu.mem.queue
s.mu.Unlock()
if !s.db.opts.DisableWAL {
s.mu.Lock()
memQueue := s.mu.mem.queue
s.mu.Unlock()

if isSync {
if err := s.db.LogData(nil, s.index, Sync); err != nil {
return err
if isSync {
if err := s.db.LogData(nil, s.index, Sync); err != nil {
return err
}
}
}

for i := range memQueue {
logNum := memQueue[i].logNum
if logNum == 0 {
continue
}
srcPath := base.MakeFilepath(fs, s.walDirname, fileTypeLog, logNum)
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
if err := vfs.Copy(fs, srcPath, destPath); err != nil {
return err
for i := range memQueue {
logNum := memQueue[i].logNum
if logNum == 0 {
continue
}
srcPath := base.MakeFilepath(fs, s.walDirname, fileTypeLog, logNum)
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
if err := vfs.Copy(fs, srcPath, destPath); err != nil {
return err
}
}
}

Expand Down Expand Up @@ -525,7 +538,9 @@ func (s *Bitower) Close() (err error) {
s.mu.compact.cond.Wait()
}

err = utils.FirstError(err, s.mu.log.Close())
if s.mu.log.LogWriter != nil {
err = utils.FirstError(err, s.mu.log.Close())
}

s.readState.val.unref()

Expand All @@ -544,6 +559,10 @@ func (s *Bitower) Close() (err error) {
}

func (s *Bitower) commitApply(b *BatchBitower, mem *memTable) error {
if b.flushable != nil {
return nil
}

err := mem.apply(b, b.SeqNum())
if err != nil {
return err
Expand All @@ -560,12 +579,18 @@ func (s *Bitower) commitApply(b *BatchBitower, mem *memTable) error {
func (s *Bitower) commitWrite(b *BatchBitower, syncWG *sync.WaitGroup, syncErr *error) (*memTable, error) {
repr := b.Repr()

s.mu.Lock()
if b.flushable != nil {
b.flushable.setSeqNum(b.SeqNum())
if !s.db.opts.DisableWAL {
if _, err := s.mu.log.SyncRecord(repr, syncWG, syncErr); err != nil {
return nil, err
}
}
}

s.mu.Lock()
err := s.makeRoomForWrite(b, true)

mem := s.mu.mem.mutable

s.mu.Unlock()
if err != nil {
return nil, err
Expand All @@ -575,22 +600,24 @@ func (s *Bitower) commitWrite(b *BatchBitower, syncWG *sync.WaitGroup, syncErr *
return mem, nil
}

if _, err = s.mu.log.SyncRecord(repr, syncWG, syncErr); err != nil {
return nil, err
if b.flushable == nil {
if _, err = s.mu.log.SyncRecord(repr, syncWG, syncErr); err != nil {
return nil, err
}
}

return mem, err
}

func (s *Bitower) makeRoomForWrite(b *BatchBitower, needReport bool) error {
force := b == nil
force := b == nil || b.flushable != nil
stalled := false
for {
if s.mu.mem.switching {
s.mu.mem.cond.Wait()
continue
}
if b != nil {
if b != nil && b.flushable == nil {
err := s.mu.mem.mutable.prepare(b, true)
if err != arenaskl.ErrArenaFull && err != errMemExceedDelPercent {
if stalled {
Expand Down Expand Up @@ -709,9 +736,19 @@ func (s *Bitower) makeRoomForWrite(b *BatchBitower, needReport bool) error {
imm.logSize = prevLogSize
imm.flushForced = imm.flushForced || (b == nil)

if b != nil && b.flushable != nil {
entry := newFlushableEntry(b.flushable, imm.logNum, b.SeqNum())
entry.releaseMemAccounting = func() {}
s.mu.mem.queue = append(s.mu.mem.queue, entry)
imm.logNum = 0
}

var logSeqNum uint64
if b != nil {
logSeqNum = b.SeqNum()
if b.flushable != nil {
logSeqNum += uint64(b.Count())
}
} else {
logSeqNum = atomic.LoadUint64(&s.db.meta.atomic.logSeqNum)
}
Expand Down
3 changes: 2 additions & 1 deletion bitree/bdb/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"unsafe"

"github.com/cockroachdb/errors"
"github.com/zuoyebang/bitalosdb/internal/consts"
)

const (
MaxKeySize = 10 << 10
MaxKeySize = consts.MaxKeySize + 1<<8
MaxValueSize = (1 << 31) - 2
)

Expand Down
Loading

0 comments on commit c2459f1

Please sign in to comment.