From 84fc33503e8aa4437012c1132ecdf622b8ed6787 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 13 Dec 2024 15:12:53 +0800 Subject: [PATCH 1/3] meta/sql: simplify transaction for single query --- pkg/meta/sql.go | 106 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 34 deletions(-) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index d00d96db8bdd..344f0033ef19 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -394,7 +394,7 @@ func (m *dbMeta) doInit(format *Format, force bool) error { } var s = setting{Name: "format"} var ok bool - err := m.roTxn(Background(), func(ses *xorm.Session) (err error) { + err := m.simpleTxn(Background(), func(ses *xorm.Session) (err error) { ok, err = ses.Get(&s) return err }) @@ -483,7 +483,7 @@ func (m *dbMeta) cacheACLs(ctx Context) error { if !m.getFormat().EnableACL { return nil } - return m.roTxn(ctx, func(s *xorm.Session) error { + return m.simpleTxn(ctx, func(s *xorm.Session) error { return s.Table(&acl{}).Iterate(new(acl), func(idx int, bean interface{}) error { a := bean.(*acl) m.aclCache.Put(a.Id, a.toRule()) @@ -501,7 +501,7 @@ func (m *dbMeta) Reset() error { } func (m *dbMeta) doLoad() (data []byte, err error) { - err = m.roTxn(Background(), func(ses *xorm.Session) error { + err = m.simpleTxn(Background(), func(ses *xorm.Session) error { if ok, err := ses.IsTableExist(&setting{}); err != nil { return err } else if !ok { @@ -698,7 +698,7 @@ func (m *dbMeta) ListSessions() ([]*Session, error) { } func (m *dbMeta) getCounter(name string) (v int64, err error) { - err = m.roTxn(Background(), func(s *xorm.Session) error { + err = m.simpleTxn(Background(), func(s *xorm.Session) error { c := counter{Name: name} _, err := s.Get(&c) if err == nil { @@ -864,6 +864,11 @@ func (m *dbMeta) roTxn(ctx context.Context, f func(s *xorm.Session) error) error } var lastErr error for i := 0; i < maxRetry; i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } err := s.BeginTx(&opt) if err != nil && opt.ReadOnly && (strings.Contains(err.Error(), "READ") || strings.Contains(err.Error(), "driver does not support read-only transactions")) { logger.Warnf("the database does not support read-only transaction") @@ -893,7 +898,40 @@ func (m *dbMeta) roTxn(ctx context.Context, f func(s *xorm.Session) error) error } return err } - logger.Warnf("Already tried 50 times, returning: %s", lastErr) + logger.Warnf("Already tried %d times, returning: %s", maxRetry, lastErr) + return lastErr +} + +func (m *dbMeta) simpleTxn(ctx context.Context, f func(s *xorm.Session) error) error { + start := time.Now() + defer func() { m.txDist.Observe(time.Since(start).Seconds()) }() + s := m.db.NewSession() + defer s.Close() + + var maxRetry int = 50 + var lastErr error + for i := 0; i < maxRetry; i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + err := f(s) + if eno, ok := err.(syscall.Errno); ok && eno == 0 { + err = nil + } + if err != nil && m.shouldRetry(err) { + m.txRestart.Add(1) + logger.Debugf("Read transaction failed, restart it (tried %d): %s", i+1, err) + lastErr = err + time.Sleep(time.Millisecond * time.Duration(i*i)) + continue + } else if err == nil && i > 1 { + logger.Warnf("simple transaction succeeded after %d tries (%s), last error: %s", i+1, time.Since(start), lastErr) + } + return err + } + logger.Warnf("Already tried %d times, returning: %s", maxRetry, lastErr) return lastErr } @@ -954,14 +992,14 @@ func (m *dbMeta) doSyncUsedSpace(ctx Context) error { return syscall.EROFS } var used int64 - if err := m.roTxn(ctx, func(s *xorm.Session) error { + if err := m.simpleTxn(ctx, func(s *xorm.Session) error { total, err := s.SumInt(&dirStats{}, "used_space") used += total return err }); err != nil { return err } - if err := m.roTxn(ctx, func(s *xorm.Session) error { + if err := m.simpleTxn(ctx, func(s *xorm.Session) error { queryResultMap, err := s.QueryString("SELECT length FROM jfs_node WHERE inode IN (SELECT inode FROM jfs_sustained)") if err != nil { return err @@ -1010,7 +1048,7 @@ func (m *dbMeta) doFlushStats() { } func (m *dbMeta) doLookup(ctx Context, parent Ino, name string, inode *Ino, attr *Attr) syscall.Errno { - return errno(m.roTxn(ctx, func(s *xorm.Session) error { + return errno(m.simpleTxn(ctx, func(s *xorm.Session) error { s = s.Table(&edge{}) nn := namedNode{node: node{Parent: parent}, Name: []byte(name)} var exist bool @@ -1034,7 +1072,7 @@ func (m *dbMeta) doLookup(ctx Context, parent Ino, name string, inode *Ino, attr } func (m *dbMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno { - return errno(m.roTxn(ctx, func(s *xorm.Session) error { + return errno(m.simpleTxn(ctx, func(s *xorm.Session) error { var n = node{Inode: inode} ok, err := s.Get(&n) if err != nil { @@ -1277,7 +1315,7 @@ func (m *dbMeta) doFallocate(ctx Context, inode Ino, mode uint8, off uint64, siz func (m *dbMeta) doReadlink(ctx Context, inode Ino, noatime bool) (atime int64, target []byte, err error) { if noatime { - err = m.roTxn(ctx, func(s *xorm.Session) error { + err = m.simpleTxn(ctx, func(s *xorm.Session) error { var l = symlink{Inode: inode} ok, err := s.Get(&l) if err == nil && ok { @@ -2155,7 +2193,7 @@ func (m *dbMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr) } func (m *dbMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry, limit int) syscall.Errno { - return errno(m.roTxn(ctx, func(s *xorm.Session) error { + return errno(m.simpleTxn(ctx, func(s *xorm.Session) error { s = s.Table(&edge{}) if plus != 0 { s = s.Join("INNER", &node{}, "jfs_edge.inode=jfs_node.inode") @@ -2206,7 +2244,7 @@ func (m *dbMeta) doCleanStaleSession(sid uint64) error { } var sus []sustained - err = m.roTxn(Background(), func(ses *xorm.Session) error { + err = m.simpleTxn(Background(), func(ses *xorm.Session) error { sus = nil return ses.Find(&sus, &sustained{Sid: sid}) }) @@ -2242,7 +2280,7 @@ func (m *dbMeta) doCleanStaleSession(sid uint64) error { func (m *dbMeta) doFindStaleSessions(limit int) ([]uint64, error) { var sids []uint64 - _ = m.roTxn(Background(), func(ses *xorm.Session) error { + _ = m.simpleTxn(Background(), func(ses *xorm.Session) error { var ss []session2 err := ses.Where("Expire < ?", time.Now().Unix()).Limit(limit, 0).Find(&ss) if err != nil { @@ -2259,7 +2297,7 @@ func (m *dbMeta) doFindStaleSessions(limit int) ([]uint64, error) { return sids, nil } - err := m.roTxn(Background(), func(ses *xorm.Session) error { + err := m.simpleTxn(Background(), func(ses *xorm.Session) error { if ok, err := ses.IsTableExist(&session{}); err != nil { return err } else if ok { @@ -2325,7 +2363,7 @@ func (m *dbMeta) doDeleteSustainedInode(sid uint64, inode Ino) error { func (m *dbMeta) doRead(ctx Context, inode Ino, indx uint32) ([]*slice, syscall.Errno) { var c = chunk{Inode: inode, Indx: indx} - if err := m.roTxn(ctx, func(s *xorm.Session) error { + if err := m.simpleTxn(ctx, func(s *xorm.Session) error { _, err := s.MustCols("indx").Get(&c) return err }); err != nil { @@ -2540,7 +2578,7 @@ func (m *dbMeta) getParents(s *xorm.Session, inode, parent Ino) []Ino { func (m *dbMeta) doGetParents(ctx Context, inode Ino) map[Ino]int { var rows []edge - if err := m.roTxn(ctx, func(s *xorm.Session) error { + if err := m.simpleTxn(ctx, func(s *xorm.Session) error { rows = nil return s.Find(&rows, &edge{Inode: inode}) }); err != nil { @@ -2628,7 +2666,7 @@ func (m *dbMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, sys st := dirStats{Inode: ino} var exist bool var err error - if err = m.roTxn(ctx, func(s *xorm.Session) error { + if err = m.simpleTxn(ctx, func(s *xorm.Session) error { exist, err = s.Get(&st) return err }); err != nil { @@ -2667,7 +2705,7 @@ func (m *dbMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, sys func (m *dbMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) { files := make(map[Ino]uint64) - err := m.roTxn(Background(), func(s *xorm.Session) error { + err := m.simpleTxn(Background(), func(s *xorm.Session) error { var ds []delfile err := s.Where("expire < ?", ts).Limit(limit, 0).Find(&ds) if err != nil { @@ -2683,7 +2721,7 @@ func (m *dbMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) func (m *dbMeta) doCleanupSlices() { var cks []sliceRef - _ = m.roTxn(Background(), func(s *xorm.Session) error { + _ = m.simpleTxn(Background(), func(s *xorm.Session) error { cks = nil return s.Where("refs <= 0").Find(&cks) }) @@ -2732,7 +2770,7 @@ func (m *dbMeta) deleteChunk(inode Ino, indx uint32) error { continue } var ref = sliceRef{Id: s.id} - err := m.roTxn(Background(), func(s *xorm.Session) error { + err := m.simpleTxn(Background(), func(s *xorm.Session) error { ok, err := s.Get(&ref) if err == nil && !ok { err = errors.New("not found") @@ -2748,7 +2786,7 @@ func (m *dbMeta) deleteChunk(inode Ino, indx uint32) error { func (m *dbMeta) doDeleteFileData(inode Ino, length uint64) { var indexes []chunk - _ = m.roTxn(Background(), func(s *xorm.Session) error { + _ = m.simpleTxn(Background(), func(s *xorm.Session) error { indexes = nil return s.Cols("indx").Find(&indexes, &chunk{Inode: inode}) }) @@ -2772,7 +2810,7 @@ func (m *dbMeta) doCleanupDelayedSlices(edge int64) (int, error) { var result []delslices var batch int = 1e6 for { - _ = m.roTxn(Background(), func(s *xorm.Session) error { + _ = m.simpleTxn(Background(), func(s *xorm.Session) error { result = result[:0] return s.Where("deleted < ?", edge).Limit(batch, 0).Find(&result) }) @@ -2803,7 +2841,7 @@ func (m *dbMeta) doCleanupDelayedSlices(edge int64) (int, error) { } for _, s := range ss { var ref = sliceRef{Id: s.Id} - err := m.roTxn(Background(), func(s *xorm.Session) error { + err := m.simpleTxn(Background(), func(s *xorm.Session) error { ok, err := s.Get(&ref) if err == nil && !ok { err = errors.New("not found") @@ -2864,7 +2902,7 @@ func (m *dbMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*sli // there could be false-negative that the compaction is successful, double-check if st != 0 && st != syscall.EINVAL { var ok bool - if err := m.roTxn(Background(), func(s *xorm.Session) error { + if err := m.simpleTxn(Background(), func(s *xorm.Session) error { var e error ok, e = s.Get(&sliceRef{Id: id}) return e @@ -2889,7 +2927,7 @@ func (m *dbMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*sli } var ref = sliceRef{Id: s.id} var ok bool - err := m.roTxn(Background(), func(s *xorm.Session) error { + err := m.simpleTxn(Background(), func(s *xorm.Session) error { var e error ok, e = s.Get(&ref) return e @@ -2925,7 +2963,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, de if delete { m.doCleanupSlices() } - err := m.roTxn(ctx, func(s *xorm.Session) error { + err := m.simpleTxn(ctx, func(s *xorm.Session) error { var cs []chunk err := s.Find(&cs) if err != nil { @@ -2953,7 +2991,7 @@ func (m *dbMeta) ListSlices(ctx Context, slices map[Ino][]Slice, scanPending, de } if scanPending { - _ = m.roTxn(ctx, func(s *xorm.Session) error { + _ = m.simpleTxn(ctx, func(s *xorm.Session) error { var cks []sliceRef err := s.Where("refs <= 0").Find(&cks) if err != nil { @@ -2986,7 +3024,7 @@ func (m *dbMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error { } var dss []delslices - err := m.roTxn(ctx, func(tx *xorm.Session) error { + err := m.simpleTxn(ctx, func(tx *xorm.Session) error { if ok, err := tx.IsTableExist(&delslices{}); err != nil { return err } else if !ok { @@ -3031,7 +3069,7 @@ func (m *dbMeta) scanTrashSlices(ctx Context, scan trashSliceScan) error { if clean { for _, s := range ss { var ref = sliceRef{Id: s.Id} - err := m.roTxn(ctx, func(tx *xorm.Session) error { + err := m.simpleTxn(ctx, func(tx *xorm.Session) error { ok, err := tx.Get(&ref) if err == nil && !ok { err = errors.New("not found") @@ -3052,7 +3090,7 @@ func (m *dbMeta) scanPendingSlices(ctx Context, scan pendingSliceScan) error { return nil } var refs []sliceRef - err := m.roTxn(ctx, func(tx *xorm.Session) error { + err := m.simpleTxn(ctx, func(tx *xorm.Session) error { if ok, err := tx.IsTableExist(&sliceRef{}); err != nil { return err } else if !ok { @@ -3083,7 +3121,7 @@ func (m *dbMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error { } var dfs []delfile - err := m.roTxn(ctx, func(s *xorm.Session) error { + err := m.simpleTxn(ctx, func(s *xorm.Session) error { if ok, err := s.IsTableExist(&delfile{}); err != nil { return err } else if !ok { @@ -3149,7 +3187,7 @@ func (m *dbMeta) doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno { func (m *dbMeta) GetXattr(ctx Context, inode Ino, name string, vbuff *[]byte) syscall.Errno { defer m.timeit("GetXattr", time.Now()) inode = m.checkRoot(inode) - return errno(m.roTxn(ctx, func(s *xorm.Session) error { + return errno(m.simpleTxn(ctx, func(s *xorm.Session) error { var x = xattr{Inode: inode, Name: name} ok, err := s.Get(&x) if err != nil { @@ -3238,7 +3276,7 @@ func (m *dbMeta) doRemoveXattr(ctx Context, inode Ino, name string) syscall.Errn func (m *dbMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) { var quota *Quota - return quota, m.roTxn(ctx, func(s *xorm.Session) error { + return quota, m.simpleTxn(ctx, func(s *xorm.Session) error { q := dirQuota{Inode: inode} ok, e := s.Get(&q) if e == nil && ok { @@ -3303,7 +3341,7 @@ func (m *dbMeta) doDelQuota(ctx Context, inode Ino) error { func (m *dbMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) { var rows []dirQuota - err := m.roTxn(ctx, func(s *xorm.Session) error { + err := m.simpleTxn(ctx, func(s *xorm.Session) error { rows = rows[:0] return s.Find(&rows) }) From 22e4a2f9f8a75b6929665bd1517453f4cb9340e5 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 13 Dec 2024 21:24:38 +0800 Subject: [PATCH 2/3] session pool --- pkg/meta/sql.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 344f0033ef19..1c49e6163855 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -234,8 +234,9 @@ type dirQuota struct { type dbMeta struct { *baseMeta - db *xorm.Engine - snap *dbSnap + db *xorm.Engine + spool *sync.Pool + snap *dbSnap noReadOnlyTxn bool } @@ -326,6 +327,15 @@ func newSQLMeta(driver, addr string, conf *Config) (Meta, error) { baseMeta: newBaseMeta(addr, conf), db: engine, } + m.spool = &sync.Pool{ + New: func() interface{} { + s := engine.NewSession() + runtime.SetFinalizer(s, func(s *xorm.Session) { + _ = s.Close() + }) + return s + }, + } m.en = m return m, nil } @@ -847,7 +857,8 @@ func (m *dbMeta) roTxn(ctx context.Context, f func(s *xorm.Session) error) error s = v.(*xorm.Session) } else { s = m.db.NewSession() - defer s.Close() + s = m.spool.Get().(*xorm.Session) + defer m.spool.Put(s) } var opt sql.TxOptions if !m.noReadOnlyTxn { @@ -905,8 +916,8 @@ func (m *dbMeta) roTxn(ctx context.Context, f func(s *xorm.Session) error) error func (m *dbMeta) simpleTxn(ctx context.Context, f func(s *xorm.Session) error) error { start := time.Now() defer func() { m.txDist.Observe(time.Since(start).Seconds()) }() - s := m.db.NewSession() - defer s.Close() + s := m.spool.Get().(*xorm.Session) + defer m.spool.Put(s) var maxRetry int = 50 var lastErr error From b36f084dac3659455d43e1ceb591ab5cf0502387 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 13 Dec 2024 21:58:35 +0800 Subject: [PATCH 3/3] fix lint --- pkg/meta/sql.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 1c49e6163855..02696b21ab0c 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -856,7 +856,6 @@ func (m *dbMeta) roTxn(ctx context.Context, f func(s *xorm.Session) error) error if v := ctx.Value(txSessionKey{}); v != nil { s = v.(*xorm.Session) } else { - s = m.db.NewSession() s = m.spool.Get().(*xorm.Session) defer m.spool.Put(s) }