Skip to content

Commit

Permalink
Hadoop: use auto increment fs handle to avoid possible conflict (#4411)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangyoupeng authored Feb 21, 2024
1 parent 38f279a commit 05cb243
Showing 1 changed file with 39 additions and 38 deletions.
77 changes: 39 additions & 38 deletions sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ var (
openFiles = make(map[int]*fwrapper)
nextHandle = 1

fslock sync.Mutex
handlers = make(map[uintptr]*wrapper)
activefs = make(map[string][]*wrapper)
logger = utils.GetLogger("juicefs")
bOnce sync.Once
bridges []*Bridge
pOnce sync.Once
pushers []*push.Pusher
fslock sync.Mutex
handlers = make(map[int64]*wrapper)
nextFsHandle int64 = 0
activefs = make(map[string][]*wrapper)
logger = utils.GetLogger("juicefs")
bOnce sync.Once
bridges []*Bridge
pOnce sync.Once
pushers []*push.Pusher
)

const (
Expand Down Expand Up @@ -314,7 +315,7 @@ type javaConf struct {
PushGraphite string `json:"pushGraphite"`
}

func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.FileSystem) uintptr {
func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.FileSystem) int64 {
fslock.Lock()
defer fslock.Unlock()
ws := activefs[name]
Expand Down Expand Up @@ -342,9 +343,9 @@ func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.F
w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(user), w.lookupGids(group))
}
activefs[name] = append(ws, w)
h := uintptr(unsafe.Pointer(w)) & 0x7fffffff // low 32bits
handlers[h] = w
return h
nextFsHandle = nextFsHandle + 1
handlers[nextFsHandle] = w
return nextFsHandle
}

func push2Gateway(pushGatewayAddr, pushAuth string, pushInterVal time.Duration, registry *prometheus.Registry, commonLabels map[string]string) {
Expand Down Expand Up @@ -403,7 +404,7 @@ func push2Graphite(graphite string, pushInterVal time.Duration, registry *promet
}

//export jfs_init
func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintptr {
func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64 {
name := C.GoString(cname)
debug.SetGCPercent(50)
object.UserAgent = "JuiceFS-SDK " + version.Version()
Expand Down Expand Up @@ -592,14 +593,14 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
})
}

