diff --git a/.github/workflows/mutate-test-sdk.yml b/.github/workflows/mutate-test-sdk.yml index ff16ed3d2771..b60898a7d41e 100644 --- a/.github/workflows/mutate-test-sdk.yml +++ b/.github/workflows/mutate-test-sdk.yml @@ -40,7 +40,7 @@ jobs: - name: Set up Java uses: actions/setup-java@v3 with: - distribution: 'adopt' + distribution: 'temurin' java-version: '8' - name: Run Redis diff --git a/.github/workflows/sdktest.yml b/.github/workflows/sdktest.yml index 52d9b3595a81..f191c0708934 100644 --- a/.github/workflows/sdktest.yml +++ b/.github/workflows/sdktest.yml @@ -64,6 +64,8 @@ jobs: mkdir etc chmod 777 etc echo `hostname` > etc/nodes + echo "tom:3001" > users + echo "g1:2001:tom" > groups - name: Sdk Test run: | diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go index e4ae9da39245..da19ef42f511 100644 --- a/sdk/java/libjfs/main.go +++ b/sdk/java/libjfs/main.go @@ -75,6 +75,9 @@ var ( bridges []*Bridge pOnce sync.Once pushers []*push.Pusher + + userGroupCache = make(map[string]map[string][]string) // name -> (user -> groups) + ) const ( @@ -340,7 +343,16 @@ func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.F logger.Infof("JuiceFileSystem created for user:%s group:%s", user, group) } w := &wrapper{jfs, nil, m, user, superuser, supergroup} - if w.isSuperuser(user, strings.Split(group, ",")) { + var gs []string + if userGroupCache[name] != nil { + gs = userGroupCache[name][user] + } + if gs == nil { + gs = strings.Split(group, ",") + } + group = strings.Join(gs, ",") + logger.Debugf("update groups of %s to %s", user, group) + if w.isSuperuser(user, gs) { w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0}) } else { w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(user), w.lookupGids(group)) @@ -603,11 +615,8 @@ func F(p int64) *wrapper { } //export jfs_update_uid_grouping -func jfs_update_uid_grouping(h int64, uidstr *C.char, grouping *C.char) { - w := F(h) - if w == nil { - return - } +func jfs_update_uid_grouping(cname, uidstr *C.char, grouping *C.char) { + name := C.GoString(cname) var uids []pwent if uidstr != nil { for _, line := range strings.Split(C.GoString(uidstr), "\n") { @@ -627,8 +636,9 @@ func jfs_update_uid_grouping(h int64, uidstr *C.char, grouping *C.char) { logger.Debugf("Update uids mapping\n %s", buffer.String()) } + var userGroups = make(map[string][]string) // user -> groups + var gids []pwent - var groups []string if grouping != nil { for _, line := range strings.Split(C.GoString(grouping), "\n") { fields := strings.Split(line, ":") @@ -640,29 +650,32 @@ func jfs_update_uid_grouping(h int64, uidstr *C.char, grouping *C.char) { gids = append(gids, pwent{uint32(gid), gname}) if len(fields) > 2 { for _, user := range strings.Split(fields[len(fields)-1], ",") { - if strings.TrimSpace(user) == w.user { - groups = append(groups, gname) - } + userGroups[user] = append(userGroups[user], gname) } } } - logger.Debugf("Update groups of %s to %s", w.user, strings.Join(groups, ",")) var buffer bytes.Buffer for _, g := range gids { buffer.WriteString(fmt.Sprintf("\t%v:%v\n", g.name, g.id)) } logger.Debugf("Update gids mapping\n %s", buffer.String()) } - w.m.update(uids, gids, false) - if w.isSuperuser(w.user, groups) { - w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0}) - } else { - gids := w.ctx.Gids() - if len(groups) > 0 { - gids = w.lookupGids(strings.Join(groups, ",")) + fslock.Lock() + defer fslock.Unlock() + userGroupCache[name] = userGroups + ws := activefs[name] + if len(ws) > 0 { + m := ws[0].m + m.update(uids, gids, false) + for _, w := range ws { + logger.Debugf("Update groups of %s to %s", w.user, strings.Join(userGroups[w.user], ",")) + if w.isSuperuser(w.user, userGroups[w.user]) { + w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0}) + } else { + w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(w.user), w.lookupGids(strings.Join(userGroups[w.user], ","))) + } } - w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(w.user), gids) } } diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java index 4b51fea0b088..37a4f9def513 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java @@ -43,8 +43,6 @@ public class JuiceFileSystem extends FilterFileSystem { private static boolean fileChecksumEnabled = false; private static boolean distcpPatched = false; - private FileSystem emptierFs; - static { PatchUtil.patchBefore("org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream", "waitUntilLeaseIsRevoked", @@ -68,25 +66,21 @@ private synchronized static void patchDistCpChecksum() { public void initialize(URI uri, Configuration conf) throws IOException { super.initialize(uri, conf); fileChecksumEnabled = Boolean.parseBoolean(getConf(conf, "file.checksum", "false")); - if (!Boolean.parseBoolean(getConf(conf, "disable-trash-emptier", "false"))) { - startTrashEmptier(uri, conf); - } - } - - private void startTrashEmptier(URI uri, final Configuration conf) throws IOException { - if (BgTaskUtil.isRunning(uri.getHost(), "Trash emptier")) { - return; - } - try { - UserGroupInformation superUser = UserGroupInformation.createRemoteUser(getConf(conf, "superuser", "hdfs")); - emptierFs = superUser.doAs((PrivilegedExceptionAction) () -> { - JuiceFileSystemImpl fs = new JuiceFileSystemImpl(); - fs.initialize(uri, conf); - return fs; - }); - BgTaskUtil.startTrashEmptier(uri.getHost(), "Trash emptier", emptierFs, new Trash(emptierFs, conf).getEmptier(), TimeUnit.MINUTES.toMillis(10)); - } catch (Exception e) { - throw new IOException("start trash failed!",e); + boolean asBgTask = conf.getBoolean("juicefs.internal-bg-task", false); + if (!asBgTask && !Boolean.parseBoolean(getConf(conf, "disable-trash-emptier", "false"))) { + try { + Configuration newConf = new Configuration(conf); + newConf.setBoolean("juicefs.internal-bg-task", true); + UserGroupInformation superUser = UserGroupInformation.createRemoteUser(getConf(conf, "superuser", "hdfs")); + FileSystem emptierFs = superUser.doAs((PrivilegedExceptionAction) () -> { + JuiceFileSystemImpl fs = new JuiceFileSystemImpl(); + fs.initialize(uri, newConf); + return fs; + }); + BgTaskUtil.startTrashEmptier(uri.getHost(), emptierFs, new Trash(emptierFs, conf).getEmptier(), 10, TimeUnit.MINUTES); + } catch (Exception e) { + LOG.warn("start trash emptier for {} failed", uri.getHost(), e); + } } } diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java index 74dc5611f43f..fc590febf602 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java @@ -60,6 +60,7 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.jar.JarFile; import java.util.stream.Collectors; @@ -110,12 +111,13 @@ static String loadVersion() { private long handle; private UserGroupInformation ugi; private String homeDirPrefix = "/user"; - private Map cachedHosts = new HashMap<>(); // (ip, hostname) - private ConsistentHash hash = new ConsistentHash<>(1, Collections.singletonList("localhost")); + private static final Map> cachedHostsForName = new ConcurrentHashMap<>(); // (name -> (ip -> hostname)) + private static final Map> hashForName = new ConcurrentHashMap<>(); // (name -> consistentHash) + private static final Map lastFileStatus = new ConcurrentHashMap<>(); + private FsPermission uMask; private String hflushMethod; - private Map lastFileStatus = new HashMap<>(); private static final DirectBufferPool directBufferPool = new DirectBufferPool(); private boolean metricsEnable = false; @@ -143,7 +145,7 @@ static String loadVersion() { public static interface Libjfs { long jfs_init(String name, String jsonConf, String user, String group, String superuser, String supergroup); - void jfs_update_uid_grouping(long h, String uidstr, String grouping); + void jfs_update_uid_grouping(String name, String uidstr, String grouping); int jfs_term(long pid, long h); @@ -384,7 +386,6 @@ public void initialize(URI uri, Configuration conf) throws IOException { groups.clear(); groups.add(supergroup); } - String mountpoint = getConf(conf, "mountpoint", ""); synchronized (JuiceFileSystemImpl.class) { if (callBack == null) { @@ -448,10 +449,12 @@ public void initialize(URI uri, Configuration conf) throws IOException { if (handle <= 0) { throw new IOException("JuiceFS initialized failed for jfs://" + name); } - - initCache(conf); - refreshCache(conf); - + boolean asBgTask = conf.getBoolean("juicefs.internal-bg-task", false); + if (asBgTask) { + LOG.debug("background fs {}|({})", name, handle); + } else { + BgTaskUtil.register(name, handle); + } homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user"); this.workingDir = getHomeDirectory(); @@ -514,11 +517,13 @@ public void initialize(URI uri, Configuration conf) throws IOException { } } - String uidFile = getConf(conf, "users", null); - if (!isEmpty(uidFile) || !isEmpty(groupingFile)) { - updateUidAndGrouping(uidFile, groupingFile); - if (!isSuperGroupFileSystem) { - refreshUidAndGrouping(uidFile, groupingFile); + if (!asBgTask && !isSuperGroupFileSystem) { + // use juicefs.users and juicefs.groups for global mapping + String uidFile = getConf(conf, "users", null); + if (!isEmpty(uidFile) || !isEmpty(groupingFile)) { + BgTaskUtil.putTask(name, "Refresh guid", () -> { + updateUidAndGrouping(uidFile, groupingFile); + }, 1, 1, TimeUnit.MINUTES); } } } @@ -574,38 +579,20 @@ private boolean isEmpty(String str) { private String readFile(String file) throws IOException { Path path = new Path(file); - URI uri = path.toUri(); - FileSystem fs; - try { - URI defaultUri = getDefaultUri(getConf()); - if (uri.getScheme() == null) { - uri = defaultUri; - } else { - if (uri.getAuthority() == null && (uri.getScheme().equals(defaultUri.getScheme()))) { - uri = defaultUri; - } - } - if (getScheme().equals(uri.getScheme()) && - (name != null && name.equals(uri.getAuthority()))) { - fs = this; - } else { - fs = path.getFileSystem(getConf()); - } - - FileStatus lastStatus = lastFileStatus.get(file); + FileStatus lastStatus = lastFileStatus.get(file); + Configuration newConf = new Configuration(getConf()); + newConf.setBoolean("juicefs.internal-bg-task", true); + try (FileSystem fs = FileSystem.newInstance(path.toUri(), newConf)) { FileStatus status = fs.getFileStatus(path); if (lastStatus != null && status.getModificationTime() == lastStatus.getModificationTime() - && status.getLen() == lastStatus.getLen()) { + && status.getLen() == lastStatus.getLen()) { return null; } - FSDataInputStream in = fs.open(path); - String res = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n")); - in.close(); - lastFileStatus.put(file, status); - return res; - } catch (IOException e) { - LOG.warn(String.format("read %s failed", file), e); - throw e; + try (FSDataInputStream in = fs.open(path)) { + String res = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n")); + lastFileStatus.put(file, status); + return res; + } } } @@ -635,16 +622,10 @@ private void updateUidAndGrouping(String uidFile, String groupFile) throws IOExc grouping = readFile(groupFile); } - lib.jfs_update_uid_grouping(handle, uidstr, grouping); + lib.jfs_update_uid_grouping(name, uidstr, grouping); groups = Arrays.stream(group.split(",")).collect(Collectors.toSet()); } - private void refreshUidAndGrouping(String uidFile, String groupFile) { - BgTaskUtil.startScheduleTask(name, "Refresh guid", () -> { - updateUidAndGrouping(uidFile, groupFile); - }, 1, 1, TimeUnit.MINUTES); - } - private void initializeStorageIds(Configuration conf) throws IOException { try { Class clazz = Class.forName("org.apache.hadoop.fs.BlockLocation"); @@ -834,11 +815,11 @@ private void initCache(Configuration conf) { } // if newCachedHosts are not changed, skip - if (!newCachedHosts.equals(cachedHosts)) { + if (!newCachedHosts.equals(cachedHostsForName.get(name))) { List ips = new ArrayList<>(newCachedHosts.keySet()); LOG.debug("update nodes to: " + String.join(",", ips)); - this.hash = new ConsistentHash<>(100, ips); - this.cachedHosts = newCachedHosts; + hashForName.put(name, new ConsistentHash<>(100, ips)); + cachedHostsForName.put(name, newCachedHosts); } } } catch (Throwable e) { @@ -846,12 +827,6 @@ private void initCache(Configuration conf) { } } - private void refreshCache(Configuration conf) { - BgTaskUtil.startScheduleTask(name, "Node fetcher", () -> { - initCache(conf); - }, 10, 10, TimeUnit.MINUTES); - } - private List discoverNodes(String urls) { LOG.debug("fetching nodes from {}", urls); NodesFetcher fetcher = NodesFetcherBuilder.buildFetcher(urls, name, this); @@ -868,7 +843,13 @@ private BlockLocation makeLocation(long code, long start, long len) { BlockLocation blockLocation; String[] ns = new String[cacheReplica]; String[] hs = new String[cacheReplica]; - String host = cachedHosts.getOrDefault(hash.get(code + "-" + index), "localhost"); + + Map cachedHosts = cachedHostsForName.get(name); + ConsistentHash hash = hashForName.get(name); + String host = "localhost"; + if (cachedHosts != null && hash != null) { + host = cachedHosts.get(hash.get(code + "-" + index)); + } ns[0] = host + ":50010"; hs[0] = host; for (int i = 1; i < cacheReplica; i++) { @@ -916,6 +897,11 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l String[] host = new String[]{"localhost"}; return new BlockLocation[]{new BlockLocation(name, host, 0L, file.getLen())}; } + + BgTaskUtil.putTask(name, "Node fetcher", () -> { + initCache(getConf()); + }, 10, 10, TimeUnit.MINUTES); + if (file.getLen() <= start + len) { len = file.getLen() - start; } @@ -1872,6 +1858,12 @@ public void setTimes(Path p, long mtime, long atime) throws IOException { @Override public void close() throws IOException { super.close(); + BgTaskUtil.unregister(name, handle, () -> { + cachedHostsForName.clear(); + hashForName.clear(); + lastFileStatus.clear(); + }); + LOG.debug("close {}({})", name, handle); lib.jfs_term(Thread.currentThread().getId(), handle); if (metricsEnable) { JuiceFSInstrumentation.close(); diff --git a/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java b/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java index 16fb2447c177..eee31e866eb4 100644 --- a/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java +++ b/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -28,87 +29,127 @@ public class BgTaskUtil { private static final Logger LOG = LoggerFactory.getLogger(BgTaskUtil.class); - private static BgTaskUtil staticFieldForGc = new BgTaskUtil(); + private static final Map bgThreadForName = new HashMap<>(); // volName -> threadpool + private static final Map tasks = new HashMap<>(); // volName|taskName -> running + private static final Map> runningInstance = new HashMap<>(); - private BgTaskUtil() { + public static Map getBgThreadForName() { + return bgThreadForName; } - private static final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(2, r -> { - Thread thread = new Thread(r, "Background Task"); - thread.setDaemon(true); - return thread; - }); - // use timer to run trash emptier because it will occupy a thread - private static final List timers = new ArrayList<>(); - private static final List fileSystems = new ArrayList<>(); - private static final Set runningBgTask = new HashSet<>(); + public static Map> getRunningInstance() { + return runningInstance; + } - public interface Task { - void run() throws Exception; + public static void register(String volName, long handle) { + if (handle <= 0) { + return; + } + synchronized (runningInstance) { + LOG.debug("register instance for {}({})", volName, handle); + if (!runningInstance.containsKey(volName)) { + Set handles = new HashSet<>(); + handles.add(handle); + runningInstance.put(volName, handles); + } else { + runningInstance.get(volName).add(handle); + } + } } - public static void startScheduleTask(String name, String type, Task task, long initialDelay, long period, TimeUnit unit) { - synchronized (runningBgTask) { - if (isRunning(name, type)) { + public static void unregister(String volName, long handle, Runnable cleanupTask) { + if (handle <= 0) { + return; + } + synchronized (runningInstance) { + if (!runningInstance.containsKey(volName)) { return; } - threadPool.scheduleAtFixedRate(() -> { - try { - LOG.debug("Background task started for {} {}", name, type); - task.run(); - } catch (Exception e) { - LOG.warn("Background task failed for {} {}", name, type, e); - synchronized (runningBgTask) { - runningBgTask.remove(genKey(name, type)); - } - throw new RuntimeException(e); + Set handles = runningInstance.get(volName); + boolean removed = handles.remove(handle); + if (!removed) { + return; + } + LOG.debug("unregister instance for {}({})", volName, handle); + if (handles.size() == 0) { + LOG.debug("clean resources for {}", volName); + ScheduledExecutorService pool = bgThreadForName.remove(volName); + if (pool != null) { + pool.shutdownNow(); } - }, initialDelay, period, unit); - runningBgTask.add(genKey(name, type)); + stopTrashEmptier(volName); + tasks.entrySet().removeIf(e -> e.getKey().startsWith(volName + "|")); + cleanupTask.run(); + runningInstance.remove(volName); + } } } + public interface Task { + void run() throws IOException; + } - public static void startTrashEmptier(String name, String type, FileSystem fs, Runnable emptierTask, long delay) { - synchronized (runningBgTask) { - if (isRunning(name, type)) { - return; + + public static void putTask(String volName, String taskName, Task task, long delay, long period, TimeUnit unit) throws IOException { + synchronized (tasks) { + String key = volName + "|" + taskName; + if (!tasks.containsKey(key)) { + LOG.debug("start task {}", key); + task.run(); + // build background task thread for volume name + ScheduledExecutorService pool = bgThreadForName.computeIfAbsent(volName, + n -> Executors.newScheduledThreadPool(1, r -> { + Thread thread = new Thread(r, "JuiceFS Background Task"); + thread.setDaemon(true); + return thread; + }) + ); + pool.scheduleAtFixedRate(()->{ + try { + task.run(); + } catch (IOException e) { + LOG.warn("run {} failed", key, e); + } + }, delay, period, unit); + tasks.put(key, new Object()); } - Timer timer = new Timer("trash emptier", true); - timer.schedule(new TimerTask() { - @Override - public void run() { - emptierTask.run(); - } - }, delay); - runningBgTask.add(genKey(name, type)); - timers.add(timer); - fileSystems.add(fs); } } - public static boolean isRunning(String name, String type) { - synchronized (runningBgTask) { - return runningBgTask.contains(genKey(name, type)); + static class TrashEmptyTask { + FileSystem fs; + ScheduledExecutorService thread; + + public TrashEmptyTask(FileSystem fs, ScheduledExecutorService thread) { + this.fs = fs; + this.thread = thread; } } - private static String genKey(String name, String type) { - return name + "|" + type; + public static void startTrashEmptier(String name, FileSystem fs, Runnable emptierTask, long delay, TimeUnit unit) { + synchronized (tasks) { + String key = name + "|" + "Trash emptier"; + if (!tasks.containsKey(key)) { + LOG.debug("start trash emptier for {}", name); + ScheduledExecutorService thread = Executors.newScheduledThreadPool(1); + thread.schedule(emptierTask, delay, unit); + tasks.put(key, new TrashEmptyTask(fs, thread)); + } + } } - @Override - protected void finalize() { - threadPool.shutdownNow(); - for (Timer timer : timers) { - timer.cancel(); - timer.purge(); - } - for (FileSystem fs : fileSystems) { - try { - fs.close(); - } catch (IOException e) { - LOG.warn("close trash emptier fs failed", e); + private static void stopTrashEmptier(String name) { + synchronized (tasks) { + String key = name + "|" + "Trash emptier"; + Object v = tasks.remove(key); + if (v instanceof TrashEmptyTask) { + LOG.debug("close trash emptier for {}", name); + ((TrashEmptyTask) v).thread.shutdownNow(); + try { + ((TrashEmptyTask) v).fs.close(); + } catch (IOException e) { + LOG.warn("close failed", e); + } } } } diff --git a/sdk/java/src/main/java/io/juicefs/utils/NodesFetcher.java b/sdk/java/src/main/java/io/juicefs/utils/NodesFetcher.java index 2db440b0a88b..c9044a60f899 100644 --- a/sdk/java/src/main/java/io/juicefs/utils/NodesFetcher.java +++ b/sdk/java/src/main/java/io/juicefs/utils/NodesFetcher.java @@ -18,7 +18,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import sun.net.www.protocol.http.Handler; import java.io.*; import java.net.HttpURLConnection; @@ -40,8 +39,6 @@ public abstract class NodesFetcher { protected File cacheFile; private String jfsName; - private static Handler handler = new Handler(); - public NodesFetcher(String jfsName) { this.jfsName = jfsName; if (!cacheFolder.exists()) { @@ -121,7 +118,7 @@ protected String doGet(String url) { HttpURLConnection con = null; try { - con = (HttpURLConnection) new URL(null, url, handler).openConnection(); + con = (HttpURLConnection) new URL(url).openConnection(); con.setConnectTimeout(timeout * 1000); con.setReadTimeout(timeout * 1000); diff --git a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemBgTaskTest.java b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemBgTaskTest.java new file mode 100644 index 000000000000..d0ed3216906d --- /dev/null +++ b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemBgTaskTest.java @@ -0,0 +1,72 @@ +/* + * JuiceFS, Copyright 2024 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.juicefs; + +import io.juicefs.utils.BgTaskUtil; +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.Map; +import java.util.concurrent.*; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY; + +public class JuiceFileSystemBgTaskTest extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(JuiceFileSystemBgTaskTest.class); + + public void testJuiceFileSystemBgTask() throws Exception { + FileSystem.closeAll(); + Configuration conf = new Configuration(); + conf.addResource(JuiceFileSystemTest.class.getClassLoader().getResourceAsStream("core-site.xml")); + conf.set(FS_TRASH_INTERVAL_KEY, "6"); + conf.set(FS_TRASH_CHECKPOINT_INTERVAL_KEY, "2"); + conf.set("juicefs.users", "jfs://dev/users"); + conf.set("juicefs.groups", "jfs://dev/groups"); + conf.set("juicefs.discover-nodes-url", "jfs://dev/etc/nodes"); + int threads = 100; + int instances = 1000; + CountDownLatch latch = new CountDownLatch(instances); + ExecutorService pool = Executors.newFixedThreadPool(threads); + for (int i = 0; i < instances; i++) { + pool.submit(() -> { + try (JuiceFileSystem jfs = new JuiceFileSystem()) { + jfs.initialize(URI.create("jfs://dev/"), conf); + if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) { + jfs.getFileBlockLocations(jfs.getFileStatus(new Path("jfs://dev/users")), 0, 1000); + } + } catch (Exception e) { + LOG.error("unexpected exception", e); + } finally { + latch.countDown(); + } + }); + } + latch.await(); + Map bgThreadForName = BgTaskUtil.getBgThreadForName(); + for (String s : bgThreadForName.keySet()) { + System.out.println(s); + } + assertEquals(0, bgThreadForName.size()); + assertEquals(0, BgTaskUtil.getRunningInstance().size()); + pool.shutdown(); + } +} diff --git a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java index 48434f1e61bb..cfc7173abf9e 100644 --- a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java +++ b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java @@ -49,7 +49,6 @@ import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; import static org.apache.hadoop.fs.permission.AclEntryType.*; import static org.apache.hadoop.fs.permission.FsAction.*; -import static org.apache.hadoop.fs.permission.FsAction.ALL; import static org.junit.Assert.assertArrayEquals; public class JuiceFileSystemTest extends TestCase { @@ -76,6 +75,7 @@ public void setUp() throws Exception { public void tearDown() throws Exception { fs.close(); + FileSystem.closeAll(); } public void testFsStatus() throws IOException { @@ -668,6 +668,7 @@ public void testUsersAndGroups() throws Exception { writeFile(fs, groups1, "group1:3001:user1\n"); writeFile(fs, users2, "user2:2001\n"); writeFile(fs, groups2, "group2:3001:user2\n"); + fs.close(); Configuration conf = new Configuration(cfg); conf.set("juicefs.users", users1.toUri().getPath()); @@ -756,17 +757,21 @@ public void testGuidMapping() throws Exception { Path f = new Path("/test_foo"); fooFs.create(f).close(); assertEquals("foo", fooFs.getFileStatus(f).getOwner()); + fooFs.close(); ou = fs.create(new Path("/etc/users")); ou.write("foo:10001\n".getBytes()); ou.close(); - FileSystem newFS = FileSystem.newInstance(newConf); - assertEquals("10000", fooFs.getFileStatus(f).getOwner()); + fs.close(); - fooFs.delete(f, false); + FileSystem newFS = FileSystem.newInstance(newConf); + assertEquals("10000", newFS.getFileStatus(f).getOwner()); + newFS.delete(f, false); + newFS.close(); } public void testGuidMappingFromString() throws Exception { + fs.close(); Configuration newConf = new Configuration(cfg); newConf.set("juicefs.users", "bar:10000;foo:20000;baz:30000"); @@ -779,14 +784,16 @@ public void testGuidMappingFromString() throws Exception { fooFs.setOwner(f, "foo", "user"); assertEquals("foo", fooFs.getFileStatus(f).getOwner()); assertEquals("user", fooFs.getFileStatus(f).getGroup()); + fooFs.close(); newConf.set("juicefs.users", "foo:20001"); newConf.set("juicefs.groups", "user:1001:foo,bar;admin:2001:baz"); FileSystem newFS = FileSystem.newInstance(newConf); - assertEquals("20000", fooFs.getFileStatus(f).getOwner()); - assertEquals("1000", fooFs.getFileStatus(f).getGroup()); + assertEquals("20000", newFS.getFileStatus(f).getOwner()); + assertEquals("1000", newFS.getFileStatus(f).getGroup()); - fooFs.delete(f, false); + newFS.delete(f, false); + newFS.close(); } public void testTrash() throws Exception { @@ -948,10 +955,12 @@ public void testUserWithMultiGroups() throws Exception { writeFile(fs, users, "tom:2001\n"); writeFile(fs, groups, "groupa:3001:tom\ngroupb:3002:tom"); + fs.close(); Configuration conf = new Configuration(cfg); conf.set("juicefs.users", users.toUri().getPath()); conf.set("juicefs.groups", groups.toUri().getPath()); + conf.set("juicefs.debug", "true"); FileSystem superFs = createNewFs(conf, "hdfs", new String[]{"hadoop"}); Path testDir = new Path("/test_multi_group/d1"); diff --git a/sdk/java/src/test/java/io/juicefs/utils/BgTaskUtilTest.java b/sdk/java/src/test/java/io/juicefs/utils/BgTaskUtilTest.java new file mode 100644 index 000000000000..e6aab69f2aa2 --- /dev/null +++ b/sdk/java/src/test/java/io/juicefs/utils/BgTaskUtilTest.java @@ -0,0 +1,75 @@ +package io.juicefs.utils; + +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.concurrent.*; + +public class BgTaskUtilTest extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(BgTaskUtilTest.class); + + public void testBgTask() throws Exception { + String[] volNames = new String[]{"fs1", "fs2", "fs3"}; + String[] taskNames = new String[]{"task1", "task2", "task3"}; + int threads = 20; + ExecutorService pool = Executors.newFixedThreadPool(threads); + + int instances = 100; + CountDownLatch latch = new CountDownLatch(instances); + + for (int i = 0; i < instances; i++) { + int handle = i + 1; + pool.submit(() -> { + String volName = volNames[ThreadLocalRandom.current().nextInt(100) % volNames.length]; + try { + BgTaskUtil.register(volName, handle); + BgTaskUtil.startTrashEmptier(volName, FileSystem.get(URI.create("file:///"), new Configuration()), () -> { + while (true) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + }, 0, TimeUnit.MINUTES); + // put many tasks + for (int j = 0; j < 10; j++) { + String taskName = taskNames[ThreadLocalRandom.current().nextInt(100) % taskNames.length]; + BgTaskUtil.putTask(volName, + taskName, + () -> { + LOG.info("running {}|{}", volName, taskName); + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(2000)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + 0, 1, TimeUnit.MINUTES + ); + } + } catch (Exception e) { + LOG.error("unexpected", e); + } finally { + BgTaskUtil.unregister(volName, handle, () -> { + LOG.info("clean {}", volName); + }); + latch.countDown(); + } + }); + } + latch.await(); + assertEquals(0, BgTaskUtil.getBgThreadForName().size()); + assertEquals(0, BgTaskUtil.getRunningInstance().size()); + + for (StackTraceElement[] elements : Thread.getAllStackTraces().values()) { + for (StackTraceElement e : elements) { + assertFalse("ClassName: " + e.getClassName(), e.getClassName().contains("juicefs") && !e.getClassName().equals("io.juicefs.utils.BgTaskUtilTest")); + } + } + } +} diff --git a/sdk/java/src/test/resources/log4j.properties b/sdk/java/src/test/resources/log4j.properties new file mode 100644 index 000000000000..9d962458dcc1 --- /dev/null +++ b/sdk/java/src/test/resources/log4j.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppenderk +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n \ No newline at end of file