Skip to content

Commit

Permalink
Hadoop: fix background task thread leak
Browse files Browse the repository at this point in the history
  • Loading branch information
tangyoupeng committed Dec 11, 2024
1 parent a33d9bb commit ce93ec9
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 142 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/sdktest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ jobs:
ln -s inner_sym_target inner_sym_link
mkdir etc
chmod 777 etc
echo `hostname` > etc/nodes
echo `hostname` > /jfs/etc/nodes
echo "tom:3001" > /jfs/etc/users
echo "g1:2001:tom" > /jfs/etc/groups
- name: Sdk Test
run: |
Expand Down
33 changes: 13 additions & 20 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class JuiceFileSystem extends FilterFileSystem {
private static boolean fileChecksumEnabled = false;
private static boolean distcpPatched = false;

private FileSystem emptierFs;

static {
PatchUtil.patchBefore("org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream",
"waitUntilLeaseIsRevoked",
Expand All @@ -69,24 +67,19 @@ public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
fileChecksumEnabled = Boolean.parseBoolean(getConf(conf, "file.checksum", "false"));
if (!Boolean.parseBoolean(getConf(conf, "disable-trash-emptier", "false"))) {
startTrashEmptier(uri, conf);
}
}

private void startTrashEmptier(URI uri, final Configuration conf) throws IOException {
if (BgTaskUtil.isRunning(uri.getHost(), "Trash emptier")) {
return;
}
try {
UserGroupInformation superUser = UserGroupInformation.createRemoteUser(getConf(conf, "superuser", "hdfs"));
emptierFs = superUser.doAs((PrivilegedExceptionAction<FileSystem>) () -> {
JuiceFileSystemImpl fs = new JuiceFileSystemImpl();
fs.initialize(uri, conf);
return fs;
});
BgTaskUtil.startTrashEmptier(uri.getHost(), "Trash emptier", emptierFs, new Trash(emptierFs, conf).getEmptier(), TimeUnit.MINUTES.toMillis(10));
} catch (Exception e) {
throw new IOException("start trash failed!",e);
try {
Configuration newConf = new Configuration(conf);
newConf.setBoolean("juicefs.internal-bg-task", true);
UserGroupInformation superUser = UserGroupInformation.createRemoteUser(getConf(conf, "superuser", "hdfs"));
FileSystem emptierFs = superUser.doAs((PrivilegedExceptionAction<FileSystem>) () -> {
JuiceFileSystemImpl fs = new JuiceFileSystemImpl();
fs.initialize(uri, newConf);
return fs;
});
BgTaskUtil.startTrashEmptier(uri.getHost(), emptierFs, new Trash(emptierFs, conf).getEmptier(), 10, TimeUnit.MINUTES);
} catch (Exception e) {
LOG.warn("start trash emptier for {} failed", uri.getHost(), e);
}
}
}

Expand Down
114 changes: 59 additions & 55 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ static String loadVersion() {
private long handle;
private UserGroupInformation ugi;
private String homeDirPrefix = "/user";
private Map<String, String> cachedHosts = new HashMap<>(); // (ip, hostname)
private ConsistentHash<String> hash = new ConsistentHash<>(1, Collections.singletonList("localhost"));
private static Map<String, String> cachedHosts = new HashMap<>(); // (ip, hostname)
private static ConsistentHash<String> hash = new ConsistentHash<>(1, Collections.singletonList("localhost"));
private FsPermission uMask;
private String hflushMethod;

Expand All @@ -135,6 +135,8 @@ static String loadVersion() {
private static final String USERNAME_UID_PATTERN = "[a-zA-Z0-9_-]+:[0-9]+";
private static final String GROUPNAME_GID_USERNAMES_PATTERN = "[a-zA-Z0-9_-]+:[0-9]+:[,a-zA-Z0-9_-]+";

private volatile static JuiceFileSystemImpl bgfs;

/*
go call back
*/
Expand Down Expand Up @@ -384,7 +386,6 @@ public void initialize(URI uri, Configuration conf) throws IOException {
groups.clear();
groups.add(supergroup);
}
String mountpoint = getConf(conf, "mountpoint", "");

synchronized (JuiceFileSystemImpl.class) {
if (callBack == null) {
Expand Down Expand Up @@ -448,10 +449,10 @@ public void initialize(URI uri, Configuration conf) throws IOException {
if (handle <= 0) {
throw new IOException("JuiceFS initialized failed for jfs://" + name);
}

initCache(conf);
refreshCache(conf);

boolean asBgTask = conf.getBoolean("juicefs.internal-bg-task", false);
if (!asBgTask) {
BgTaskUtil.register(name, handle);
}
homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");
this.workingDir = getHomeDirectory();

Expand Down Expand Up @@ -514,15 +515,36 @@ public void initialize(URI uri, Configuration conf) throws IOException {
}
}

String uidFile = getConf(conf, "users", null);
if (!isEmpty(uidFile) || !isEmpty(groupingFile)) {
updateUidAndGrouping(uidFile, groupingFile);
if (!isSuperGroupFileSystem) {
refreshUidAndGrouping(uidFile, groupingFile);
if (!asBgTask && !isSuperGroupFileSystem) {
// use juicefs.users and juicefs.groups for global mapping
String uidFile = getConf(conf, "users", null);
if (!isEmpty(uidFile) || !isEmpty(groupingFile)) {
BgTaskUtil.putTask(name, "Refresh guid", () -> {
try {
updateUidAndGrouping(uidFile, groupingFile);
} catch (IOException e) {
LOG.warn("Update guid task failed for {}", name, e);
}
}, 1, 1, TimeUnit.MINUTES);
}
}
}

/**
 * Lazily creates and returns the shared background {@code JuiceFileSystemImpl}
 * used for internal housekeeping reads (e.g. refreshing uid/grouping files).
 * Classic double-checked locking: {@code bgfs} is declared {@code volatile static},
 * so the unsynchronized null check is safe and initialization happens at most once.
 *
 * The background copy is initialized with "juicefs.internal-bg-task" = true, which
 * the visible initialize() code reads to skip registering background tasks of its
 * own — this prevents the background instance from spawning further instances.
 *
 * NOTE(review): bgfs is a static field but is initialized from this instance's
 * uri and configuration — with multiple differently-configured FileSystem
 * instances in one JVM, the first caller's settings win. Confirm intended.
 *
 * @return the shared background filesystem instance
 * @throws IOException if initializing the background filesystem fails
 */
private JuiceFileSystemImpl getBgfs() throws IOException {
if (bgfs == null) {
// Lock on the class, matching the static scope of the bgfs field.
synchronized (JuiceFileSystemImpl.class) {
if (bgfs == null) {
Configuration newConf = new Configuration(getConf());
// Mark as internal so the new instance skips its own background tasks.
newConf.setBoolean("juicefs.internal-bg-task", true);
JuiceFileSystemImpl fs = new JuiceFileSystemImpl();
fs.initialize(uri, newConf);
// Publish only after initialize() completes successfully.
bgfs = fs;
}
}
}
return bgfs;
}

private RangerConfig checkAndGetRangerParams(Configuration conf) throws RuntimeException, IOException {
String rangerRestUrl = getConf(conf, "ranger-rest-url", "");
if (!rangerRestUrl.startsWith("http")) {
Expand Down Expand Up @@ -574,38 +596,16 @@ private boolean isEmpty(String str) {

private String readFile(String file) throws IOException {
Path path = new Path(file);
URI uri = path.toUri();
FileSystem fs;
try {
URI defaultUri = getDefaultUri(getConf());
if (uri.getScheme() == null) {
uri = defaultUri;
} else {
if (uri.getAuthority() == null && (uri.getScheme().equals(defaultUri.getScheme()))) {
uri = defaultUri;
}
}
if (getScheme().equals(uri.getScheme()) &&
(name != null && name.equals(uri.getAuthority()))) {
fs = this;
} else {
fs = path.getFileSystem(getConf());
}

FileStatus lastStatus = lastFileStatus.get(file);
FileStatus status = fs.getFileStatus(path);
if (lastStatus != null && status.getModificationTime() == lastStatus.getModificationTime()
&& status.getLen() == lastStatus.getLen()) {
return null;
}
FSDataInputStream in = fs.open(path);
FileStatus lastStatus = lastFileStatus.get(file);
FileStatus status = getBgfs().getFileStatus(path);
if (lastStatus != null && status.getModificationTime() == lastStatus.getModificationTime()
&& status.getLen() == lastStatus.getLen()) {
return null;
}
try (FSDataInputStream in = getBgfs().open(path)) {
String res = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n"));
in.close();
lastFileStatus.put(file, status);
return res;
} catch (IOException e) {
LOG.warn(String.format("read %s failed", file), e);
throw e;
}
}

Expand Down Expand Up @@ -639,12 +639,6 @@ private void updateUidAndGrouping(String uidFile, String groupFile) throws IOExc
groups = Arrays.stream(group.split(",")).collect(Collectors.toSet());
}

/**
 * Registers the periodic "Refresh guid" background task, which re-reads the
 * uid and grouping mapping files once a minute (initial delay one minute).
 *
 * @param uidFile   path of the user-to-uid mapping file
 * @param groupFile path of the group mapping file
 */
private void refreshUidAndGrouping(String uidFile, String groupFile) {
  // Expression lambda: the scheduled work is a single void call.
  BgTaskUtil.startScheduleTask(name, "Refresh guid",
      () -> updateUidAndGrouping(uidFile, groupFile),
      1, 1, TimeUnit.MINUTES);
}

private void initializeStorageIds(Configuration conf) throws IOException {
try {
Class<?> clazz = Class.forName("org.apache.hadoop.fs.BlockLocation");
Expand Down Expand Up @@ -837,21 +831,15 @@ private void initCache(Configuration conf) {
if (!newCachedHosts.equals(cachedHosts)) {
List<String> ips = new ArrayList<>(newCachedHosts.keySet());
LOG.debug("update nodes to: " + String.join(",", ips));
this.hash = new ConsistentHash<>(100, ips);
this.cachedHosts = newCachedHosts;
hash = new ConsistentHash<>(100, ips);
cachedHosts = newCachedHosts;
}
}
} catch (Throwable e) {
LOG.warn("failed to discover nodes", e);
}
}

/**
 * Registers the periodic "Node fetcher" background task, which refreshes the
 * cached node list every ten minutes (initial delay ten minutes).
 *
 * @param conf configuration passed through to each initCache() run
 */
private void refreshCache(Configuration conf) {
  // Expression lambda: the scheduled work is a single void call.
  BgTaskUtil.startScheduleTask(name, "Node fetcher",
      () -> initCache(conf),
      10, 10, TimeUnit.MINUTES);
}

private List<String> discoverNodes(String urls) {
LOG.debug("fetching nodes from {}", urls);
NodesFetcher fetcher = NodesFetcherBuilder.buildFetcher(urls, name, this);
Expand Down Expand Up @@ -916,6 +904,11 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l
String[] host = new String[]{"localhost"};
return new BlockLocation[]{new BlockLocation(name, host, 0L, file.getLen())};
}

BgTaskUtil.putTask(name, "Node fetcher", () -> {
initCache(getConf());
}, 10, 10, TimeUnit.MINUTES);

if (file.getLen() <= start + len) {
len = file.getLen() - start;
}
Expand Down Expand Up @@ -1872,6 +1865,17 @@ public void setTimes(Path p, long mtime, long atime) throws IOException {
@Override
public void close() throws IOException {
super.close();
BgTaskUtil.unregister(name, handle, () -> {
try {
if (bgfs != null) {
bgfs.close();
}
} catch (IOException e) {
LOG.warn("close background filesystem failed", e);
} finally {
bgfs = null;
}
});
lib.jfs_term(Thread.currentThread().getId(), handle);
if (metricsEnable) {
JuiceFSInstrumentation.close();
Expand Down
Loading

0 comments on commit ce93ec9

Please sign in to comment.