Skip to content

Commit

Permalink
calculate sustained file
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro committed Dec 5, 2024
1 parent 23e50aa commit f779d6e
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 15 deletions.
4 changes: 2 additions & 2 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type engine interface {
// @trySync: try sync dir stat if broken or not existed
doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, syscall.Errno)
doSyncDirStat(ctx Context, ino Ino) (*dirStat, syscall.Errno)
doSyncUsedSpace() error
doSyncUsedSpace(ctx Context) error

scanTrashSlices(Context, trashSliceScan) error
scanPendingSlices(Context, pendingSliceScan) error
Expand Down Expand Up @@ -2016,7 +2016,7 @@ func (m *baseMeta) Check(ctx Context, fpath string, repair bool, recursive bool,
}
wg.Wait()
if fpath == "/" && repair && recursive && statAll {
if err := m.syncUsedSpace(); err != nil {
if err := m.syncUsedSpace(ctx); err != nil {
logger.Errorf("Sync used space: %s", err)
hasError = true
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ func (m *baseMeta) doFlushStats() {
m.en.doFlushStats()
m.fsStatsLock.Unlock()
}
func (m *baseMeta) syncUsedSpace() error {
return m.en.doSyncUsedSpace()
func (m *baseMeta) syncUsedSpace(ctx Context) error {
return m.en.doSyncUsedSpace(ctx)
}

func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parents ...Ino) syscall.Errno {
Expand Down
58 changes: 56 additions & 2 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,62 @@ func (m *redisMeta) updateStats(space int64, inodes int64) {
atomic.AddInt64(&m.usedInodes, inodes)
}

func (m *redisMeta) doSyncUsedSpace() error {
return nil
func (m *redisMeta) doSyncUsedSpace(ctx Context) error {
if m.conf.ReadOnly {
return syscall.EROFS
}
var used int64
if err := m.hscan(ctx, m.dirUsedSpaceKey(), func(keys []string) error {
for i := 0; i < len(keys); i += 2 {
v, err := strconv.ParseInt(keys[i+1], 10, 64)
if err != nil {
logger.Warnf("invalid used space: %s->%s", keys[i], keys[i+1])
continue
}
used += v
}
return nil
}); err != nil {
return err
}

var inoKeys []string
if err := m.scan(ctx, m.prefix+"session*", func(keys []string) error {
for i := 0; i < len(keys); i += 2 {
key := keys[i]
if key == "sessions" {
continue
}

inodes, err := m.rdb.SMembers(ctx, key).Result()
if err != nil {
logger.Warnf("SMembers %s: %s", key, err)
continue
}
for _, sinode := range inodes {
ino, err := strconv.ParseInt(sinode, 10, 64)
if err != nil {
logger.Warnf("invalid sustained: %s->%s", key, sinode)
continue
}
inoKeys = append(inoKeys, m.inodeKey(Ino(ino)))
}
}
return nil
}); err != nil {
return err
}

values, err := m.rdb.MGet(ctx, inoKeys...).Result()
if err != nil {
return err
}
var attr Attr
for _, value := range values {
m.parseAttr([]byte(value.(string)), &attr)
used += align4K(attr.Length)
}
return m.rdb.Set(ctx, m.usedSpaceKey(), strconv.FormatInt(used, 10), 0).Err()
}

// redisMeta updates the usage in each transaction
Expand Down
31 changes: 29 additions & 2 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"runtime"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -925,9 +926,35 @@ func (m *dbMeta) updateStats(space int64, inodes int64) {
atomic.AddInt64(&m.newInodes, inodes)
}

func (m *dbMeta) doSyncUsedSpace() error {
func (m *dbMeta) doSyncUsedSpace(ctx Context) error {
if m.conf.ReadOnly {
return syscall.EROFS
}
var used int64
if err := m.roTxn(func(s *xorm.Session) error {
total, err := s.SumInt(&dirStats{}, "used_space")
used += total
return err
}); err != nil {
return err
}
if err := m.roTxn(func(s *xorm.Session) error {
queryMap, err := s.QueryString("SELECT SUM(length) FROM jfs_node WHERE inode IN (SELECT inode FROM jfs_sustained)")
if err != nil {
return err
}
value, err := strconv.ParseInt(queryMap[0]["SUM(length)"], 10, 64)
if err != nil {
return err
}
used += align4K(uint64(value))
return nil
}); err != nil {
return err
}

return m.txn(func(s *xorm.Session) error {
_, err := s.Exec("UPDATE jfs_counter SET value=(SELECT SUM(used_space) FROM jfs_dir_stats) WHERE name='usedSpace'")
_, err := s.Update(&counter{Value: used}, &counter{Name: usedSpace})
return err
})
}
Expand Down
38 changes: 31 additions & 7 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2746,20 +2746,44 @@ func (m *kvMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) {
return quotas, nil
}

func (m *kvMeta) doSyncUsedSpace() error {
var usedSpace int64
prefix := m.fmtKey("U")
err := m.client.txn(func(tx *kvTxn) error {
func (m *kvMeta) doSyncUsedSpace(ctx Context) error {
if m.conf.ReadOnly {
return syscall.EROFS
}
var used int64
if err := m.client.txn(func(tx *kvTxn) error {
prefix := m.fmtKey("U")
tx.scan(prefix, nextKey(prefix), false, func(k, v []byte) bool {
usedSpace += m.parseInt64(v)
stat := m.parseDirStat(v)
used += stat.space
return true
})
return nil
}, 0)
}, 0); err != nil {
return err
}
// need add sustained file size
vals, err := m.scanKeys(m.fmtKey("SS"))
if err != nil {
return err
}
return m.setValue(m.counterKey("usedSpace"), packCounter(usedSpace))
var attr Attr
for _, k := range vals {
b := utils.FromBuffer(k[2:])
if b.Len() != 16 {
logger.Warnf("Invalid sustainedKey: %v", k)
continue
}
_ = b.Get64()
inode := m.decodeInode(b.Get(8))
if eno := m.doGetAttr(ctx, inode, &attr); eno != 0 {
logger.Warnf("Get attr of inode %d: %s", inode, eno)
continue
}
used += align4K(attr.Length)
}

return m.setValue(m.counterKey(usedSpace), packCounter(used))
}

func (m *kvMeta) doFlushQuotas(ctx Context, quotas map[Ino]*Quota) error {
Expand Down

0 comments on commit f779d6e

Please sign in to comment.