From b74a087e2ac4b48edada015ebca4e6b7404136a5 Mon Sep 17 00:00:00 2001
From: CodingPoeta
Date: Tue, 24 Dec 2024 20:22:28 +0800
Subject: [PATCH 1/5] link and truncate

---
 pkg/fs/fs.go            |  50 +++++++++
 pkg/meta/interface.go   |   4 +-
 sdk/java/libjfs/main.go | 235 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 288 insertions(+), 1 deletion(-)

diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go
index bd179ab92cdd..1d5302c36303 100644
--- a/pkg/fs/fs.go
+++ b/pkg/fs/fs.go
@@ -525,6 +525,26 @@ func (fs *FileSystem) Rename(ctx meta.Context, oldpath string, newpath string, f
 	return
 }
 
+// Link creates a hard link at dst that points to the inode of src.
+// The parent directory of dst must already exist.
+func (fs *FileSystem) Link(ctx meta.Context, src string, dst string) (err syscall.Errno) {
+	defer trace.StartRegion(context.TODO(), "fs.Link").End()
+	l := vfs.NewLogContext(ctx)
+	defer func() { fs.log(l, "Link (%s,%s): %s", src, dst, errstr(err)) }()
+
+	fi, err := fs.resolve(ctx, src, false)
+	if err != 0 {
+		return
+	}
+	pi, err := fs.resolve(ctx, parentDir(dst), true)
+	if err != 0 {
+		return
+	}
+	err = fs.m.Link(ctx, fi.inode, pi.inode, path.Base(dst), nil)
+	fs.invalidateEntry(pi.inode, path.Base(dst))
+	return
+}
+
 func (fs *FileSystem) Symlink(ctx meta.Context, target string, link string) (err syscall.Errno) {
 	defer trace.StartRegion(context.TODO(), "fs.Symlink").End()
 	l := vfs.NewLogContext(ctx)
@@ -1068,6 +1088,33 @@ func (f *File) pwrite(ctx meta.Context, b []byte, offset int64) (n int, err sysc
 	return len(b), 0
 }
 
+// Truncate changes the size of the open file to length bytes after
+// flushing any buffered writes, then refreshes the cached length.
+func (f *File) Truncate(ctx meta.Context, length uint64) (err syscall.Errno) {
+	defer trace.StartRegion(context.TODO(), "fs.Truncate").End()
+	f.Lock()
+	defer f.Unlock()
+	l := vfs.NewLogContext(ctx)
+	defer func() { f.fs.log(l, "Truncate (%s,%d): %s", f.path, length, errstr(err)) }()
+	if f.wdata != nil {
+		err = f.wdata.Flush(ctx)
+		if err != 0 {
+			return
+		}
+	}
+	err = f.fs.m.Truncate(ctx, f.inode, 0, length, nil, false)
+	if err == 0 {
+		// (length-1)>>ChunkBits is the index of the chunk that holds the last byte.
+		// NOTE(review): length-1 wraps around when length == 0 - confirm this is intended.
+		f.fs.m.InvalidateChunkCache(ctx, f.inode, uint32(((length - 1) >> meta.ChunkBits)))
+		f.fs.writer.Truncate(f.inode, length)
+		f.fs.reader.Truncate(f.inode, length)
+		f.info.attr.Length = length
+		f.fs.invalidateAttr(f.inode)
+	}
+	return
+}
+
 func (f *File) Flush(ctx meta.Context) (err syscall.Errno) {
 	defer trace.StartRegion(context.TODO(), "fs.Flush").End()
 	f.Lock()
@@ -1078,6 +1125,7 @@ func (f *File) Flush(ctx meta.Context) (err syscall.Errno) {
 	l := vfs.NewLogContext(ctx)
 	defer func() { f.fs.log(l, "Flush (%s): %s", f.path, errstr(err)) }()
 	err = f.wdata.Flush(ctx)
+	f.fs.invalidateAttr(f.inode)
 	return
 }
 
@@ -1091,6 +1139,7 @@ func (f *File) Fsync(ctx meta.Context) (err syscall.Errno) {
 	l := vfs.NewLogContext(ctx)
 	defer func() { f.fs.log(l, "Fsync (%s): %s", f.path, errstr(err)) }()
 	err = f.wdata.Flush(ctx)
+	f.fs.invalidateAttr(f.inode)
 	return
 }
 
@@ -1110,6 +1159,7 @@ func (f *File) Close(ctx meta.Context) (err syscall.Errno) {
 	}
 	if f.wdata != nil {
 		err = f.wdata.Close(meta.Background())
+		f.fs.invalidateAttr(f.inode)
 		f.wdata = nil
 	}
 	_ = f.fs.m.Close(ctx, f.inode)
diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go
index 8ee07993a315..b1849ccc546d 100644
--- a/pkg/meta/interface.go
+++ b/pkg/meta/interface.go
@@ -35,8 +35,10 @@ import (
 const (
 	// MaxVersion is the max of supported versions.
 	MaxVersion = 1
+	// ChunkBits is the number of bits in an offset within a chunk, i.e. log2(ChunkSize).
+	ChunkBits = 26
 	// ChunkSize is size of a chunk
-	ChunkSize = 1 << 26 // 64M
+	ChunkSize = 1 << ChunkBits // 64M
 	// DeleteSlice is a message to delete a slice from object store.
 	DeleteSlice = 1000
 	// CompactChunk is a message to compact a chunk in object store.
diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go
index da19ef42f511..67878b3dc629 100644
--- a/sdk/java/libjfs/main.go
+++ b/sdk/java/libjfs/main.go
@@ -26,6 +26,21 @@ package main
 // #include
 // #include
 // void jfs_callback(const char *msg);
+/*
+#include <stdint.h>
+
+typedef struct {
+	uint64_t inode;
+	uint32_t mode;
+	uint32_t uid;
+	uint32_t gid;
+	uint32_t atime;
+	uint32_t mtime;
+	uint32_t ctime;
+	uint32_t nlink;
+	uint64_t length;
+} fileInfo;
+*/
 import "C"
 import (
 	"bytes"
@@ -35,6 +50,7 @@ import (
 	"net/http"
 	_ "net/http/pprof"
 	"os"
+	"path"
 	"path/filepath"
 	"runtime/debug"
 	"strconv"
@@ -801,6 +817,23 @@ func jfs_mkdir(pid int, h int64, cpath *C.char, mode uint16, umask uint16) int {
 	return err
 }
 
+//export jfs_mkdirAll
+func jfs_mkdirAll(pid int, h int64, cpath *C.char, mode, umask uint16) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	dir := C.GoString(cpath) // not named "path": that would shadow the path package
+	err := errno(w.MkdirAll(w.withPid(pid), dir, mode, umask))
+	if err == 0 && w.ctx.Uid() == 0 && w.user != w.superuser {
+		// belongs to supergroup
+		if err := setOwner(w, w.withPid(pid), dir, w.user, ""); err != 0 {
+			logger.Errorf("change owner of %s to %s: %s", dir, w.user, err)
+		}
+	}
+	return err
+}
+
 //export jfs_delete
 func jfs_delete(pid int, h int64, cpath *C.char) int {
 	w := F(h)
@@ -853,6 +886,22 @@ func jfs_setXattr(pid int, h int64, path *C.char, name *C.char, value uintptr, v
 	return errno(w.SetXattr(w.withPid(pid), C.GoString(path), C.GoString(name), toBuf(value, vlen), flags))
 }
 
+//export jfs_setXattr2
+func jfs_setXattr2(pid int, h int64, path *C.char, name *C.char, value *C.char, mode int) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	var flags uint32
+	switch mode {
+	case 1:
+		flags = meta.XattrCreate
+	case 2:
+		flags = meta.XattrReplace
+	}
+	return errno(w.SetXattr(w.withPid(pid), C.GoString(path), C.GoString(name), []byte(C.GoString(value)), flags))
+}
+
 //export jfs_getXattr
 func jfs_getXattr(pid int, h int64, path *C.char, name *C.char, buf uintptr, bufsize int) int {
 	w := F(h)
@@ -870,6 +919,22 @@ func jfs_getXattr(pid int, h int64, path *C.char, name *C.char, buf uintptr, buf
 	return len(buff)
 }
 
+// jfs_getXattr2 stores the attribute value in *value as a C string allocated
+// by C.CString; the caller is responsible for freeing it.
+//
+//export jfs_getXattr2
+func jfs_getXattr2(pid int, h int64, path *C.char, name *C.char, value **C.char) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	t, err := w.GetXattr(w.withPid(pid), C.GoString(path), C.GoString(name))
+	if err == 0 {
+		*value = C.CString(string(t))
+	}
+	return errno(err)
+}
+
 //export jfs_listXattr
 func jfs_listXattr(pid int, h int64, path *C.char, buf uintptr, bufsize int) int {
 	w := F(h)
@@ -887,6 +952,23 @@ func jfs_listXattr(pid int, h int64, path *C.char, buf uintptr, bufsize int) int
 	return len(buff)
 }
 
+// jfs_listXattr2 stores the attribute name list in *value (allocated by
+// C.CString, to be freed by the caller) and its length in bytes in *size.
+//
+//export jfs_listXattr2
+func jfs_listXattr2(pid int, h int64, path *C.char, value **C.char, size *int) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	t, err := w.ListXattr(w.withPid(pid), C.GoString(path))
+	if err == 0 {
+		*value = C.CString(string(t))
+		*size = len(t)
+	}
+	return errno(err)
+}
+
 //export jfs_removeXattr
 func jfs_removeXattr(pid int, h int64, path *C.char, name *C.char) int {
 	w := F(h)
@@ -961,6 +1043,32 @@ func jfs_setfacl(pid int, h int64, path *C.char, acltype int, buf uintptr, alen
 	return errno(w.SetFacl(w.withPid(pid), C.GoString(path), uint8(acltype), rule))
 }
 
+//export jfs_link
+func jfs_link(pid int, h int64, src *C.char, dst *C.char) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	return errno(w.Link(w.withPid(pid), C.GoString(src), C.GoString(dst)))
+}
+
+//export jfs_symlink
+func jfs_symlink(pid int, h int64, target_ *C.char, link_ *C.char) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	target := C.GoString(target_)
+	link := C.GoString(link_)
+	dir := path.Dir(strings.TrimRight(link, "/"))
+	rel, e := filepath.Rel(dir, target)
+	if e != nil {
+		// external link
+		rel = target
+	}
+	return errno(w.Symlink(w.withPid(pid), rel, link))
+}
+
 //export jfs_readlink
 func jfs_readlink(pid int, h int64, link *C.char, buf uintptr, bufsize int) int {
 	w := F(h)
@@ -1021,6 +1129,49 @@ func jfs_lstat1(pid int, h int64, cpath *C.char, buf uintptr) int {
 	return fill_stat(w, utils.NewNativeBuffer(toBuf(buf, 130)), fi)
 }
 
+// attrToInfo copies every field of info except inode from the attributes of fi.
+func attrToInfo(fi *fs.FileStat, info *C.fileInfo) {
+	attr := fi.Sys().(*meta.Attr)
+	info.mode = C.uint32_t(attr.SMode())
+	info.uid = C.uint32_t(attr.Uid)
+	info.gid = C.uint32_t(attr.Gid)
+	info.atime = C.uint32_t(attr.Atime)
+	info.mtime = C.uint32_t(attr.Mtime)
+	info.ctime = C.uint32_t(attr.Ctime)
+	info.nlink = C.uint32_t(attr.Nlink)
+	info.length = C.uint64_t(attr.Length)
+}
+
+//export jfs_stat
+func jfs_stat(pid int, h int64, cpath *C.char, info *C.fileInfo) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	fi, err := w.Stat(w.withPid(pid), C.GoString(cpath))
+	if err != 0 {
+		return errno(err)
+	}
+	info.inode = C.uint64_t(fi.Inode())
+	attrToInfo(fi, info)
+	return 0
+}
+
+//export jfs_lstat
+func jfs_lstat(pid int, h int64, cpath *C.char, info *C.fileInfo) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	fi, err := w.Lstat(w.withPid(pid), C.GoString(cpath))
+	if err != 0 {
+		return errno(err)
+	}
+	info.inode = C.uint64_t(fi.Inode())
+	attrToInfo(fi, info)
+	return 0
+}
+
 //export jfs_summary
 func jfs_summary(pid int, h int64, cpath *C.char, buf uintptr) int {
 	w := F(h)
@@ -1166,6 +1317,72 @@ func jfs_listdir(pid int, h int64, cpath *C.char, offset int, buf uintptr, bufsi
 	return bufsize - wb.Left() - 4
 }
 
+// jfs_listdir2 lists the directory at cpath into a buffer allocated with C.malloc.
+// On success *buf and *size describe that buffer and the caller must free it.
+// Each entry is a 16-bit name length, the name and, when plus is set, 44 bytes of attributes.
+//
+//export jfs_listdir2
+func jfs_listdir2(pid int, h int64, cpath *C.char, plus bool, buf **byte, size *int) int {
+	var ctx meta.Context
+	var f *fs.File
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	var err syscall.Errno
+	ctx = w.withPid(pid)
+	f, err = w.Open(ctx, C.GoString(cpath), 0)
+	if err != 0 {
+		return errno(err)
+	}
+	defer f.Close(ctx) // release the handle on every return path
+	st, _ := f.Stat()
+	if !st.IsDir() {
+		return ENOTDIR
+	}
+
+	*size = 0
+	if plus {
+		es, err := f.ReaddirPlus(ctx, 0)
+		if err != 0 {
+			return errno(err)
+		}
+		for _, e := range es {
+			*size += 2 + len(e.Name) + 4*11
+		}
+		*buf = (*byte)(C.malloc(C.size_t(*size)))
+		out := utils.FromBuffer(unsafe.Slice(*buf, *size))
+		for _, e := range es {
+			out.Put16(uint16(len(e.Name)))
+			out.Put([]byte(e.Name))
+			out.Put32(e.Attr.SMode())
+			out.Put64(uint64(e.Inode))
+			out.Put32(e.Attr.Nlink)
+			out.Put32(e.Attr.Uid)
+			out.Put32(e.Attr.Gid)
+			out.Put64(e.Attr.Length)
+			out.Put32(uint32(e.Attr.Atime))
+			out.Put32(uint32(e.Attr.Mtime))
+			out.Put32(uint32(e.Attr.Ctime))
+		}
+	} else {
+		es, err := f.Readdir(ctx, 0)
+		if err != 0 {
+			return errno(err)
+		}
+		for _, e := range es {
+			*size += 2 + len(e.Name())
+		}
+		*buf = (*byte)(C.malloc(C.size_t(*size)))
+		out := utils.FromBuffer(unsafe.Slice(*buf, *size))
+		for _, e := range es {
+			out.Put16(uint16(len(e.Name())))
+			out.Put([]byte(e.Name()))
+		}
+	}
+	return 0
+}
+
 func toBuf(s uintptr, sz int) []byte {
 	return (*[1 << 30]byte)(unsafe.Pointer(s))[:sz:sz]
 }
@@ -1298,6 +1515,36 @@ func jfs_write(pid, fd int, cbuf uintptr, count C.size_t) int {
 	return n
 }
 
+//export jfs_pwrite
+func jfs_pwrite(pid, fd int, cbuf uintptr, count C.size_t, offset C.off_t) int {
+	filesLock.Lock()
+	f, ok := openFiles[fd]
+	if !ok {
+		filesLock.Unlock()
+		return EINVAL
+	}
+	filesLock.Unlock()
+
+	buf := toBuf(cbuf, int(count))
+	n, err := f.Pwrite(f.w.withPid(pid), buf, int64(offset))
+	if err != 0 {
+		logger.Errorf("pwrite %s: %s", f.Name(), err)
+		return errno(err)
+	}
+	return n
+}
+
+//export jfs_ftruncate
+func jfs_ftruncate(pid, fd int, size uint64) int {
+	filesLock.Lock()
+	f, ok := openFiles[fd]
+	filesLock.Unlock()
+	if !ok {
+		return EINVAL
+	}
+	return errno(f.Truncate(f.w.withPid(pid), size))
+}
+
 //export jfs_flush
 func jfs_flush(pid, fd int) int {
 	filesLock.Lock()

From 6991d13c2c862f8c29697cd3fea5e494924e6632 Mon Sep 17 00:00:00 2001
From: CodingPoeta
Date: Wed, 25 Dec 2024 15:58:08 +0800
Subject: [PATCH 2/5] clone

---
 sdk/java/libjfs/main.go | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git
a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go
index 67878b3dc629..448ae5098b9e 100644
--- a/sdk/java/libjfs/main.go
+++ b/sdk/java/libjfs/main.go
@@ -1432,6 +1432,36 @@ func jfs_concat(pid int, h int64, _dst *C.char, buf uintptr, bufsize int) int {
 	return r
 }
 
+// jfs_clone creates _dst and copies the content of _src into it. TODO: implement real clone
+//
+//export jfs_clone
+func jfs_clone(pid int, h int64, _src *C.char, _dst *C.char) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	src := C.GoString(_src)
+	dst := C.GoString(_dst)
+	ctx := w.withPid(pid)
+	fi, err := w.Open(ctx, src, 0)
+	if err != 0 {
+		logger.Errorf("open %s: %s", src, err)
+		return errno(err)
+	}
+	defer fi.Close(ctx)
+	fo, err := w.Create(ctx, dst, 0666, 022)
+	if err != 0 {
+		logger.Errorf("create %s: %s", dst, err)
+		return errno(err)
+	}
+	defer fo.Close(ctx)
+	_, err = w.CopyFileRange(ctx, src, 0, dst, 0, 1<<63)
+	if err != 0 {
+		logger.Errorf("copy %s to %s: %s", src, dst, err)
+	}
+	return errno(err)
+}
+
 //export jfs_lseek
 func jfs_lseek(pid, fd int, offset int64, whence int) int64 {
 	filesLock.Lock()

From b9fe7b41d02e7cb2238f1e4a8b68b737e0df8ea6 Mon Sep 17 00:00:00 2001
From: CodingPoeta
Date: Wed, 25 Dec 2024 18:07:47 +0800
Subject: [PATCH 3/5] pysdk

---
 sdk/java/libjfs/main.go                |  13 +
 sdk/python/.gitignore                  |   5 +
 sdk/python/Dockerfile.builder          |  14 +
 sdk/python/Dockerfile.builder.arm      |  11 +
 sdk/python/Makefile                    |  40 ++
 sdk/python/juicefs/juicefs/__init__.py |   1 +
 sdk/python/juicefs/juicefs/juicefs.py  | 657 +++++++++++++++++++++++++
 sdk/python/juicefs/juicefs/spec.py     | 286 +++++++++++
 sdk/python/juicefs/setup.py            |  21 +
 sdk/python/juicefs/tests/__init__.py   |   0
 sdk/python/juicefs/tests/test.py       |  28 ++
 11 files changed, 1076 insertions(+)
 create mode 100644 sdk/python/.gitignore
 create mode 100644 sdk/python/Dockerfile.builder
 create mode 100644 sdk/python/Dockerfile.builder.arm
 create mode 100644 sdk/python/Makefile
 create mode 100644 sdk/python/juicefs/juicefs/__init__.py
 create mode 100644 sdk/python/juicefs/juicefs/juicefs.py
 create mode
100644 sdk/python/juicefs/juicefs/spec.py
 create mode 100644 sdk/python/juicefs/setup.py
 create mode 100644 sdk/python/juicefs/tests/__init__.py
 create mode 100644 sdk/python/juicefs/tests/test.py

diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go
index 448ae5098b9e..f9abae4b0fa1 100644
--- a/sdk/java/libjfs/main.go
+++ b/sdk/java/libjfs/main.go
@@ -1215,6 +1215,22 @@ func jfs_chmod(pid int, h int64, cpath *C.char, mode C.mode_t) int {
 	return errno(f.Chmod(w.withPid(pid), uint16(mode)))
 }
 
+// jfs_chown changes the owner uid and group gid of the file at cpath.
+//
+//export jfs_chown
+func jfs_chown(pid int, h int64, cpath *C.char, uid uint32, gid uint32) int {
+	w := F(h)
+	if w == nil {
+		return EINVAL
+	}
+	f, err := w.Open(w.withPid(pid), C.GoString(cpath), 0)
+	if err != 0 {
+		return errno(err)
+	}
+	defer f.Close(w.withPid(pid)) // the handle is only needed for this call
+	return errno(f.Chown(w.withPid(pid), uid, gid))
+}
+
 //export jfs_utime
 func jfs_utime(pid int, h int64, cpath *C.char, mtime, atime int64) int {
 	w := F(h)
diff --git a/sdk/python/.gitignore b/sdk/python/.gitignore
new file mode 100644
index 000000000000..92d6dd9505da
--- /dev/null
+++ b/sdk/python/.gitignore
@@ -0,0 +1,5 @@
+dist
+build
+*.egg-info
+*.h
+*.so
diff --git a/sdk/python/Dockerfile.builder b/sdk/python/Dockerfile.builder
new file mode 100644
index 000000000000..87c03c0c51ff
--- /dev/null
+++ b/sdk/python/Dockerfile.builder
@@ -0,0 +1,14 @@
+FROM centos/python-38-centos7
+
+USER 0
+
+RUN curl -fsSL https://autoinstall.plesk.com/PSA_18.0.62/examiners/repository_check.sh | bash -s -- update >/dev/null && \
+    yum install -y make gcc && \
+    cd /tmp && \
+    curl -L https://static.juicefs.com/misc/go1.20.14.linux-amd64.tar.gz -o go1.20.14.linux-amd64.tar.gz && \
+    tar -C /usr/local -xzf go1.20.14.linux-amd64.tar.gz && \
+    rm go1.20.14.linux-amd64.tar.gz && \
+    ln -s /usr/local/go/bin/go /usr/bin/go && \
+    python3 -m pip install --upgrade pip && \
+    python3 -m pip install --upgrade setuptools && \
+    pip install wheel build
diff --git a/sdk/python/Dockerfile.builder.arm b/sdk/python/Dockerfile.builder.arm
new file mode
100644
index 000000000000..f93a5d13027d
--- /dev/null
+++ b/sdk/python/Dockerfile.builder.arm
@@ -0,0 +1,11 @@
+FROM centos:7
+
+RUN yum install -y make gcc python3 && \
+    curl -L https://static.juicefs.com/misc/go1.19.13.linux-arm64.tar.gz -o go1.19.13.linux-arm64.tar.gz && \
+    tar -C /usr/local -xzf go1.19.13.linux-arm64.tar.gz && \
+    rm go1.19.13.linux-arm64.tar.gz && \
+    ln -s /usr/local/go/bin/go /usr/bin/go && \
+    go version && \
+    python3 -m pip install --upgrade pip && \
+    python3 -m pip install --upgrade setuptools && \
+    pip install wheel build
diff --git a/sdk/python/Makefile b/sdk/python/Makefile
new file mode 100644
index 000000000000..efc9794f1424
--- /dev/null
+++ b/sdk/python/Makefile
@@ -0,0 +1,40 @@
+LDFLAGS = -s -w
+
+.PHONY: libjfs.so juicefs builder arm-builder clean
+
+# Use the China Go module proxy when CN_GOPROXY=1
+CN_GOPROXY ?= 0
+ifeq ($(CN_GOPROXY), 1)
+    GOPROXY = https://proxy.golang.com.cn,direct
+endif
+
+VERSION_FILE := ../../pkg/version/version.go
+
+VERSION := $(shell awk '/major[[:space:]]*:[[:space:]]*/ {gsub(/[^0-9]/, "", $$2); major=$$2} \
+    /minor[[:space:]]*:[[:space:]]*/ {gsub(/[^0-9]/, "", $$2); minor=$$2} \
+    /patch[[:space:]]*:[[:space:]]*/ {gsub(/[^0-9]/, "", $$2); patch=$$2} \
+    END {print major "." minor "." patch}' $(VERSION_FILE))
+
+SHORT_HASH := $(shell git rev-parse --short HEAD)
+BUILD_DATE := $(shell date -u +'%Y-%m-%dT%H:%M:%SZ')
+BUILD_DATE_SHORT := $(shell date -u +'%Y%m%d%H%M')
+
+# libjfs is located in the sdk/java/libjfs
+libjfs.so:
+	go build -buildmode c-shared -ldflags="$(LDFLAGS)" -o juicefs/juicefs/libjfs.so ../java/libjfs
+
+builder: Dockerfile.builder
+	docker build -t sdkbuilder -f Dockerfile.builder .
+
+arm-builder: Dockerfile.builder.arm
+	docker build -t sdkbuilder -f Dockerfile.builder.arm .
+
+juicefs:
+	sudo rm -rf juicefs.egg-info
+	echo "Building juicefs version $(VERSION).$(BUILD_DATE_SHORT)"
+	sed -i 's/^VERSION = .*/VERSION = "$(VERSION).$(BUILD_DATE_SHORT)"/' juicefs/setup.py
+	sed -i 's/^BUILD_INFO = .*/BUILD_INFO = "$(BUILD_DATE) $(SHORT_HASH)"/' juicefs/setup.py
+	docker run --rm -i -v ${PWD}/../../:/opt/jfs -w /opt/jfs/sdk/python -e GOPROXY=${GOPROXY} sdkbuilder sh -c 'make libjfs.so && cd juicefs && python3 -m build -w'
+
+clean:
+	rm -rf juicefs/build juicefs/dist juicefs/*.egg-info juicefs/juicefs/libjfs.so juicefs/juicefs/libjfs.h
diff --git a/sdk/python/juicefs/juicefs/__init__.py b/sdk/python/juicefs/juicefs/__init__.py
new file mode 100644
index 000000000000..5b506317675c
--- /dev/null
+++ b/sdk/python/juicefs/juicefs/__init__.py
@@ -0,0 +1 @@
+from .juicefs import Client
diff --git a/sdk/python/juicefs/juicefs/juicefs.py b/sdk/python/juicefs/juicefs/juicefs.py
new file mode 100644
index 000000000000..d2d3d2e12b37
--- /dev/null
+++ b/sdk/python/juicefs/juicefs/juicefs.py
@@ -0,0 +1,657 @@
+# encoding: utf-8
+
+import codecs
+import errno
+import grp
+import io
+import json
+import locale
+import os
+import pwd
+import six
+import struct
+import threading
+import time
+from ctypes import *
+
+# pkg/vfs/helpers.go
+MODE_WRITE = 2
+MODE_READ = 4
+
+def check_error(r, fn, args):
+    if r < 0:
+        e = OSError(f'call {fn.__name__} failed: [Errno {-r}] {os.strerror(-r)}: {args[2:]}')
+        e.errno = -r
+        raise e
+    return r
+
+class FileInfo(Structure):
+    _fields_ = [
+        ('inode', c_uint64),
+        ('mode', c_uint32),
+        ('uid', c_uint32),
+        ('gid', c_uint32),
+        ('atime', c_uint32),
+        ('mtime', c_uint32),
+        ('ctime', c_uint32),
+        ('nlink', c_uint32),
+        ('length', c_uint64),
+    ]
+
+def _tid():
+    return threading.current_thread().ident
+
+def _bin(s):
+    return six.ensure_binary(s)
+
+def unpack(fmt, buf):
+    if not fmt.startswith("!"):
+        fmt = "!" + fmt
+    return struct.unpack(fmt, buf[: struct.calcsize(fmt)])
+
+class JuiceFSLib(object):
+    def __init__(self):
+        self.lib = cdll.LoadLibrary(os.path.join(os.path.dirname(__file__), "libjfs.so"))
+
+    def __getattr__(self, n):
+        fn = getattr(self.lib, n)
+        if n.startswith("jfs"):
+            fn.restype = c_int
+            fn.errcheck = check_error
+        return fn
+
+class Client(object):
+    """A JuiceFS client."""
+    def __init__(self, name, meta, *, bucket="", storage_class="", read_only=False, no_session=False,
+                 no_bg_job=False, open_cache="", backup_meta="", backup_skip_trash=False, heartbeat="",
+                 cache_dir="", cache_size="100M", free_space="", auto_create=False, cache_full_block=False,
+                 cache_checksum="", cache_eviction="", cache_scan_interval="", cache_expire="",
+                 writeback=False, memory_size="300M", prefetch=0, readahead="100M", upload_limit="10g",
+                 download_limit="10g", max_uploads=0, max_deletes=0, skip_dir_nlink=0, skip_dir_mtime="",
+                 io_retries=0, get_timeout="", put_timeout="", fast_resolve=False, attr_timeout="",
+                 entry_timeout="", dir_entry_timeout="", debug=False, no_usage_report=False, access_log="",
+                 push_gateway="", push_interval="", push_auth="", push_labels="", push_graphite="", **kwargs):
+        self.lib = JuiceFSLib()
+        kwargs["meta"] = meta
+        kwargs["bucket"] = bucket
+        kwargs["storageClass"] = storage_class
+        kwargs["readOnly"] = read_only
+        kwargs["noSession"] = no_session
+        kwargs["noBGJob"] = no_bg_job
+        kwargs["openCache"] = open_cache
+        kwargs["backupMeta"] = backup_meta
+        kwargs["backupSkipTrash"] = backup_skip_trash
+        kwargs["heartbeat"] = heartbeat
+        kwargs["cacheDir"] = cache_dir
+        kwargs["cacheSize"] = cache_size
+        kwargs["freeSpace"] = free_space
+        kwargs["autoCreate"] = auto_create
+        kwargs["cacheFullBlock"] = cache_full_block
+        kwargs["cacheChecksum"] = cache_checksum
+        kwargs["cacheEviction"] = cache_eviction
+        kwargs["cacheScanInterval"] = cache_scan_interval
+        kwargs["cacheExpire"] = cache_expire
+        kwargs["writeback"] = writeback
+        kwargs["memorySize"] = memory_size
+        kwargs["prefetch"] = prefetch
+        kwargs["readahead"] = readahead
+        kwargs["uploadLimit"] = upload_limit
+        kwargs["downloadLimit"] = download_limit
+        kwargs["maxUploads"] = max_uploads
+        kwargs["maxDeletes"] = max_deletes
+        kwargs["skipDirNlink"] = skip_dir_nlink
+        kwargs["skipDirMtime"] = skip_dir_mtime
+        kwargs["ioRetries"] = io_retries
+        kwargs["getTimeout"] = get_timeout
+        kwargs["putTimeout"] = put_timeout
+        kwargs["fastResolve"] = fast_resolve
+        kwargs["attrTimeout"] = attr_timeout
+        kwargs["entryTimeout"] = entry_timeout
+        kwargs["dirEntryTimeout"] = dir_entry_timeout
+        kwargs["debug"] = debug
+        kwargs["noUsageReport"] = no_usage_report
+        kwargs["accessLog"] = access_log
+        kwargs["pushGateway"] = push_gateway
+        kwargs["pushInterval"] = push_interval
+        kwargs["pushAuth"] = push_auth
+        kwargs["pushLabels"] = push_labels
+        kwargs["pushGraphite"] = push_graphite
+
+        jsonConf = json.dumps(kwargs)
+        self.umask = os.umask(0)
+        os.umask(self.umask)
+        user = pwd.getpwuid(os.geteuid())
+        groups = [grp.getgrgid(gid).gr_name for gid in os.getgrouplist(user.pw_name, user.pw_gid)]
+        superuser = pwd.getpwuid(0)
+        supergroups = [grp.getgrgid(gid).gr_name for gid in os.getgrouplist(superuser.pw_name, superuser.pw_gid)]
+        self.h = self.lib.jfs_init(name.encode(), jsonConf.encode(), user.pw_name.encode(), ','.join(groups).encode(), superuser.pw_name.encode(), ','.join(supergroups).encode())
+
+    def stat(self, path):
+        """Get the status of a file or a directory."""
+        # jfs_stat is the variant that fills a FileInfo struct, matching lstat() below.
+        fi = FileInfo()
+        self.lib.jfs_stat(_tid(), self.h, _bin(path), byref(fi))
+        return os.stat_result((fi.mode, fi.inode, 0, fi.nlink, fi.uid, fi.gid, fi.length, fi.atime, fi.mtime, fi.ctime))
+
+    def exists(self, path):
+        """Check if a file exists."""
+        try:
+            self.stat(path)
+            return True
+        except OSError as e:
+            if e.errno == errno.ENOENT:
+                return False
+            raise
+
+    def open(self, path, mode='r', buffering=-1, encoding=None, errors=None):
+        """Open a file, returns a filelike object."""
+        if len(mode) != len(set(mode)):
+            raise ValueError(f'invalid mode: {mode}')
+        flag = 0
+        cnt = 0
+        for c in mode:
+            if c in 'rwxa':
+                cnt += 1
+                if c == 'r':
+                    flag |= MODE_READ
+                else:
+                    flag |= MODE_WRITE
+            elif c == '+':
+                flag |= MODE_READ | MODE_WRITE
+            elif c not in 'tb':
+                raise ValueError(f'invalid mode: {mode}')
+        if cnt != 1:
+            raise ValueError('must have exactly one of create/read/write/append mode')
+        if 'b' in mode:
+            if 't' in mode:
+                raise ValueError("can't have text and binary mode at once")
+            if encoding:
+                raise ValueError("binary mode doesn't take an encoding argument")
+            if errors:
+                raise ValueError("binary mode doesn't take an errors argument")
+        else:
+            if not encoding:
+                encoding = locale.getpreferredencoding(False).lower()
+            if not errors:
+                errors = 'strict'
+            codecs.lookup(encoding)
+
+        size = 0
+        if 'x' in mode:
+            fd = self.lib.jfs_create(_tid(), self.h, _bin(path), c_uint16(0o666), c_uint16(self.umask))
+        else:
+            try:
+                sz = c_uint64()
+                fd = self.lib.jfs_open(_tid(), self.h, _bin(path), byref(sz), c_uint32(flag))
+                if 'w' in mode:
+                    self.lib.jfs_ftruncate(_tid(), fd, c_uint64(0))
+                else:
+                    size = sz.value
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise e
+                if 'r' in mode:
+                    raise FileNotFoundError(e)
+                fd = self.lib.jfs_create(_tid(), self.h, _bin(path), c_uint16(0o666), c_uint16(self.umask))
+        return File(self.lib, fd, path, mode, flag, size, buffering, encoding, errors)
+
+    def truncate(self, path, size):
+        """Truncate a file to a specified size."""
+        self.lib.jfs_truncate(_tid(), self.h, _bin(path), c_uint64(size))
+
+    def remove(self, path):
+        """Remove a file."""
+        self.lib.jfs_delete(_tid(), self.h, _bin(path))
+
+    def mkdir(self, path, mode=0o777):
+        """Create a directory."""
+        self.lib.jfs_mkdir(_tid(), self.h, _bin(path), c_uint16(mode&0o777), c_uint16(self.umask))
+
+    def makedirs(self, path, mode=0o777):
+        """Create a directory and all its parent components if they do not exist."""
+        self.lib.jfs_mkdirAll(_tid(), self.h, _bin(path), c_uint16(mode&0o777), c_uint16(self.umask))
+
+    def rmdir(self, path):
+        """Remove a directory. The directory must be empty."""
+        self.lib.jfs_delete(_tid(), self.h, _bin(path))
+
+    def rename(self, old, new):
+        """Rename the file or directory old to new."""
+        self.lib.jfs_rename(_tid(), self.h, _bin(old), _bin(new), c_uint32(0))
+
+    def listdir(self, path, detail=False):
+        """Return a list containing the names of the entries in the directory given by path."""
+        buf = c_void_p()
+        size = c_ssize_t()
+        # jfs_listdir2(pid, h, cpath, plus, buf **byte, size *int) hands back a malloc'ed buffer;
+        # size is a Go int (pointer-sized), hence c_ssize_t rather than c_int.
+
+        self.lib.jfs_listdir2(_tid(), self.h, _bin(path), bool(detail), byref(buf), byref(size))
+        data = string_at(buf, size.value)
+        infos = []
+        pos = 0
+        while pos < len(data):
+            nlen, = unpack("H", data[pos:pos+2])
+            pos += 2
+            name = six.ensure_str(data[pos : pos + nlen], errors='replace')
+            pos += nlen
+            if detail:
+                mode, inode, nlink, uid, gid, length, atime, mtime, ctime = \
+                    unpack("IQIIIQIII", data[pos:pos+44])
+                infos.append((name, os.stat_result((mode, inode, 0, nlink, uid, gid, length, atime, mtime, ctime))))
+                pos += 44
+            else:
+                infos.append(name)
+        self.lib.free(buf)
+        return sorted(infos)
+
+    def chmod(self, path, mode):
+        """Change the mode of a file."""
+        self.lib.jfs_chmod(_tid(), self.h, _bin(path), c_uint16(mode))
+
+    def chown(self, path, uid, gid):
+        """Change the owner and group id of a file."""
+        self.lib.jfs_chown(_tid(), self.h, _bin(path), c_uint32(uid), c_uint32(gid))
+
+    def link(self, src, dst):
+        """Create a hard link to a file."""
+        self.lib.jfs_link(_tid(), self.h, _bin(src), _bin(dst))
+
+    def lstat(self, path):
+        """Like stat(), but do not follow symbolic links."""
+        info = FileInfo()
+        self.lib.jfs_lstat(_tid(), self.h, _bin(path), byref(info))
+        return os.stat_result((info.mode, info.inode, 0, info.nlink, info.uid, info.gid, info.length, info.atime, info.mtime, info.ctime))
+
+    def readlink(self, path):
+        """Return a string representing the path to which the symbolic link points."""
+        buf = bytes(1<<16)
+        n = self.lib.jfs_readlink(_tid(), self.h, _bin(path), buf, len(buf))
+        return buf[:n].decode()
+
+    def symlink(self, src, dst):
+        """Create a symbolic link."""
+        self.lib.jfs_symlink(_tid(), self.h, _bin(src), _bin(dst))
+
+    def unlink(self, path):
+        """Remove a file."""
+        self.remove(path)
+
+    def rmr(self, path):
+        """Remove a directory and all its contents recursively."""
+        self.lib.jfs_rmr(_tid(), self.h, _bin(path))
+
+    def utime(self, path, times=None):
+        """Set the access and modified times of a file."""
+        if not times:
+            now = time.time()
+            times = (now, now)
+        self.lib.jfs_utime(_tid(), self.h, _bin(path), c_int64(int(times[1]*1000)), c_int64(int(times[0]*1000)))
+
+    def walk(self, top, topdown=True, onerror=None, followlinks=False):
+        raise NotImplementedError
+
+    def getxattr(self, path, name):
+        """Get an extended attribute on a file."""
+        size = 64 << 10 # XattrSizeMax
+        buf = bytes(size)
+        size = self.lib.jfs_getXattr(_tid(), self.h, _bin(path), _bin(name), buf, size)
+        return buf[:size]
+
+    def listxattr(self, path):
+        """List extended attributes on a file."""
+        buf = c_void_p()
+        size = c_ssize_t()
+        self.lib.jfs_listXattr2(_tid(), self.h, _bin(path), byref(buf), byref(size))
+        data = string_at(buf, size.value).decode()
+        self.lib.free(buf)
+        if not data:
+            return []
+        return data.split('\0')[:-1]
+
+    def setxattr(self, path, name, value, flags=0):
+        """Set an extended attribute on a file."""
+        value = _bin(value)
+        self.lib.jfs_setXattr(_tid(), self.h, _bin(path), _bin(name), value, len(value), c_int(flags))
+
+    def removexattr(self, path, name):
+        """Remove an extended attribute from a file."""
+        self.lib.jfs_removeXattr(_tid(), self.h, _bin(path), _bin(name))
+
+    def clone(self, src, dst):
+        """Clone a file."""
+        self.lib.jfs_clone(_tid(), self.h, _bin(src), _bin(dst))
+
+    #
def summary(self, path, depth=0, entries=1): + # """Get the summary of a directory.""" + +class File(object): + """A JuiceFS file.""" + def __init__(self, lib, fd, path, mode, flag, length, buffering, encoding, errors): + self.lib = lib + self.fd = fd + self.name = path + self.append = 'a' in mode + self.flag = flag + self.length = length + self.encoding = encoding + self.errors = errors + self.newlines = None + self.closed = False + self._buffering = buffering + if self._buffering < 0: + self._buffering = 128 << 10 + if flag == MODE_READ | MODE_WRITE: + self._buffering = 0 + self._readbuf = None + self._readbuf_off = 0 + self._writebuf = [] + self.off = 0 + if self.append: + self.off = self.length + + def __fspath__(self): + return self.name + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def next(self): + lines = self.readlines(1) + if lines: + return lines[0] + raise StopIteration + + def fileno(self): + return self.fd + + def isatty(self): + return False + + def _read(self, size): + self._check_closed() + if self.flag & MODE_READ == 0: + raise io.UnsupportedOperation('not readable') + # fill buffer + if (not self._readbuf or self._readbuf_off == len(self._readbuf)) and size < self._buffering: + if not self._readbuf or len(self._readbuf) < self._buffering: + self._readbuf = bytes(self._buffering) + n = self.lib.jfs_pread(_tid(), self.fd, self._readbuf, self._buffering, c_int64(self.off)) + if n < self._buffering: + self._readbuf = self._readbuf[:n] + self._readbuf_off = 0 + # read from buffer + rs = [] + got = 0 + if self._readbuf and self._readbuf_off < len(self._readbuf): + n = len(self._readbuf) - self._readbuf_off + if size >= 0 and size < n: + n = size + rs.append(self._readbuf[self._readbuf_off:self._readbuf_off+n]) + self._readbuf_off += n + got += n + size -= n + # read directly + if size > 0: + while size > 
0: + n = min(size, 4 << 20) + buf = bytes(n) + n = self.lib.jfs_pread(_tid(), self.fd, buf, c_uint64(n), c_int64(self.off+got)) + if n == 0: + break + if n < len(buf): + buf = buf[:n] + rs.append(buf) + got += n + size -= n + elif size < 0: + while True: + buf = bytes(128 << 10) + n = self.lib.jfs_pread(_tid(), self.fd, buf, len(buf), c_int64(self.off+got)) + if n == 0: + break + if n < len(buf): + buf = buf[:n] + rs.append(buf) + got += n + if len(rs) == 1: + buf = rs[0] + else: + buf = b''.join(rs) + self.off += len(buf) + return buf + + def read(self, size=-1): + """Read at most size bytes, returned as a string.""" + buf = self._read(size) + if self.encoding: + return buf.decode(self.encoding, self.errors) + else: + return buf + + def write(self, data): + """Write the string data to the file.""" + self._check_closed() + # TODO: buffer for small write + if self.encoding and not isinstance(data, six.text_type): + raise TypeError(f'write() argument must be str, not {type(data).__name__}') + if not self.encoding and not isinstance(data, six.binary_type): + raise TypeError(f"a bytes-like object is required, not '{type(data).__name__}'") + if self.flag & MODE_WRITE == 0: + raise io.UnsupportedOperation('not writable') + + if not data: + return 0 + n = len(data) + if self.encoding: + data = data.encode(self.encoding, self.errors) + if self.append: + self.off = self.length + total = len(data) + for b in self._writebuf: + total += len(b) + if total >= self._buffering: + self.flush() + if len(data) < self._buffering: + self._writebuf.append(data) + else: + self.lib.jfs_pwrite(_tid(), self.fd, data, c_uint64(len(data)), c_int64(self.off)) + else: + self._writebuf.append(data) + self.off += len(data) + if self.off > self.length: + self.length = self.off + return n + + def seek(self, offset, whence=0): + """Set the stream position to the given byte offset. + offset is interpreted relative to the position indicated by whence. 
+ The default value for whence is SEEK_SET.""" + self._check_closed() + if whence not in (os.SEEK_SET, os.SEEK_CUR, os.SEEK_END): + raise ValueError(f'invalid whence ({whence}, should be {os.SEEK_SET}, {os.SEEK_CUR} or {os.SEEK_END})') + if self.encoding: + if whence == os.SEEK_CUR and offset != 0: + raise io.UnsupportedOperation("can't do nonzero cur-relative seeks") + if whence == os.SEEK_END and offset != 0: + raise io.UnsupportedOperation("can't do nonzero end-relative seeks") + self.flush() + if whence == os.SEEK_SET: + self.off = offset + self._readbuf = None + elif whence == os.SEEK_CUR: + self.off += offset + self._readbuf_off += offset + if self._readbuf and (self._readbuf_off < 0 or self._readbuf_off >= len(self._readbuf)): + self._readbuf = None + else: + self.off = self.length + offset + self._readbuf = None + return self.off + + def tell(self): + """Return the current stream position.""" + self._check_closed() + return self.off + + def truncate(self, size=None): + """Truncate the file to at most size bytes. + Size defaults to the current file position, as returned by tell().""" + self._check_closed() + if self.flag & MODE_WRITE == 0: + raise io.UnsupportedOperation('File not open for writing') + self.flush() + if size is None: + size = self.tell() + self.lib.jfs_ftruncate(_tid(), self.fd, c_uint64(size)) + self.length = size + return size + + def flush(self): + """Flush the write buffers of the file if applicable. + This does nothing for read-only and non-blocking streams.""" + if self._writebuf: + data = b''.join(self._writebuf) + self.lib.jfs_pwrite(_tid(), self.fd, data, len(data), c_uint64(self.off-len(data))) + self._writebuf = [] + + def fsync(self): + """Force write file data to the backend storage.""" + self.flush() + self.lib.jfs_fsync(_tid(), self.fd) + + def close(self): + """Close the file. 
A closed file cannot be used for further I/O operations.""" + if self.closed: + return + self.flush() + self.lib.jfs_close(_tid(), self.fd) + self.closed = True + + def __del__(self): + if not self.closed: + self.close() + + def _check_closed(self): + if self.closed: + raise ValueError('I/O operation on closed file.') + + def readline(self): # TODO: add parameter `size=-1` + """Read until newline or EOF.""" + ls = self.readlines(1) + if ls: + return ls[0] + return '' if self.encoding else b'' + + def xreadlines(self): + return self + + def readlines(self, hint=-1): + """Return a list of lines from the stream.""" + self._check_closed() + if hint == -1: + data = self._read(-1) + else: + rs = [] + while hint > 0: + r = self._read(1) + if not r: + break + rs.append(r) + if r[0] == b'\n': + hint -= 1 + data = b''.join(rs) + if self.encoding: + return [l.decode(self.encoding, self.errors) for l in data.splitlines(True)] + return data.splitlines(True) + + def writelines(self, lines): + """Write a list of lines to the file.""" + self._check_closed() + self.write(''.join(lines) if self.encoding else b''.join(lines)) + self.flush() + + +def test(): + volume = os.getenv("JFS_VOLUME", "test") + meta = os.getenv("JFS_META", "redis://localhost") + v = Client(volume, meta, access_log="/tmp/jfs.log") + st = v.stat("/") + print(st) + if v.exists("/d"): + v.rmr("/d") + v.makedirs("/d") + if v.exists("/d/file"): + v.remove("/d/file") + with v.open("/d/file", "w") as f: + f.write("hello") + with v.open("/d/file", "a+") as f: + f.write("world") + with v.open("/d/file") as f: + data = f.read() + assert data == "helloworld" + with v.open("/d/file", "w") as f: + f.write("hello") + with v.open("/d/file", 'rb', 5) as f: + data = f.readlines() + assert data == [b"hello"] + print(list(v.open("/d/file"))) + assert list(v.open("/d/file")) == ['hello'] + try: + v.open("/d/d/file", "w") + except OSError as e: + if e.errno != errno.ENOENT: + raise e + else: + raise AssertionError + 
v.chmod("/d/file", 0o777) + # v.chown("/d/file", 0, 0) + v.symlink("/d/file", "/d/link") + assert v.readlink("/d/link") == "file" + v.unlink("/d/link") + v.link("/d/file", "/d/link") + v.rename("/d/link", "/d/link2") + names = sorted(v.listdir("/d")) + assert names == ["file", "link2"] + v.setxattr("/d/file", "user.key", b"value\0") + xx = v.getxattr("/d/file", "user.key") + assert xx == b"value\0" + print(v.listxattr("/d/file")) + assert v.listxattr("/d/file") == ["user.key"] + v.removexattr("/d/file", "user.key") + assert v.listxattr("/d/file") == [] + with v.open("/d/file", "a") as f: + f.seek(0, 0) + f.write("world") + assert f.truncate(2) == 2 + assert f.seek(0, 2) == 2 + assert v.open("/d/file").read() == "he" + k=1024 + start = time.time() + size = 0 + with v.open("/bigfile", mode="wb") as f: + for i in range(4000): + f.write(b"!"*(k*k)) + size += k*k + print("write time:", time.time()-start, size>>20) + start = time.time() + size = 0 + with v.open("/bigfile",mode='rb') as f: + while True: + t = f.read(4*k) + if not t: break + size += len(t) + print("read time:", time.time()-start, size>>20) + +if __name__ == '__main__': + test() + diff --git a/sdk/python/juicefs/juicefs/spec.py b/sdk/python/juicefs/juicefs/spec.py new file mode 100644 index 000000000000..01f4a439b536 --- /dev/null +++ b/sdk/python/juicefs/juicefs/spec.py @@ -0,0 +1,286 @@ +import datetime +import logging +import uuid +import os +from stat import S_ISDIR, S_ISLNK, S_ISREG + +from fsspec.spec import AbstractFileSystem, AbstractBufferedFile + +from .juicefs import Client + +logger = logging.getLogger("fsspec.jfs") + + +class JuiceFS(AbstractFileSystem): + """ + A JuiceFS file system. 
+ """ + protocol = "jfs", "juicefs" + def __init__(self, name, auto_mkdir=False, **kwargs): + if self._cached: + return + super().__init__(**kwargs) + self.auto_mkdir = auto_mkdir + self.temppath = kwargs.pop("temppath", "/tmp") + self.fs = Client(name, **kwargs) + + @property + def fsid(self): + return "jfs_" + self.fs.name + + def makedirs(self, path, exist_ok=False, mode=511): + if self.exists(path) and not exist_ok: + raise FileExistsError(f"File exists: {path}") + self.fs.makedirs(self._strip_protocol(path), mode) + + def mkdir(self, path, create_parents=True, mode=0o511): + if self.exists(path): + raise FileExistsError(f"File exists: {path}") + if create_parents: + self.fs.makedirs(self._strip_protocol(path), mode=mode) + else: + self.fs.mkdir(self._strip_protocol(path), mode) + + def rmdir(self, path): + self.fs.rmdir(self._strip_protocol(path)) + + def ls(self, path, detail=False, **kwargs): + infos = self.fs.listdir(self._strip_protocol(path), detail) + if not detail: + return infos + stats = [] + for name, st in infos: + info = { + "name": os.path.join(path, name), + "size": st.st_size, + "type": "directory" if S_ISDIR(st.st_mode) else "link" if S_ISLNK(st.st_mode) else "file", + "mode": st.st_mode, + "ino": st.st_ino, + "nlink": st.st_nlink, + "uid": st.st_uid, + "gid": st.st_gid, + "created": st.st_atime, + "mtime": st.st_mtime, + } + if S_ISLNK(st.st_mode): + info.update(**self.info(f"{path}/{name}")) + stats.append(info) + return stats + + def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs): + if total: + info = self.info(path) + return info["size"] + return super().du(path, total=total, maxdepth=maxdepth, withdirs=withdirs, **kwargs) + + def info(self, path): + path = self._strip_protocol(path) + try: + st = self.fs.lstat(path) + except OSError: + raise FileNotFoundError(path) + info = { + "name": path, + } + if S_ISLNK(st.st_mode): + info['destination'] = self.fs.readlink(path) + st = self.fs.stat(path) + info.update({ + "type": 
"directory" if S_ISDIR(st.st_mode) else "file" if S_ISREG(st.st_mode) else "other", + "size": st.st_size, + "uid": st.st_uid, + "gid": st.st_gid, + "created": st.st_atime, + "mtime": st.st_mtime, + }) + return info + + def lexists(self, path, **kwargs): + try: + self.fs.lstat(self._strip_protocol(path)) + return True + except OSError: + return False + + def cp_file(self, path1, path2, **kwargs): + if self.isfile(path1): + if self.auto_mkdir: + self.makedirs(self._parent(path2), exist_ok=True) + self.fs.clone(self._strip_protocol(path1), self._strip_protocol(path2)) + else: + self.mkdirs(path2, exist_ok=True) + + def rm(self, path, recursive=False, maxdepth=None): + if not isinstance(path, list): + path = [path] + for p in path: + if recursive: + self.fs.rmr(self._strip_protocol(p)) + else: + self.fs.remove(self._strip_protocol(p)) + + def _rm(self, path): + self.fs.remove(self._strip_protocol(path)) + + def mv(self, old, new, recursive=False, maxdepth=None, **kwargs): + self.fs.rename(self._strip_protocol(old), self._strip_protocol(new)) + + def link(self, src, dst, **kwargs): + src = self._strip_protocol(src) + dst = self._strip_protocol(dst) + self.fs.link(src, dst, **kwargs) + + def symlink(self, src, dst, **kwargs): + src = self._strip_protocol(src) + dst = self._strip_protocol(dst) + self.fs.symlink(src, dst, **kwargs) + + def islink(self, path) -> bool: + try: + self.fs.readlink(self._strip_protocol(path)) + return True + except OSError: + return False + + def _open(self, path, mode="rb", block_size=None, autocommit=True, **kwargs): + path = self._strip_protocol(path) + if self.auto_mkdir and "w" in mode: + self.makedirs(self._parent(path), exist_ok=True) + return JuiceFile(self, path, mode, block_size, autocommit, **kwargs) + + def touch(self, path, truncate=True, **kwargs): + path = self._strip_protocol(path) + if self.auto_mkdir: + self.makedirs(self._parent(path), exist_ok=True) + if truncate or not self.exists(path): + with self.open(path, "wb", 
**kwargs): + pass + else: + self.fs.utime(self._strip_protocol(path)) + + @classmethod + def _parent(cls, path): + path = cls._strip_protocol(path) + if os.sep == "/": + # posix native + return path.rsplit("/", 1)[0] or "/" + else: + # NT + path_ = path.rsplit("/", 1)[0] + if len(path_) <= 3: + if path_[1:2] == ":": + # nt root (something like c:/) + return path_[0] + ":/" + # More cases may be required here + return path_ + + def created(self, path): + return datetime.datetime.fromtimestamp( + self.info(path)["created"], tz=datetime.timezone.utc + ) + + def modified(self, path): + return datetime.datetime.fromtimestamp( + self.info(path)["mtime"], tz=datetime.timezone.utc + ) + + def _isfilestore(self): + # Inheriting from DaskFileSystem makes this False (S3, etc. were) + # the original motivation. But we are a posix-like file system. + # See https://github.com/dask/dask/issues/5526 + return True + + def chmod(self, path, mode): + path = self._strip_protocol(path) + return self.fs.chmod(path, mode) + + +class JuiceFile(AbstractBufferedFile): + def __init__(self, fs, path, mode="rb", block_size=None, autocommit=True, cache_options=None, **kwargs): + super().__init__(fs, path, mode, block_size, autocommit, cache_options=cache_options, **kwargs) + if autocommit: + self.temp = path + self.f = None + self._open() + + def _open(self): + if self.f is None or self.f.closed: + if self.autocommit or "w" not in self.mode: + self.f = self.fs.fs.open(self.path, self.mode, buffering=self.blocksize) + else: + self.temp = "/".join([self.fs.temppath, str(uuid.uuid4())]) + self.f = open(self.temp, self.mode, buffering=self.blocksize) + if "w" not in self.mode: + self.size = self.f.seek(0, 2) + self.f.seek(0) + + def _fetch_range(self, start, end): + # probably only used by cached FS + if "r" not in self.mode: + raise ValueError + self._open() + self.f.seek(start) + return self.f.read(end - start) + + def __setstate__(self, state): + self.f = None + loc = state.pop("loc", None) + 
self.__dict__.update(state) + if "r" in state["mode"]: + self.f = None + self._open() + self.f.seek(loc) + + def __getstate__(self): + d = self.__dict__.copy() + d.pop("f") + if "r" in self.mode: + d["loc"] = self.f.tell() + else: + if not self.f.closed: + raise ValueError("Cannot serialise open write-mode local file") + return d + + def commit(self): + if self.autocommit: + raise RuntimeError("Can only commit if not already set to autocommit") + self.fs.fs.rename(self.temp, self.path) + + def discard(self): + if self.autocommit: + raise RuntimeError("Can only commit if not already set to autocommit") + self.fs.fs.remove(self.temp) + + def tell(self): + return self.f.tell() + + def seek(self, loc, whence=0): + return self.f.seek(loc, whence) + + def write(self, data): + return self.f.write(data) + + def read(self, length=-1): + return self.f.read(length) + + def flush(self, force=True): + return self.f.flush() + + def truncate(self, size=None): + return self.f.truncate(size) + + def close(self): + super().close() + if getattr(self, "_unclosable", False): + return + self.f.close() + + def __getattr__(self, item): + return getattr(self.f, item) + + def __del__(self): + pass + +from fsspec.registry import register_implementation +register_implementation("jfs", JuiceFS, True) +register_implementation("juicefs", JuiceFS, True) diff --git a/sdk/python/juicefs/setup.py b/sdk/python/juicefs/setup.py new file mode 100644 index 000000000000..2d54cc02a03f --- /dev/null +++ b/sdk/python/juicefs/setup.py @@ -0,0 +1,21 @@ +from setuptools import setup, find_packages + +# The following line will be replaced by the actual version number during the Make process +VERSION = "1.3.0" +BUILD_INFO = "BUILDDATE+COMMIT HASH" + + +setup( + name='juicefs', + version=VERSION, + description=BUILD_INFO, + package_data={'juicefs': ['*.so']}, + packages=find_packages(where="."), + include_package_data=True, + install_requires=['six'], + entry_points={ + 'fsspec.specs': [ + 'jfs = 
juicefs.JuiceFS', + ], + }, +) diff --git a/sdk/python/juicefs/tests/__init__.py b/sdk/python/juicefs/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/python/juicefs/tests/test.py b/sdk/python/juicefs/tests/test.py new file mode 100644 index 000000000000..0cbf5dcc7cb5 --- /dev/null +++ b/sdk/python/juicefs/tests/test.py @@ -0,0 +1,28 @@ +import pytest + +from fsspec import filesystem +import fsspec.tests.abstract as abstract + +from juicefs.spec import JuiceFS + +class JuiceFSFixtures(abstract.AbstractFixtures): + @pytest.fixture(scope="class") + def fs(self): + m = filesystem("jfs", auto_mkdir=True, name="test", meta="redis://localhost") + return m + + @pytest.fixture + def fs_path(self, tmpdir): + return str(tmpdir) + + +class TestJuiceFSGet(abstract.AbstractGetTests, JuiceFSFixtures): + pass + + +class TestJuiceFSPut(abstract.AbstractPutTests, JuiceFSFixtures): + pass + + +class TestJuiceFSCopy(abstract.AbstractCopyTests, JuiceFSFixtures): + pass From 1f4fed58fbe9b6f995f61a1faad6be543f9bcf72 Mon Sep 17 00:00:00 2001 From: CodingPoeta Date: Wed, 25 Dec 2024 22:06:39 +0800 Subject: [PATCH 4/5] lint --- pkg/fs/fs.go | 2 +- sdk/java/libjfs/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 1d5302c36303..dcfca8de3798 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -1100,7 +1100,7 @@ func (f *File) Truncate(ctx meta.Context, length uint64) (err syscall.Errno) { } err = f.fs.m.Truncate(ctx, f.inode, 0, length, nil, false) if err == 0 { - f.fs.m.InvalidateChunkCache(ctx, f.inode, uint32(((length - 1) >> meta.ChunkBits))) + _ = f.fs.m.InvalidateChunkCache(ctx, f.inode, uint32(((length - 1) >> meta.ChunkBits))) f.fs.writer.Truncate(f.inode, length) f.fs.reader.Truncate(f.inode, length) f.info.attr.Length = length diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go index f9abae4b0fa1..c1dad91ae24a 100644 --- a/sdk/java/libjfs/main.go +++ b/sdk/java/libjfs/main.go 
@@ -828,7 +828,7 @@ func jfs_mkdirAll(pid int, h int64, cpath *C.char, mode, umask uint16) int { if err == 0 && w.ctx.Uid() == 0 && w.user != w.superuser { // belongs to supergroup if err := setOwner(w, w.withPid(pid), path, w.user, ""); err != 0 { - logger.Errorf("change owner of %s to %s: %s", path, w.user, err) + logger.Errorf("change owner of %s to %s: %d", path, w.user, err) } } return err From 9e39f59059dfa9d704a0784955a281e3e42fcc86 Mon Sep 17 00:00:00 2001 From: CodingPoeta Date: Fri, 27 Dec 2024 11:34:44 +0800 Subject: [PATCH 5/5] license --- sdk/python/juicefs/juicefs/juicefs.py | 13 +++++++++++++ sdk/python/juicefs/juicefs/spec.py | 15 +++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/sdk/python/juicefs/juicefs/juicefs.py b/sdk/python/juicefs/juicefs/juicefs.py index d2d3d2e12b37..bf838c9fd0bb 100644 --- a/sdk/python/juicefs/juicefs/juicefs.py +++ b/sdk/python/juicefs/juicefs/juicefs.py @@ -1,4 +1,17 @@ # encoding: utf-8 +# JuiceFS, Copyright 2020 Juicedata, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import codecs import errno diff --git a/sdk/python/juicefs/juicefs/spec.py b/sdk/python/juicefs/juicefs/spec.py index 01f4a439b536..00fed863f522 100644 --- a/sdk/python/juicefs/juicefs/spec.py +++ b/sdk/python/juicefs/juicefs/spec.py @@ -1,3 +1,18 @@ +# encoding: utf-8 +# JuiceFS, Copyright 2020 Juicedata, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import datetime import logging import uuid