Skip to content

Commit

Permalink
Hadoop: make libjfs singleton to avoid possible jnr weakhashmap infin…
Browse files Browse the repository at this point in the history
…ity loop
  • Loading branch information
tangyoupeng committed Feb 18, 2024
1 parent fc89521 commit 3557cd9
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 54 deletions.
111 changes: 57 additions & 54 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
Expand All @@ -84,7 +83,8 @@ public class JuiceFileSystemImpl extends FileSystem {
private int minBufferSize;
private int cacheReplica;
private boolean fileChecksumEnabled;
private Libjfs lib;
private Libjfs lib = loadLibrary();

private long handle;
private UserGroupInformation ugi;
private String homeDirPrefix = "/user";
Expand Down Expand Up @@ -347,7 +347,6 @@ public void initialize(URI uri, Configuration conf) throws IOException {
String supergroup = getConf(conf, "supergroup", conf.get("dfs.permissions.superusergroup", "supergroup"));
String mountpoint = getConf(conf, "mountpoint", "");

lib = loadLibrary();
synchronized (JuiceFileSystemImpl.class) {
if (callBack == null) {
callBack = new LogCallBackImpl(lib);
Expand Down Expand Up @@ -570,7 +569,7 @@ private static void initStubLoader() {
}
}

public static Libjfs loadLibrary() throws IOException {
public static Libjfs loadLibrary() {
initStubLoader();

LibraryLoader<Libjfs> libjfsLibraryLoader = LibraryLoader.create(Libjfs.class);
Expand Down Expand Up @@ -609,69 +608,73 @@ public static Libjfs loadLibrary() throws IOException {
}
URLConnection con;
try {
con = location.openConnection();
} catch (FileNotFoundException e) {
// jar may changed
return loadExistLib(libjfsLibraryLoader, dir, name, libFile);
}
if (location.getProtocol().equals("jar") && (con instanceof JarURLConnection)) {
LOG.debug("juicefs-hadoop.jar is a nested jar");
JarURLConnection connection = (JarURLConnection) con;
JarFile jfsJar = connection.getJarFile();
ZipEntry entry = jfsJar.getJarEntry(resource);
soTime = entry.getLastModifiedTime().toMillis();
ins = jfsJar.getInputStream(entry);
} else {
URI locationUri;
try {
locationUri = location.toURI();
} catch (URISyntaxException e) {
con = location.openConnection();
} catch (FileNotFoundException e) {
// jar may changed
return loadExistLib(libjfsLibraryLoader, dir, name, libFile);
}
if (Files.isDirectory(Paths.get(locationUri))) { // for debug: sdk/java/target/classes
soTime = con.getLastModified();
ins = JuiceFileSystemImpl.class.getClassLoader().getResourceAsStream(resource);
if (location.getProtocol().equals("jar") && (con instanceof JarURLConnection)) {
LOG.debug("juicefs-hadoop.jar is a nested jar");
JarURLConnection connection = (JarURLConnection) con;
JarFile jfsJar = connection.getJarFile();
ZipEntry entry = jfsJar.getJarEntry(resource);
soTime = entry.getLastModifiedTime().toMillis();
ins = jfsJar.getInputStream(entry);
} else {
JarFile jfsJar;
URI locationUri;
try {
jfsJar = new JarFile(locationUri.getPath());
} catch (FileNotFoundException fne) {
locationUri = location.toURI();
} catch (URISyntaxException e) {
return loadExistLib(libjfsLibraryLoader, dir, name, libFile);
}
ZipEntry entry = jfsJar.getJarEntry(resource);
soTime = entry.getLastModifiedTime().toMillis();
ins = jfsJar.getInputStream(entry);
if (Files.isDirectory(Paths.get(locationUri))) { // for debug: sdk/java/target/classes
soTime = con.getLastModified();
ins = JuiceFileSystemImpl.class.getClassLoader().getResourceAsStream(resource);
} else {
JarFile jfsJar;
try {
jfsJar = new JarFile(locationUri.getPath());
} catch (FileNotFoundException fne) {
return loadExistLib(libjfsLibraryLoader, dir, name, libFile);
}
ZipEntry entry = jfsJar.getJarEntry(resource);
soTime = entry.getLastModifiedTime().toMillis();
ins = jfsJar.getInputStream(entry);
}
}
}

synchronized (JuiceFileSystemImpl.class) {
if (!libFile.exists() || libFile.lastModified() < soTime) {
// try the name for current user
libFile = new File(dir, System.getProperty("user.name") + "-" + name);
synchronized (JuiceFileSystemImpl.class) {
if (!libFile.exists() || libFile.lastModified() < soTime) {
InputStream reader = new GZIPInputStream(ins);
File tmp = File.createTempFile(name, null, dir);
FileOutputStream writer = new FileOutputStream(tmp);
byte[] buffer = new byte[128 << 10];
int bytesRead = 0;
while ((bytesRead = reader.read(buffer)) != -1) {
writer.write(buffer, 0, bytesRead);
}
writer.close();
reader.close();
tmp.setLastModified(soTime);
tmp.setReadable(true, false);
try {
File org = new File(dir, name);
Files.move(tmp.toPath(), org.toPath(), StandardCopyOption.ATOMIC_MOVE);
libFile = org;
} catch (Exception ade) {
Files.move(tmp.toPath(), libFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
// try the name for current user
libFile = new File(dir, System.getProperty("user.name") + "-" + name);
if (!libFile.exists() || libFile.lastModified() < soTime) {
InputStream reader = new GZIPInputStream(ins);
File tmp = File.createTempFile(name, null, dir);
FileOutputStream writer = new FileOutputStream(tmp);
byte[] buffer = new byte[128 << 10];
int bytesRead = 0;
while ((bytesRead = reader.read(buffer)) != -1) {
writer.write(buffer, 0, bytesRead);
}
writer.close();
reader.close();
tmp.setLastModified(soTime);
tmp.setReadable(true, false);
try {
File org = new File(dir, name);
Files.move(tmp.toPath(), org.toPath(), StandardCopyOption.ATOMIC_MOVE);
libFile = org;
} catch (Exception ade) {
Files.move(tmp.toPath(), libFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
}
}
}
ins.close();
} catch (Exception e) {
throw new RuntimeException("Init libjfs failed", e);
}
ins.close();
return libjfsLibraryLoader.load(libFile.getAbsolutePath());
}

Expand Down
23 changes: 23 additions & 0 deletions sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -921,4 +924,24 @@ public void testUserWithMultiGroups() throws Exception {
tomFs.close();
superFs.close();
}

public void testConcurrentCreate() throws Exception {
int threads = 100;
ExecutorService pool = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
pool.submit(() -> {
JuiceFileSystem jfs = new JuiceFileSystem();
try {
jfs.initialize(URI.create("jfs://dev/"), cfg);
jfs.listStatus(new Path("/"));
jfs.close();
} catch (IOException e) {
fail("concurrent create failed");
System.exit(1);
}
});
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
}
}

0 comments on commit 3557cd9

Please sign in to comment.