refactor for chunk compaction #4585

Merged · 5 commits · Apr 3, 2024

Changes from 1 commit
126 changes: 123 additions & 3 deletions pkg/meta/base.go
@@ -68,7 +68,8 @@ type engine interface {
doInit(format *Format, force bool) error

scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) error
compactChunk(inode Ino, indx uint32, once, force bool)
doRead(ctx Context, inode Ino, indx uint32) ([]*slice, syscall.Errno)
doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*slice, skipped int, pos uint32, id uint64, size uint32, delayed []byte) syscall.Errno
doDeleteSustainedInode(sid uint64, inode Ino) error
doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) // limit < 0 means all
doDeleteFileData(inode Ino, length uint64)
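
This interface change is the heart of the refactor: the engine-specific compactChunk is dropped, and each metadata engine now supplies only two narrow primitives, doRead and doCompactChunk, so the compaction driver can live once in baseMeta. A minimal, self-contained Go sketch of that template-method shape (hypothetical types, not the juicefs ones):

package main

import "fmt"

// engine keeps only the storage-specific primitives (cf. doRead/doCompactChunk).
type engine interface {
	load(key string) (string, error)
	swap(key, old, repl string) error
}

// base owns the generic driver shared by every engine (cf. baseMeta.compactChunk).
type base struct{ en engine }

func (b *base) compact(key string) error {
	old, err := b.en.load(key) // read via the engine hook
	if err != nil {
		return err
	}
	return b.en.swap(key, old, "compacted:"+old) // write back via the engine hook
}

type memEngine struct{ m map[string]string }

func (e *memEngine) load(k string) (string, error) { return e.m[k], nil }
func (e *memEngine) swap(k, old, repl string) error {
	if e.m[k] != old { // optimistic check, like the Redis transaction further down
		return fmt.Errorf("conflict")
	}
	e.m[k] = repl
	return nil
}

func main() {
	e := &memEngine{m: map[string]string{"c1_0": "s1s2s3"}}
	b := &base{en: e}
	fmt.Println(b.compact("c1_0"), e.m["c1_0"]) // <nil> compacted:s1s2s3
}
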
@@ -1777,6 +1778,38 @@ func (m *baseMeta) InvalidateChunkCache(ctx Context, inode Ino, indx uint32) syscall.Errno {
return 0
}

func (m *baseMeta) Read(ctx Context, inode Ino, indx uint32, slices *[]Slice) (rerr syscall.Errno) {
defer func() {
if rerr == 0 {
m.touchAtime(ctx, inode, nil)
}
}()

if slices != nil {
*slices = nil
}
f := m.of.find(inode)
if f != nil {
f.RLock()
defer f.RUnlock()
}
if ss, ok := m.of.ReadChunk(inode, indx); ok {
*slices = ss
return 0
}
defer m.timeit("Read", time.Now())
ss, err := m.en.doRead(ctx, inode, indx)
if err != 0 {
return err
}
*slices = buildSlice(ss)
m.of.CacheChunk(inode, indx, *slices)
if !m.conf.ReadOnly && (len(ss) >= 5 || len(*slices) >= 5) {
go m.compactChunk(inode, indx, false, false)
}
return 0
}
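
Read is now generic across engines: consult the open-file cache, fall back to the engine's doRead, build and cache the slice view, and trigger background compaction once a chunk holds five or more slices. The threshold matters because every write appends a slice and a read must overlay all of them, newest on top. A toy Go illustration of that overlay (simplified slice type, not the real buildSlice):

package main

import "fmt"

type slice struct {
	pos, len uint32
	id       uint64
}

// visible overlays slices in write order (later slices win) and reports
// which slice id owns each byte, i.e. the view buildSlice has to compute.
func visible(chunk uint32, ss []slice) []uint64 {
	owner := make([]uint64, chunk)
	for _, s := range ss { // naive O(len(ss) * bytes); more slices, slower reads
		for i := s.pos; i < s.pos+s.len && i < chunk; i++ {
			owner[i] = s.id
		}
	}
	return owner
}

func main() {
	ss := []slice{{0, 8, 1}, {4, 4, 2}, {2, 3, 3}} // three overlapping writes
	fmt.Println(visible(8, ss))                    // [1 1 3 3 3 2 2 2]
}
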

func (m *baseMeta) NewSlice(ctx Context, id *uint64) syscall.Errno {
m.freeMu.Lock()
defer m.freeMu.Unlock()
@@ -2256,7 +2289,7 @@ func (m *baseMeta) CompactAll(ctx Context, threads int, bar *utils.Bar) syscall.Errno {
go func() {
for c := range ch {
logger.Debugf("Compacting chunk %d:%d (%d slices)", c.inode, c.indx, c.slices)
m.en.compactChunk(c.inode, c.indx, false, true)
m.compactChunk(c.inode, c.indx, false, true)
bar.Increment()
}
wg.Done()
@@ -2273,6 +2306,93 @@ func (m *baseMeta) CompactAll(ctx Context, threads int, bar *utils.Bar) syscall.Errno {
return 0
}

func (m *baseMeta) compactChunk(inode Ino, indx uint32, once, force bool) {
// avoid too many or duplicated compaction
k := uint64(inode) + (uint64(indx) << 40)
m.Lock()
if once || force {
for m.compacting[k] {
m.Unlock()
time.Sleep(time.Millisecond * 10)
m.Lock()
}
} else if len(m.compacting) > 10 || m.compacting[k] {
m.Unlock()
return
}
m.compacting[k] = true
m.Unlock()
defer func() {
m.Lock()
delete(m.compacting, k)
m.Unlock()
}()

ss, _ := m.en.doRead(Background, inode, indx)
if once && len(ss) < maxSlices {
return
}
if len(ss) > maxCompactSlices {
ss = ss[:maxCompactSlices]
}
skipped := skipSome(ss)
var first, last *slice
if skipped > 0 {
first, last = ss[0], ss[skipped-1]
}
ss = ss[skipped:]
pos, size, slices := compactChunk(ss)
if len(ss) < 2 || size == 0 {
return
}
if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos {
panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size))
}

var id uint64
ctx := Background
st := m.NewSlice(ctx, &id)
if st != 0 {
return
}
logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size)
err := m.newMsg(CompactChunk, slices, id)
if err != nil {
if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") {
logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err)
}
return
}

var dsbuf []byte
trash := m.toTrash(0)
if trash {
dsbuf = make([]byte, 0)
for _, s := range ss {
if s.id > 0 {
dsbuf = append(dsbuf, m.encodeDelayedSlice(s.id, s.size)...)
}
}
}

var origin []byte
for _, s := range ss {
origin = append(origin, marshalSlice(s.pos, s.id, s.size, s.off, s.len)...)
}
errno := m.en.doCompactChunk(inode, indx, origin, ss, skipped, pos, id, size, dsbuf)
if errno == syscall.EINVAL {
logger.Infof("compaction for %d:%d is wasted, delete slice %d (%d bytes)", inode, indx, id, size)
m.deleteSlice(id, size)
} else if errno == 0 {
m.of.InvalidateChunk(inode, indx)
} else {
logger.Warnf("compact %d %d: %s", inode, indx, err)
}
if force {
m.compactChunk(inode, indx, once, force)
}
}
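
The dedup key k packs the chunk index into the bits above the inode number, so a single map[uint64]bool can guard in-flight compactions per (inode, indx) pair; this quietly assumes inode values stay below 2^40. A standalone check of the packing:

package main

import "fmt"

// packKey mirrors k above: inode in the low 40 bits, chunk index above them.
func packKey(inode uint64, indx uint32) uint64 {
	return inode + uint64(indx)<<40
}

func main() {
	k := packKey(12345, 7)
	inode := k & (1<<40 - 1) // recover the low 40 bits
	indx := uint32(k >> 40)  // recover the chunk index
	fmt.Println(inode, indx) // 12345 7
}
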

func (m *baseMeta) Compact(ctx Context, inode Ino, concurrency int, preFunc, postFunc func()) syscall.Errno {
var attr Attr
if st := m.GetAttr(ctx, inode, &attr); st != 0 {
@@ -2288,7 +2408,7 @@ func (m *baseMeta) Compact(ctx Context, inode Ino, concurrency int, preFunc, postFunc func()) syscall.Errno {
go func() {
defer wg.Done()
for c := range chunkChan {
m.en.compactChunk(c.inode, c.indx, false, true)
m.compactChunk(c.inode, c.indx, false, true)
postFunc()
}
}()
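
Both CompactAll and Compact now funnel into the shared m.compactChunk through the same driver shape: a fixed pool of goroutines draining a channel of chunks. A minimal sketch of that pool; the placement of the pre hook is an assumption, since the diff only shows postFunc firing after each chunk:

package main

import (
	"fmt"
	"sync"
)

type chunk struct{ inode, indx uint32 }

// compactAll drains a channel of chunks with a bounded worker pool,
// mirroring the goroutine-per-thread loops above.
func compactAll(chunks []chunk, threads int, pre, post func()) {
	ch := make(chan chunk)
	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for c := range ch {
				pre()
				fmt.Printf("compacting %d:%d\n", c.inode, c.indx) // stand-in for compactChunk
				post()
			}
		}()
	}
	for _, c := range chunks {
		ch <- c
	}
	close(ch)
	wg.Wait()
}

func main() {
	compactAll([]chunk{{1, 0}, {1, 1}, {2, 0}}, 2, func() {}, func() {})
}
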
151 changes: 24 additions & 127 deletions pkg/meta/redis.go
@@ -2297,51 +2297,27 @@ func (m *redisMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
return err
}

func (m *redisMeta) Read(ctx Context, inode Ino, indx uint32, slices *[]Slice) (rerr syscall.Errno) {
defer func() {
if rerr == 0 {
m.touchAtime(ctx, inode, nil)
}
}()

if slices != nil {
*slices = nil
}
f := m.of.find(inode)
if f != nil {
f.RLock()
defer f.RUnlock()
}
if ss, ok := m.of.ReadChunk(inode, indx); ok {
*slices = ss
return 0
}
defer m.timeit("Read", time.Now())
func (m *redisMeta) doRead(ctx Context, inode Ino, indx uint32) ([]*slice, syscall.Errno) {
vals, err := m.rdb.LRange(ctx, m.chunkKey(inode, indx), 0, -1).Result()
if err != nil {
return errno(err)
return nil, errno(err)
}
if len(vals) == 0 {
var attr Attr
eno := m.doGetAttr(ctx, inode, &attr)
if eno != 0 {
return eno
return nil, eno
}
if attr.Typ != TypeFile {
return syscall.EPERM
return nil, syscall.EPERM
}
return 0
return nil, 0
}
ss := readSlices(vals)
if ss == nil {
return syscall.EIO
return nil, syscall.EIO
}
*slices = buildSlice(ss)
m.of.CacheChunk(inode, indx, *slices)
if !m.conf.ReadOnly && (len(vals) >= 5 || len(*slices) >= 5) {
go m.compactChunk(inode, indx, false, false)
}
return 0
return ss, 0
}
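
With the driver hoisted into baseMeta, the Redis doRead shrinks to its essence: one LRange over the chunk's list key, then decode the fixed-width slice records. A rough sketch of that storage layout, assuming go-redis v9, a local Redis, a hypothetical "c<inode>_<indx>" key format, and 24-byte big-endian records (the real encoding lives in marshalSlice/sliceBytes in this package):

package main

import (
	"context"
	"encoding/binary"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// marshal packs one slice record: pos, id, size, off, len as
// fixed-width big-endian fields (24 bytes per record).
func marshal(pos uint32, id uint64, size, off, length uint32) []byte {
	b := make([]byte, 24)
	binary.BigEndian.PutUint32(b[0:], pos)
	binary.BigEndian.PutUint64(b[4:], id)
	binary.BigEndian.PutUint32(b[12:], size)
	binary.BigEndian.PutUint32(b[16:], off)
	binary.BigEndian.PutUint32(b[20:], length)
	return b
}

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	key := "c100_0" // chunk of inode 100, index 0 (key format assumed)
	rdb.RPush(ctx, key, marshal(0, 1, 4096, 0, 4096))
	vals, _ := rdb.LRange(ctx, key, 0, -1).Result() // the read doRead issues
	fmt.Println(len(vals), "record(s) of", len(vals[0]), "bytes")
}
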

func (m *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Slice, mtime time.Time) syscall.Errno {
@@ -3000,109 +2976,41 @@ func (r *redisMeta) doCleanupDelayedSlices(edge int64) (int, error) {
return count, err
}

func (m *redisMeta) compactChunk(inode Ino, indx uint32, once, force bool) {
// avoid too many or duplicated compaction
k := uint64(inode) + (uint64(indx) << 40)
m.Lock()
if once || force {
for m.compacting[k] {
m.Unlock()
time.Sleep(time.Millisecond * 10)
m.Lock()
}
} else if len(m.compacting) > 10 || m.compacting[k] {
m.Unlock()
return
}
m.compacting[k] = true
defer func() {
m.Lock()
delete(m.compacting, k)
m.Unlock()
}()
m.Unlock()

var ctx = Background
if once && m.rdb.LLen(ctx, m.chunkKey(inode, indx)).Val() < int64(maxSlices) {
return
}
vals, err := m.rdb.LRange(ctx, m.chunkKey(inode, indx), 0, int64(maxCompactSlices)).Result()
if err != nil {
return
}

ss := readSlices(vals)
if ss == nil {
logger.Errorf("Corrupt value for inode %d chunk indx %d", inode, indx)
return
}
skipped := skipSome(ss)
var first, last *slice
if skipped > 0 {
first, last = ss[0], ss[skipped-1]
}
ss = ss[skipped:]
pos, size, slices := compactChunk(ss)
if len(ss) < 2 || size == 0 {
return
}
if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos {
panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size))
}

var id uint64
st := m.NewSlice(ctx, &id)
if st != 0 {
return
}
logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size)
err = m.newMsg(CompactChunk, slices, id)
if err != nil {
if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") {
logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err)
}
return
}
var buf []byte // trash enabled: track delayed slices
func (m *redisMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*slice, skipped int, pos uint32, id uint64, size uint32, delayed []byte) syscall.Errno {
var rs []*redis.IntCmd // trash disabled: check reference of slices
trash := m.toTrash(0)
if trash {
for _, s := range ss {
if s.id > 0 {
buf = append(buf, m.encodeDelayedSlice(s.id, s.size)...)
}
}
} else {
rs = make([]*redis.IntCmd, len(ss))
if delayed == nil {
rs = make([]*redis.IntCmd, len(ss)-skipped)
}
key := m.chunkKey(inode, indx)
ctx := Background
errno := errno(m.txn(ctx, func(tx *redis.Tx) error {
vals2, err := tx.LRange(ctx, key, 0, int64(len(vals)-1)).Result()
n := len(origin) / sliceBytes
vals2, err := tx.LRange(ctx, key, 0, int64(n-1)).Result()
if err != nil {
return err
}
if len(vals2) != len(vals) {
if len(vals2) != n {
return syscall.EINVAL
}
for i, val := range vals2 {
if val != vals[i] {
if val != string(origin[i*sliceBytes:(i+1)*sliceBytes]) {
return syscall.EINVAL
}
}

_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.LTrim(ctx, key, int64(len(vals)), -1)
pipe.LTrim(ctx, key, int64(n), -1)
pipe.LPush(ctx, key, marshalSlice(pos, id, size, 0, size))
for i := skipped; i > 0; i-- {
pipe.LPush(ctx, key, vals[i-1])
pipe.LPush(ctx, key, origin[(i-1)*sliceBytes:i*sliceBytes])
}
pipe.HSet(ctx, m.sliceRefs(), m.sliceKey(id, size), "0") // create the key to tracking it
if trash {
if len(buf) > 0 {
pipe.HSet(ctx, m.delSlices(), fmt.Sprintf("%d_%d", id, time.Now().Unix()), buf)
if delayed != nil {
if len(delayed) > 0 {
pipe.HSet(ctx, m.delSlices(), fmt.Sprintf("%d_%d", id, time.Now().Unix()), delayed)
}
} else {
for i, s := range ss {
for i, s := range ss[skipped:] {
if s.id > 0 {
rs[i] = pipe.HIncrBy(ctx, m.sliceRefs(), m.sliceKey(s.id, s.size), -1)
}
@@ -3123,28 +3031,17 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, once, force bool) {

if errno == syscall.EINVAL {
m.rdb.HIncrBy(ctx, m.sliceRefs(), m.sliceKey(id, size), -1)
logger.Infof("compaction for %d:%d is wasted, delete slice %d (%d bytes)", inode, indx, id, size)
m.deleteSlice(id, size)
} else if errno == 0 {
m.of.InvalidateChunk(inode, indx)
m.cleanupZeroRef(m.sliceKey(id, size))
if !trash {
for i, s := range ss {
if rs != nil {
for i, s := range ss[skipped:] {
if s.id > 0 && rs[i].Err() == nil && rs[i].Val() < 0 {
m.deleteSlice(s.id, s.size)
}
}
}
} else {
logger.Warnf("compact %s: %s", key, errno)
}

if force {
m.Lock()
delete(m.compacting, k)
m.Unlock()
m.compactChunk(inode, indx, once, force)
}
return errno
}
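
The transaction above is the standard go-redis optimistic-locking shape: under a WATCH on the chunk key (supplied by m.txn), re-read the first n records, return EINVAL if any byte differs from origin, and only then LTrim and LPush inside a MULTI/EXEC pipeline; a concurrent writer to the chunk aborts the EXEC. A condensed, self-contained sketch of that compare-then-replace pattern, assuming go-redis v9:

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/redis/go-redis/v9"
)

var errChanged = errors.New("list changed") // plays the role of syscall.EINVAL

// replaceHead swaps the first len(old) elements of a list for repl,
// but only if those elements are still exactly old (optimistic lock).
func replaceHead(ctx context.Context, rdb *redis.Client, key string, old []string, repl string) error {
	return rdb.Watch(ctx, func(tx *redis.Tx) error {
		cur, err := tx.LRange(ctx, key, 0, int64(len(old)-1)).Result()
		if err != nil {
			return err
		}
		if len(cur) != len(old) {
			return errChanged
		}
		for i := range cur {
			if cur[i] != old[i] {
				return errChanged // someone rewrote the chunk meanwhile
			}
		}
		_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.LTrim(ctx, key, int64(len(old)), -1) // drop the verified head
			pipe.LPush(ctx, key, repl)                // push the compacted record
			return nil
		})
		return err
	}, key)
}

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.RPush(ctx, "c1_0", "s1", "s2", "s3")
	fmt.Println(replaceHead(ctx, rdb, "c1_0", []string{"s1", "s2", "s3"}, "merged"))
}
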

func (m *redisMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar) error {