func F(p uintptr) *wrapper {
func F(p int64) *wrapper {
fslock.Lock()
defer fslock.Unlock()
return handlers[p]
}

//export jfs_update_uid_grouping
func jfs_update_uid_grouping(h uintptr, uidstr *C.char, grouping *C.char) {
func jfs_update_uid_grouping(h int64, uidstr *C.char, grouping *C.char) {
w := F(h)
if w == nil {
return
Expand Down Expand Up @@ -663,7 +664,7 @@ func jfs_update_uid_grouping(h uintptr, uidstr *C.char, grouping *C.char) {
}

//export jfs_term
func jfs_term(pid int, h uintptr) int {
func jfs_term(pid int, h int64) int {
w := F(h)
if w == nil {
return 0
Expand Down Expand Up @@ -721,7 +722,7 @@ func jfs_term(pid int, h uintptr) int {
}

//export jfs_open
func jfs_open(pid int, h uintptr, cpath *C.char, lenPtr uintptr, flags int) int {
func jfs_open(pid int, h int64, cpath *C.char, lenPtr uintptr, flags int) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -744,7 +745,7 @@ func jfs_open(pid int, h uintptr, cpath *C.char, lenPtr uintptr, flags int) int
}

//export jfs_access
func jfs_access(pid int, h uintptr, cpath *C.char, flags int) int {
func jfs_access(pid int, h int64, cpath *C.char, flags int) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -753,7 +754,7 @@ func jfs_access(pid int, h uintptr, cpath *C.char, flags int) int {
}

//export jfs_create
func jfs_create(pid int, h uintptr, cpath *C.char, mode uint16, umask uint16) int {
func jfs_create(pid int, h int64, cpath *C.char, mode uint16, umask uint16) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -771,7 +772,7 @@ func jfs_create(pid int, h uintptr, cpath *C.char, mode uint16, umask uint16) in
}

//export jfs_mkdir
func jfs_mkdir(pid int, h uintptr, cpath *C.char, mode uint16, umask uint16) int {
func jfs_mkdir(pid int, h int64, cpath *C.char, mode uint16, umask uint16) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -785,7 +786,7 @@ func jfs_mkdir(pid int, h uintptr, cpath *C.char, mode uint16, umask uint16) int
}

//export jfs_delete
func jfs_delete(pid int, h uintptr, cpath *C.char) int {
func jfs_delete(pid int, h int64, cpath *C.char) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -794,7 +795,7 @@ func jfs_delete(pid int, h uintptr, cpath *C.char) int {
}

//export jfs_rmr
func jfs_rmr(pid int, h uintptr, cpath *C.char) int {
func jfs_rmr(pid int, h int64, cpath *C.char) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -803,7 +804,7 @@ func jfs_rmr(pid int, h uintptr, cpath *C.char) int {
}

//export jfs_rename
func jfs_rename(pid int, h uintptr, oldpath *C.char, newpath *C.char) int {
func jfs_rename(pid int, h int64, oldpath *C.char, newpath *C.char) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -812,7 +813,7 @@ func jfs_rename(pid int, h uintptr, oldpath *C.char, newpath *C.char) int {
}

//export jfs_truncate
func jfs_truncate(pid int, h uintptr, path *C.char, length uint64) int {
func jfs_truncate(pid int, h int64, path *C.char, length uint64) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -821,7 +822,7 @@ func jfs_truncate(pid int, h uintptr, path *C.char, length uint64) int {
}

//export jfs_setXattr
func jfs_setXattr(pid int, h uintptr, path *C.char, name *C.char, value uintptr, vlen int, mode int) int {
func jfs_setXattr(pid int, h int64, path *C.char, name *C.char, value uintptr, vlen int, mode int) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -837,7 +838,7 @@ func jfs_setXattr(pid int, h uintptr, path *C.char, name *C.char, value uintptr,
}

//export jfs_getXattr
func jfs_getXattr(pid int, h uintptr, path *C.char, name *C.char, buf uintptr, bufsize int) int {
func jfs_getXattr(pid int, h int64, path *C.char, name *C.char, buf uintptr, bufsize int) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -854,7 +855,7 @@ func jfs_getXattr(pid int, h uintptr, path *C.char, name *C.char, buf uintptr, b
}

//export jfs_listXattr
func jfs_listXattr(pid int, h uintptr, path *C.char, buf uintptr, bufsize int) int {
func jfs_listXattr(pid int, h int64, path *C.char, buf uintptr, bufsize int) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -871,7 +872,7 @@ func jfs_listXattr(pid int, h uintptr, path *C.char, buf uintptr, bufsize int) i
}

//export jfs_removeXattr
func jfs_removeXattr(pid int, h uintptr, path *C.char, name *C.char) int {
func jfs_removeXattr(pid int, h int64, path *C.char, name *C.char) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -880,7 +881,7 @@ func jfs_removeXattr(pid int, h uintptr, path *C.char, name *C.char) int {
}

//export jfs_readlink
func jfs_readlink(pid int, h uintptr, link *C.char, buf uintptr, bufsize int) int {
func jfs_readlink(pid int, h int64, link *C.char, buf uintptr, bufsize int) int {
w := F(h)
if w == nil {
return EINVAL
Expand Down Expand Up @@ -914,7 +915,7 @@ func fill_stat(w *wrapper, wb *utils.Buffer, st *fs.FileStat) int {
}

//export jfs_stat1
func jfs_stat1(pid int, h uintptr, cpath *C.char, buf uintptr) int {
func jfs_stat1(pid int, h int64, cpath *C.char, buf uintptr) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -927,7 +928,7 @@ func jfs_stat1(pid int, h uintptr, cpath *C.char, buf uintptr) int {
}

//export jfs_lstat1
func jfs_lstat1(pid int, h uintptr, cpath *C.char, buf uintptr) int {
func jfs_lstat1(pid int, h int64, cpath *C.char, buf uintptr) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -940,7 +941,7 @@ func jfs_lstat1(pid int, h uintptr, cpath *C.char, buf uintptr) int {
}

//export jfs_summary
func jfs_summary(pid int, h uintptr, cpath *C.char, buf uintptr) int {
func jfs_summary(pid int, h int64, cpath *C.char, buf uintptr) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -963,7 +964,7 @@ func jfs_summary(pid int, h uintptr, cpath *C.char, buf uintptr) int {
}

//export jfs_statvfs
func jfs_statvfs(pid int, h uintptr, buf uintptr) int {
func jfs_statvfs(pid int, h int64, buf uintptr) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -976,7 +977,7 @@ func jfs_statvfs(pid int, h uintptr, buf uintptr) int {
}

//export jfs_chmod
func jfs_chmod(pid int, h uintptr, cpath *C.char, mode C.mode_t) int {
func jfs_chmod(pid int, h int64, cpath *C.char, mode C.mode_t) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -990,7 +991,7 @@ func jfs_chmod(pid int, h uintptr, cpath *C.char, mode C.mode_t) int {
}

//export jfs_utime
func jfs_utime(pid int, h uintptr, cpath *C.char, mtime, atime int64) int {
func jfs_utime(pid int, h int64, cpath *C.char, mtime, atime int64) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -1004,7 +1005,7 @@ func jfs_utime(pid int, h uintptr, cpath *C.char, mtime, atime int64) int {
}

//export jfs_setOwner
func jfs_setOwner(pid int, h uintptr, cpath *C.char, owner *C.char, group *C.char) int {
func jfs_setOwner(pid int, h int64, cpath *C.char, owner *C.char, group *C.char) int {
w := F(h)
if w == nil {
return EINVAL
Expand All @@ -1031,7 +1032,7 @@ func setOwner(w *wrapper, ctx meta.Context, path string, owner, group string) in
}

//export jfs_listdir
func jfs_listdir(pid int, h uintptr, cpath *C.char, offset int, buf uintptr, bufsize int) int {
func jfs_listdir(pid int, h int64, cpath *C.char, offset int, buf uintptr, bufsize int) int {
var ctx meta.Context
var f *fs.File
var w *wrapper
Expand Down Expand Up @@ -1089,7 +1090,7 @@ func toBuf(s uintptr, sz int) []byte {
}

//export jfs_concat
func jfs_concat(pid int, h uintptr, _dst *C.char, buf uintptr, bufsize int) int {
func jfs_concat(pid int, h int64, _dst *C.char, buf uintptr, bufsize int) int {
w := F(h)
if w == nil {
return EINVAL
Expand Down

0 comments on commit 05cb243

Please sign in to comment.