Skip to content

Commit

Permalink
meta/sql: use upsert in Write() to avoid possible deadlock (#4529)
Browse files Browse the repository at this point in the history
  • Loading branch information
SandyXSD authored Mar 19, 2024
1 parent b7050cf commit 90e549b
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,28 @@ func (m *dbMeta) appendSlice(s *xorm.Session, inode Ino, indx uint32, buf []byte
return err
}

func (m *dbMeta) upsertSlice(s *xorm.Session, inode Ino, indx uint32, buf []byte, insert *bool) error {
var err error
driver := m.Name()
if driver == "sqlite3" || driver == "postgres" {
_, err = s.Exec(`
INSERT INTO jfs_chunk (inode, indx, slices)
VALUES (?, ?, ?)
ON CONFLICT (inode, indx)
DO UPDATE SET slices=jfs_chunk.slices || ?`, inode, indx, buf, buf)
} else {
var r sql.Result
r, err = s.Exec(`
INSERT INTO jfs_chunk (inode, indx, slices)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
slices=concat(slices, ?)`, inode, indx, buf, buf)
n, _ := r.RowsAffected()
*insert = n == 1 // https://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html
}
return err
}

func (m *dbMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, attr *Attr, skipPermCheck bool) syscall.Errno {
defer m.timeit("Truncate", time.Now())
f := m.of.find(inode)
Expand Down Expand Up @@ -2430,26 +2452,18 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
nodeAttr.Ctime = ctime.UnixNano() / 1e3
nodeAttr.Ctimensec = int16(ctime.Nanosecond())

var ck = chunk{Inode: inode, Indx: indx}
ok, err = s.ForUpdate().MustCols("indx").Get(&ck)
if err != nil {
return err
}
buf := marshalSlice(off, slice.Id, slice.Size, slice.Off, slice.Len)
if ok {
if err := m.appendSlice(s, inode, indx, buf); err != nil {
return err
}
} else {
if err = mustInsert(s, &chunk{Inode: inode, Indx: indx, Slices: buf}); err != nil {
return err
}
var insert bool // no compaction check for the first slice
if err = m.upsertSlice(s, inode, indx, buf, &insert); err != nil {
return err
}
if err = mustInsert(s, sliceRef{slice.Id, slice.Size, 1}); err != nil {
return err
}
_, err = s.Cols("length", "mtime", "ctime", "mtimensec", "ctimensec").Update(&nodeAttr, &node{Inode: inode})
if err == nil {
if err == nil && !insert {
ck := chunk{Inode: inode, Indx: indx}
_, _ = s.MustCols("indx").Get(&ck)
ns := len(ck.Slices) / sliceBytes // number of slices
needCompact = ns%100 == 99 || ns > 350
}
Expand Down

0 comments on commit 90e549b

Please sign in to comment.