Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tangyoupeng committed Dec 11, 2024
1 parent ce93ec9 commit 895bddf
Showing 1 changed file with 49 additions and 27 deletions.
76 changes: 49 additions & 27 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -110,12 +111,13 @@ static String loadVersion() {
private long handle;
private UserGroupInformation ugi;
private String homeDirPrefix = "/user";
private static Map<String, String> cachedHosts = new HashMap<>(); // (ip, hostname)
private static ConsistentHash<String> hash = new ConsistentHash<>(1, Collections.singletonList("localhost"));
private static final Map<String, Map<String, String>> cachedHostsForName = new ConcurrentHashMap<>(); // (name -> (ip -> hostname))
private static final Map<String, ConsistentHash<String>> hashForName = new ConcurrentHashMap<>(); // (name -> consistentHash)
private static final Map<String, FileStatus> lastFileStatus = new ConcurrentHashMap<>();

private FsPermission uMask;
private String hflushMethod;

private Map<String, FileStatus> lastFileStatus = new HashMap<>();
private static final DirectBufferPool directBufferPool = new DirectBufferPool();

private boolean metricsEnable = false;
Expand All @@ -135,7 +137,7 @@ static String loadVersion() {
private static final String USERNAME_UID_PATTERN = "[a-zA-Z0-9_-]+:[0-9]+";
private static final String GROUPNAME_GID_USERNAMES_PATTERN = "[a-zA-Z0-9_-]+:[0-9]+:[,a-zA-Z0-9_-]+";

private volatile static JuiceFileSystemImpl bgfs;
private static final Map<String, FileSystem> bgfss = new ConcurrentHashMap<>(); // ("scheme://host" -> fsInstance)

/*
go call back
Expand Down Expand Up @@ -530,17 +532,31 @@ public void initialize(URI uri, Configuration conf) throws IOException {
}
}

private JuiceFileSystemImpl getBgfs() throws IOException {
if (bgfs == null) {
synchronized (JuiceFileSystemImpl.class) {
if (bgfs == null) {
Configuration newConf = new Configuration(getConf());
newConf.setBoolean("juicefs.internal-bg-task", true);
JuiceFileSystemImpl fs = new JuiceFileSystemImpl();
fs.initialize(uri, newConf);
bgfs = fs;
}
private FileSystem getBgfs(Path p) throws IOException {
String scheme = p.toUri().getScheme();
String authority = p.toUri().getAuthority();
if (scheme == null) {
scheme = FileSystem.getDefaultUri(getConf()).getScheme();
}
if (scheme != null && authority == null && scheme.equals(FileSystem.getDefaultUri(getConf()).getScheme())) {
authority = FileSystem.getDefaultUri(getConf()).getAuthority();
}
String key = scheme + "://" + authority;

FileSystem bgfs = bgfss.computeIfAbsent(key, k -> {
try {
Configuration newConf = new Configuration(getConf());
newConf.setBoolean("juicefs.internal-bg-task", true);
FileSystem fs = FileSystem.newInstance(p.toUri(), newConf);
fs.initialize(uri, newConf);
return fs;
} catch (Exception e) {
LOG.warn("create background fs failed", e);
return null;
}
});
if (bgfs == null) {
throw new IOException("create background fs failed");
}
return bgfs;
}
Expand Down Expand Up @@ -597,12 +613,13 @@ private boolean isEmpty(String str) {
private String readFile(String file) throws IOException {
Path path = new Path(file);
FileStatus lastStatus = lastFileStatus.get(file);
FileStatus status = getBgfs().getFileStatus(path);
FileSystem bgfs = getBgfs(path);
FileStatus status = bgfs.getFileStatus(path);
if (lastStatus != null && status.getModificationTime() == lastStatus.getModificationTime()
&& status.getLen() == lastStatus.getLen()) {
return null;
}
try (FSDataInputStream in = getBgfs().open(path)) {
try (FSDataInputStream in = bgfs.open(path)) {
String res = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n"));
lastFileStatus.put(file, status);
return res;
Expand Down Expand Up @@ -828,11 +845,11 @@ private void initCache(Configuration conf) {
}

// if newCachedHosts are not changed, skip
if (!newCachedHosts.equals(cachedHosts)) {
if (!newCachedHosts.equals(cachedHostsForName.get(name))) {
List<String> ips = new ArrayList<>(newCachedHosts.keySet());
LOG.debug("update nodes to: " + String.join(",", ips));
hash = new ConsistentHash<>(100, ips);
cachedHosts = newCachedHosts;
hashForName.put(name, new ConsistentHash<>(100, ips));
cachedHostsForName.put(name, newCachedHosts);
}
}
} catch (Throwable e) {
Expand All @@ -856,7 +873,13 @@ private BlockLocation makeLocation(long code, long start, long len) {
BlockLocation blockLocation;
String[] ns = new String[cacheReplica];
String[] hs = new String[cacheReplica];
String host = cachedHosts.getOrDefault(hash.get(code + "-" + index), "localhost");

Map<String, String> cachedHosts = cachedHostsForName.get(name);
ConsistentHash<String> hash = hashForName.get(name);
String host = "localhost";
if (cachedHosts != null && hash != null) {
host = cachedHosts.get(hash.get(code + "-" + index));
}
ns[0] = host + ":50010";
hs[0] = host;
for (int i = 1; i < cacheReplica; i++) {
Expand Down Expand Up @@ -1866,15 +1889,14 @@ public void setTimes(Path p, long mtime, long atime) throws IOException {
public void close() throws IOException {
super.close();
BgTaskUtil.unregister(name, handle, () -> {
try {
if (bgfs != null) {
bgfs.close();
for (FileSystem fs : bgfss.values()) {
try {
fs.close();
} catch (IOException e) {
LOG.warn("close background filesystem failed", e);
}
} catch (IOException e) {
LOG.warn("close background filesystem failed", e);
} finally {
bgfs = null;
}
bgfss.clear();
});
lib.jfs_term(Thread.currentThread().getId(), handle);
if (metricsEnable) {
Expand Down

0 comments on commit 895bddf

Please sign in to comment.