From 895bddfb144db288903047ed0555eb136a0d8859 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Wed, 11 Dec 2024 19:25:56 +0800 Subject: [PATCH] wip --- .../java/io/juicefs/JuiceFileSystemImpl.java | 76 ++++++++++++------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java index d973817a94e8..101a36eae632 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java @@ -60,6 +60,7 @@ import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.jar.JarFile; import java.util.stream.Collectors; @@ -110,12 +111,13 @@ static String loadVersion() { private long handle; private UserGroupInformation ugi; private String homeDirPrefix = "/user"; - private static Map cachedHosts = new HashMap<>(); // (ip, hostname) - private static ConsistentHash hash = new ConsistentHash<>(1, Collections.singletonList("localhost")); + private static final Map> cachedHostsForName = new ConcurrentHashMap<>(); // (name -> (ip -> hostname)) + private static final Map> hashForName = new ConcurrentHashMap<>(); // (name -> consistentHash) + private static final Map lastFileStatus = new ConcurrentHashMap<>(); + private FsPermission uMask; private String hflushMethod; - private Map lastFileStatus = new HashMap<>(); private static final DirectBufferPool directBufferPool = new DirectBufferPool(); private boolean metricsEnable = false; @@ -135,7 +137,7 @@ static String loadVersion() { private static final String USERNAME_UID_PATTERN = "[a-zA-Z0-9_-]+:[0-9]+"; private static final String GROUPNAME_GID_USERNAMES_PATTERN = "[a-zA-Z0-9_-]+:[0-9]+:[,a-zA-Z0-9_-]+"; - private volatile static JuiceFileSystemImpl bgfs; + private static final Map bgfss = new ConcurrentHashMap<>(); // ("scheme://host" -> fsInstance) /* go call back @@ -530,17 +532,31 @@ public void initialize(URI uri, Configuration conf) throws IOException { } } - private JuiceFileSystemImpl getBgfs() throws IOException { - if (bgfs == null) { - synchronized (JuiceFileSystemImpl.class) { - if (bgfs == null) { - Configuration newConf = new Configuration(getConf()); - newConf.setBoolean("juicefs.internal-bg-task", true); - JuiceFileSystemImpl fs = new JuiceFileSystemImpl(); - fs.initialize(uri, newConf); - bgfs = fs; - } + private FileSystem getBgfs(Path p) throws IOException { + String scheme = p.toUri().getScheme(); + String authority = p.toUri().getAuthority(); + if (scheme == null) { + scheme = FileSystem.getDefaultUri(getConf()).getScheme(); + } + if (scheme != null && authority == null && scheme.equals(FileSystem.getDefaultUri(getConf()).getScheme())) { + authority = FileSystem.getDefaultUri(getConf()).getAuthority(); + } + String key = scheme + "://" + authority; + + FileSystem bgfs = bgfss.computeIfAbsent(key, k -> { + try { + Configuration newConf = new Configuration(getConf()); + newConf.setBoolean("juicefs.internal-bg-task", true); + FileSystem fs = FileSystem.newInstance(p.toUri(), newConf); + fs.initialize(uri, newConf); + return fs; + } catch (Exception e) { + LOG.warn("create background fs failed", e); + return null; } + }); + if (bgfs == null) { + throw new IOException("create background fs failed"); } return bgfs; } @@ -597,12 +613,13 @@ private boolean isEmpty(String str) { private String readFile(String file) throws IOException { Path path = new Path(file); FileStatus lastStatus = lastFileStatus.get(file); - FileStatus status = getBgfs().getFileStatus(path); + FileSystem bgfs = getBgfs(path); + FileStatus status = bgfs.getFileStatus(path); if (lastStatus != null && status.getModificationTime() == lastStatus.getModificationTime() && status.getLen() == lastStatus.getLen()) { return null; } - try (FSDataInputStream in = getBgfs().open(path)) { + try (FSDataInputStream in = bgfs.open(path)) { String res = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n")); lastFileStatus.put(file, status); return res; @@ -828,11 +845,11 @@ private void initCache(Configuration conf) { } // if newCachedHosts are not changed, skip - if (!newCachedHosts.equals(cachedHosts)) { + if (!newCachedHosts.equals(cachedHostsForName.get(name))) { List ips = new ArrayList<>(newCachedHosts.keySet()); LOG.debug("update nodes to: " + String.join(",", ips)); - hash = new ConsistentHash<>(100, ips); - cachedHosts = newCachedHosts; + hashForName.put(name, new ConsistentHash<>(100, ips)); + cachedHostsForName.put(name, newCachedHosts); } } } catch (Throwable e) { @@ -856,7 +873,13 @@ private BlockLocation makeLocation(long code, long start, long len) { BlockLocation blockLocation; String[] ns = new String[cacheReplica]; String[] hs = new String[cacheReplica]; - String host = cachedHosts.getOrDefault(hash.get(code + "-" + index), "localhost"); + + Map cachedHosts = cachedHostsForName.get(name); + ConsistentHash hash = hashForName.get(name); + String host = "localhost"; + if (cachedHosts != null && hash != null) { + host = cachedHosts.get(hash.get(code + "-" + index)); + } ns[0] = host + ":50010"; hs[0] = host; for (int i = 1; i < cacheReplica; i++) { @@ -1866,15 +1889,14 @@ public void setTimes(Path p, long mtime, long atime) throws IOException { public void close() throws IOException { super.close(); BgTaskUtil.unregister(name, handle, () -> { - try { - if (bgfs != null) { - bgfs.close(); + for (FileSystem fs : bgfss.values()) { + try { + fs.close(); + } catch (IOException e) { + LOG.warn("close background filesystem failed", e); } - } catch (IOException e) { - LOG.warn("close background filesystem failed", e); - } finally { - bgfs = null; } + bgfss.clear(); }); lib.jfs_term(Thread.currentThread().getId(), handle); if (metricsEnable) {